Subversion Repositories qbpwcf-lib(archive)

Rev

Rev 915 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
1 liveuser 1
<?php

namespace React\EventLoop;

use Event;
use EventBase;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface;
use SplObjectStorage;

/**
 * An ext-libevent based event-loop.
 *
 * Stream readiness and timers are handed to a single libevent event base.
 * Next-tick and future-tick callbacks live in two separate queues that are
 * ticked before every poll of the event base.
 */
class LibEventLoop implements LoopInterface
{
    /**
     * Factor used to convert timer intervals (seconds) into the microsecond
     * timeout passed to event_add().
     */
    const MICROSECONDS_PER_SECOND = 1000000;

    /** @var resource Event base returned by event_base_new(). */
    private $eventBase;

    /** @var NextTickQueue Queue filled by nextTick(). */
    private $nextTickQueue;

    /** @var FutureTickQueue Queue filled by futureTick(). */
    private $futureTickQueue;

    /** @var callable Single closure used as the target of every timer event. */
    private $timerCallback;

    /** @var SplObjectStorage Maps each active timer object to its libevent event. */
    private $timerEvents;

    /** @var callable Single closure used as the target of every stream event. */
    private $streamCallback;

    /** @var array libevent events, keyed by (int) $stream. */
    private $streamEvents = [];

    /** @var array Currently subscribed EV_READ / EV_WRITE bits, keyed by (int) $stream. */
    private $streamFlags = [];

    /** @var array Read listeners, keyed by (int) $stream. */
    private $readListeners = [];

    /** @var array Write listeners, keyed by (int) $stream. */
    private $writeListeners = [];

    /** @var bool True while run() should keep iterating; cleared by stop(). */
    private $running = false;

    /**
     * Create the event base, the two tick queues, the timer storage and the
     * shared timer/stream callbacks.
     */
    public function __construct()
    {
        $this->eventBase = event_base_new();
        $this->nextTickQueue = new NextTickQueue($this);
        $this->futureTickQueue = new FutureTickQueue($this);
        $this->timerEvents = new SplObjectStorage();

        $this->createTimerCallback();
        $this->createStreamCallback();
    }

    /**
     * Register a listener to be called when the stream reports EV_READ.
     *
     * If the stream already has a read listener the call is ignored and the
     * existing listener is kept.
     *
     * @param resource $stream
     * @param callable $listener Invoked as $listener($stream, $loop).
     */
    public function addReadStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (!isset($this->readListeners[$key])) {
            $this->readListeners[$key] = $listener;
            $this->subscribeStreamEvent($stream, EV_READ);
        }
    }

    /**
     * Register a listener to be called when the stream reports EV_WRITE.
     *
     * If the stream already has a write listener the call is ignored and the
     * existing listener is kept.
     *
     * @param resource $stream
     * @param callable $listener Invoked as $listener($stream, $loop).
     */
    public function addWriteStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (!isset($this->writeListeners[$key])) {
            $this->writeListeners[$key] = $listener;
            $this->subscribeStreamEvent($stream, EV_WRITE);
        }
    }

    /**
     * Remove the read listener of the stream, if one is registered.
     *
     * @param resource $stream
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->readListeners[$key])) {
            unset($this->readListeners[$key]);
            $this->unsubscribeStreamEvent($stream, EV_READ);
        }
    }

    /**
     * Remove the write listener of the stream, if one is registered.
     *
     * @param resource $stream
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->writeListeners[$key])) {
            unset($this->writeListeners[$key]);
            $this->unsubscribeStreamEvent($stream, EV_WRITE);
        }
    }

    /**
     * Remove every listener of the stream and release its libevent event.
     *
     * Does nothing when no event is registered for the stream.
     *
     * @param resource $stream
     */
    public function removeStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->streamEvents[$key])) {
            $event = $this->streamEvents[$key];

            event_del($event);
            event_free($event);

            unset(
                $this->streamFlags[$key],
                $this->streamEvents[$key],
                $this->readListeners[$key],
                $this->writeListeners[$key]
            );
        }
    }

    /**
     * Schedule a one-shot timer.
     *
     * @param int|float $interval Delay in seconds.
     * @param callable  $callback Invoked with the timer as its only argument.
     *
     * @return Timer The scheduled timer.
     */
    public function addTimer($interval, callable $callback)
    {
        $timer = new Timer($this, $interval, $callback, false);

        $this->scheduleTimer($timer);

        return $timer;
    }

    /**
     * Schedule a periodic timer, re-armed after every invocation until it is
     * cancelled.
     *
     * @param int|float $interval Period in seconds.
     * @param callable  $callback Invoked with the timer as its only argument.
     *
     * @return Timer The scheduled timer.
     */
    public function addPeriodicTimer($interval, callable $callback)
    {
        $timer = new Timer($this, $interval, $callback, true);

        $this->scheduleTimer($timer);

        return $timer;
    }

    /**
     * Cancel a timer and release its libevent event.
     *
     * Does nothing when the timer is not active in this loop.
     *
     * @param TimerInterface $timer
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if ($this->isTimerActive($timer)) {
            $event = $this->timerEvents[$timer];

            event_del($event);
            event_free($event);

            $this->timerEvents->detach($timer);
        }
    }

    /**
     * Tell whether the timer is currently tracked by this loop.
     *
     * @param TimerInterface $timer
     *
     * @return bool
     */
    public function isTimerActive(TimerInterface $timer)
    {
        return $this->timerEvents->contains($timer);
    }

    /**
     * Add a listener to the next-tick queue.
     *
     * @param callable $listener
     */
    public function nextTick(callable $listener)
    {
        $this->nextTickQueue->add($listener);
    }

    /**
     * Add a listener to the future-tick queue.
     *
     * @param callable $listener
     */
    public function futureTick(callable $listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Perform a single iteration: tick both queues, then poll the event base
     * once without blocking.
     */
    public function tick()
    {
        $this->nextTickQueue->tick();

        $this->futureTickQueue->tick();

        event_base_loop($this->eventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK);
    }

    /**
     * Run the loop until stop() is called or there is nothing left to wait for.
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->nextTickQueue->tick();

            $this->futureTickQueue->tick();

            $flags = EVLOOP_ONCE;
            if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
                // Stopped during the ticks, or more tick callbacks are already
                // queued: poll without blocking so they are not delayed.
                $flags |= EVLOOP_NONBLOCK;
            } elseif (!$this->streamEvents && !$this->timerEvents->count()) {
                // No streams, no timers and empty queues: nothing can wake the
                // loop up any more.
                break;
            }

            event_base_loop($this->eventBase, $flags);
        }
    }

    /**
     * Ask run() to return once the current iteration has finished.
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Schedule a timer for execution.
     *
     * Creates a libevent timer event bound to the shared timer callback,
     * records it under the timer object and arms it with the timer's interval
     * converted to microseconds.
     *
     * @param TimerInterface $timer
     */
    private function scheduleTimer(TimerInterface $timer)
    {
        $this->timerEvents[$timer] = $event = event_timer_new();

        event_timer_set($event, $this->timerCallback, $timer);
        event_base_set($event, $this->eventBase);
        event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);
    }

    /**
     * Create a new ext-libevent event resource, or update the existing one.
     *
     * @param resource $stream
     * @param integer  $flag   EV_READ or EV_WRITE
     */
    private function subscribeStreamEvent($stream, $flag)
    {
        $key = (int) $stream;

        if (isset($this->streamEvents[$key])) {
            $event = $this->streamEvents[$key];
            $flags = $this->streamFlags[$key] |= $flag;

            // The event is removed before being reconfigured with the merged
            // flags; it is added back at the end of this method.
            event_del($event);
            event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
        } else {
            $event = event_new();

            event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback);
            event_base_set($event, $this->eventBase);

            $this->streamEvents[$key] = $event;
            $this->streamFlags[$key] = $flag;
        }

        event_add($event);
    }

    /**
     * Update the ext-libevent event resource for this stream to stop listening to
     * the given event type, or remove it entirely if it's no longer needed.
     *
     * @param resource $stream
     * @param integer  $flag   EV_READ or EV_WRITE
     */
    private function unsubscribeStreamEvent($stream, $flag)
    {
        $key = (int) $stream;

        $flags = $this->streamFlags[$key] &= ~$flag;

        // Neither direction is wanted any more: drop the whole event.
        if (0 === $flags) {
            $this->removeStream($stream);

            return;
        }

        $event = $this->streamEvents[$key];

        event_del($event);
        event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
        event_add($event);
    }

    /**
     * Create a callback used as the target of timer events.
     *
     * A reference is kept to the callback for the lifetime of the loop
     * to prevent "Cannot destroy active lambda function" fatal error from
     * the event extension.
     */
    private function createTimerCallback()
    {
        // Only the third argument (the timer given to event_timer_set()) is
        // used; the first two are ignored.
        $this->timerCallback = function ($_, $__, $timer) {
            call_user_func($timer->getCallback(), $timer);

            // Timer already cancelled ...
            if (!$this->isTimerActive($timer)) {
                return;

            // Reschedule periodic timers ...
            } elseif ($timer->isPeriodic()) {
                event_add(
                    $this->timerEvents[$timer],
                    $timer->getInterval() * self::MICROSECONDS_PER_SECOND
                );

            // Clean-up one shot timers ...
            } else {
                $this->cancelTimer($timer);
            }
        };
    }

    /**
     * Create a callback used as the target of stream events.
     *
     * A reference is kept to the callback for the lifetime of the loop
     * to prevent "Cannot destroy active lambda function" fatal error from
     * the event extension.
     */
    private function createStreamCallback()
    {
        $this->streamCallback = function ($stream, $flags) {
            $key = (int) $stream;

            if (EV_READ === (EV_READ & $flags) && isset($this->readListeners[$key])) {
                call_user_func($this->readListeners[$key], $stream, $this);
            }

            // Looked up only after the read listener has run, so a write
            // listener removed by the read listener is not invoked.
            if (EV_WRITE === (EV_WRITE & $flags) && isset($this->writeListeners[$key])) {
                call_user_func($this->writeListeners[$key], $stream, $this);
            }
        };
    }
}