Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\EventLoop;
4
 
5
use BadMethodCallException;
6
use Event;
7
use EventBase;
8
use React\EventLoop\Tick\FutureTickQueue;
9
use React\EventLoop\Timer\Timer;
10
use SplObjectStorage;
11
 
12
/**
13
 * An `ext-libevent` based event loop.
14
 *
15
 * This uses the [`libevent` PECL extension](https://pecl.php.net/package/libevent).
16
 * `libevent` itself supports a number of system-specific backends (epoll, kqueue).
17
 *
18
 * This event loop does only work with PHP 5.
19
 * An [unofficial update](https://github.com/php/pecl-event-libevent/pull/2) for
20
 * PHP 7 does exist, but it is known to cause regular crashes due to `SEGFAULT`s.
21
 * To reiterate: Using this event loop on PHP 7 is not recommended.
22
 * Accordingly, the [`Factory`](#factory) will not try to use this event loop on
23
 * PHP 7.
24
 *
25
 * This event loop is known to trigger a readable listener only if
26
 * the stream *becomes* readable (edge-triggered) and may not trigger if the
27
 * stream has already been readable from the beginning.
28
 * This also implies that a stream may not be recognized as readable when data
29
 * is still left in PHP's internal stream buffers.
30
 * As such, it's recommended to use `stream_set_read_buffer($stream, 0);`
31
 * to disable PHP's internal read buffer in this case.
32
 * See also [`addReadStream()`](#addreadstream) for more details.
33
 *
34
 * @link https://pecl.php.net/package/libevent
35
 */
36
final class ExtLibeventLoop implements LoopInterface
37
{
38
    /** @internal */
39
    const MICROSECONDS_PER_SECOND = 1000000;
40
 
41
    private $eventBase;
42
    private $futureTickQueue;
43
    private $timerCallback;
44
    private $timerEvents;
45
    private $streamCallback;
46
    private $readEvents = array();
47
    private $writeEvents = array();
48
    private $readListeners = array();
49
    private $writeListeners = array();
50
    private $running;
51
    private $signals;
52
    private $signalEvents = array();
53
 
54
    public function __construct()
55
    {
56
        if (!\function_exists('event_base_new')) {
57
            throw new BadMethodCallException('Cannot create ExtLibeventLoop, ext-libevent extension missing');
58
        }
59
 
60
        $this->eventBase = \event_base_new();
61
        $this->futureTickQueue = new FutureTickQueue();
62
        $this->timerEvents = new SplObjectStorage();
63
        $this->signals = new SignalsHandler();
64
 
65
        $this->createTimerCallback();
66
        $this->createStreamCallback();
67
    }
68
 
69
    public function addReadStream($stream, $listener)
70
    {
71
        $key = (int) $stream;
72
        if (isset($this->readListeners[$key])) {
73
            return;
74
        }
75
 
76
        $event = \event_new();
77
        \event_set($event, $stream, \EV_PERSIST | \EV_READ, $this->streamCallback);
78
        \event_base_set($event, $this->eventBase);
79
        \event_add($event);
80
 
81
        $this->readEvents[$key] = $event;
82
        $this->readListeners[$key] = $listener;
83
    }
84
 
85
    public function addWriteStream($stream, $listener)
86
    {
87
        $key = (int) $stream;
88
        if (isset($this->writeListeners[$key])) {
89
            return;
90
        }
91
 
92
        $event = \event_new();
93
        \event_set($event, $stream, \EV_PERSIST | \EV_WRITE, $this->streamCallback);
94
        \event_base_set($event, $this->eventBase);
95
        \event_add($event);
96
 
97
        $this->writeEvents[$key] = $event;
98
        $this->writeListeners[$key] = $listener;
99
    }
100
 
101
    public function removeReadStream($stream)
102
    {
103
        $key = (int) $stream;
104
 
105
        if (isset($this->readListeners[$key])) {
106
            $event = $this->readEvents[$key];
107
            \event_del($event);
108
            \event_free($event);
109
 
110
            unset(
111
                $this->readEvents[$key],
112
                $this->readListeners[$key]
113
            );
114
        }
115
    }
116
 
117
    public function removeWriteStream($stream)
118
    {
119
        $key = (int) $stream;
120
 
121
        if (isset($this->writeListeners[$key])) {
122
            $event = $this->writeEvents[$key];
123
            \event_del($event);
124
            \event_free($event);
125
 
126
            unset(
127
                $this->writeEvents[$key],
128
                $this->writeListeners[$key]
129
            );
130
        }
131
    }
132
 
133
    public function addTimer($interval, $callback)
134
    {
135
        $timer = new Timer($interval, $callback, false);
136
 
137
        $this->scheduleTimer($timer);
138
 
139
        return $timer;
140
    }
141
 
142
    public function addPeriodicTimer($interval, $callback)
143
    {
144
        $timer = new Timer($interval, $callback, true);
145
 
146
        $this->scheduleTimer($timer);
147
 
148
        return $timer;
149
    }
150
 
151
    public function cancelTimer(TimerInterface $timer)
152
    {
153
        if ($this->timerEvents->contains($timer)) {
154
            $event = $this->timerEvents[$timer];
155
            \event_del($event);
156
            \event_free($event);
157
 
158
            $this->timerEvents->detach($timer);
159
        }
160
    }
161
 
162
    public function futureTick($listener)
163
    {
164
        $this->futureTickQueue->add($listener);
165
    }
166
 
167
    public function addSignal($signal, $listener)
168
    {
169
        $this->signals->add($signal, $listener);
170
 
171
        if (!isset($this->signalEvents[$signal])) {
172
            $this->signalEvents[$signal] = \event_new();
173
            \event_set($this->signalEvents[$signal], $signal, \EV_PERSIST | \EV_SIGNAL, array($this->signals, 'call'));
174
            \event_base_set($this->signalEvents[$signal], $this->eventBase);
175
            \event_add($this->signalEvents[$signal]);
176
        }
177
    }
178
 
179
    public function removeSignal($signal, $listener)
180
    {
181
        $this->signals->remove($signal, $listener);
182
 
183
        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
184
            \event_del($this->signalEvents[$signal]);
185
            \event_free($this->signalEvents[$signal]);
186
            unset($this->signalEvents[$signal]);
187
        }
188
    }
189
 
190
    public function run()
191
    {
192
        $this->running = true;
193
 
194
        while ($this->running) {
195
            $this->futureTickQueue->tick();
196
 
197
            $flags = \EVLOOP_ONCE;
198
            if (!$this->running || !$this->futureTickQueue->isEmpty()) {
199
                $flags |= \EVLOOP_NONBLOCK;
200
            } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) {
201
                break;
202
            }
203
 
204
            \event_base_loop($this->eventBase, $flags);
205
        }
206
    }
207
 
208
    public function stop()
209
    {
210
        $this->running = false;
211
    }
212
 
213
    /**
214
     * Schedule a timer for execution.
215
     *
216
     * @param TimerInterface $timer
217
     */
218
    private function scheduleTimer(TimerInterface $timer)
219
    {
220
        $this->timerEvents[$timer] = $event = \event_timer_new();
221
 
222
        \event_timer_set($event, $this->timerCallback, $timer);
223
        \event_base_set($event, $this->eventBase);
224
        \event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);
225
    }
226
 
227
    /**
228
     * Create a callback used as the target of timer events.
229
     *
230
     * A reference is kept to the callback for the lifetime of the loop
231
     * to prevent "Cannot destroy active lambda function" fatal error from
232
     * the event extension.
233
     */
234
    private function createTimerCallback()
235
    {
236
        $that = $this;
237
        $timers = $this->timerEvents;
238
        $this->timerCallback = function ($_, $__, $timer) use ($timers, $that) {
239
            \call_user_func($timer->getCallback(), $timer);
240
 
241
            // Timer already cancelled ...
242
            if (!$timers->contains($timer)) {
243
                return;
244
            }
245
 
246
            // Reschedule periodic timers ...
247
            if ($timer->isPeriodic()) {
248
                \event_add(
249
                    $timers[$timer],
250
                    $timer->getInterval() * ExtLibeventLoop::MICROSECONDS_PER_SECOND
251
                );
252
 
253
            // Clean-up one shot timers ...
254
            } else {
255
                $that->cancelTimer($timer);
256
            }
257
        };
258
    }
259
 
260
    /**
261
     * Create a callback used as the target of stream events.
262
     *
263
     * A reference is kept to the callback for the lifetime of the loop
264
     * to prevent "Cannot destroy active lambda function" fatal error from
265
     * the event extension.
266
     */
267
    private function createStreamCallback()
268
    {
269
        $read =& $this->readListeners;
270
        $write =& $this->writeListeners;
271
        $this->streamCallback = function ($stream, $flags) use (&$read, &$write) {
272
            $key = (int) $stream;
273
 
274
            if (\EV_READ === (\EV_READ & $flags) && isset($read[$key])) {
275
                \call_user_func($read[$key], $stream);
276
            }
277
 
278
            if (\EV_WRITE === (\EV_WRITE & $flags) && isset($write[$key])) {
279
                \call_user_func($write[$key], $stream);
280
            }
281
        };
282
    }
283
}