Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\EventLoop;
4
 
5
use BadMethodCallException;
6
use Event;
7
use EventBase;
8
use React\EventLoop\Tick\FutureTickQueue;
9
use React\EventLoop\Timer\Timer;
10
use SplObjectStorage;
11
 
12
/**
13
 * An `ext-event` based event loop.
14
 *
15
 * This uses the [`event` PECL extension](https://pecl.php.net/package/event).
16
 * It supports the same backends as libevent.
17
 *
18
 * This loop is known to work with PHP 5.4 through PHP 7+.
19
 *
20
 * @link https://pecl.php.net/package/event
21
 */
22
final class ExtEventLoop implements LoopInterface
23
{
24
    private $eventBase;
25
    private $futureTickQueue;
26
    private $timerCallback;
27
    private $timerEvents;
28
    private $streamCallback;
29
    private $readEvents = array();
30
    private $writeEvents = array();
31
    private $readListeners = array();
32
    private $writeListeners = array();
33
    private $readRefs = array();
34
    private $writeRefs = array();
35
    private $running;
36
    private $signals;
37
    private $signalEvents = array();
38
 
39
    public function __construct()
40
    {
41
        if (!\class_exists('EventBase', false)) {
42
            throw new BadMethodCallException('Cannot create ExtEventLoop, ext-event extension missing');
43
        }
44
 
45
        // support arbitrary file descriptors and not just sockets
46
        // Windows only has limited file descriptor support, so do not require this (will fail otherwise)
47
        // @link http://www.wangafu.net/~nickm/libevent-book/Ref2_eventbase.html#_setting_up_a_complicated_event_base
48
        $config = new \EventConfig();
49
        if (\DIRECTORY_SEPARATOR !== '\\') {
50
            $config->requireFeatures(\EventConfig::FEATURE_FDS);
51
        }
52
 
53
        $this->eventBase = new EventBase($config);
54
        $this->futureTickQueue = new FutureTickQueue();
55
        $this->timerEvents = new SplObjectStorage();
56
        $this->signals = new SignalsHandler();
57
 
58
        $this->createTimerCallback();
59
        $this->createStreamCallback();
60
    }
61
 
62
    public function __destruct()
63
    {
64
        // explicitly clear all references to Event objects to prevent SEGFAULTs on Windows
65
        foreach ($this->timerEvents as $timer) {
66
            $this->timerEvents->detach($timer);
67
        }
68
 
69
        $this->readEvents = array();
70
        $this->writeEvents = array();
71
    }
72
 
73
    public function addReadStream($stream, $listener)
74
    {
75
        $key = (int) $stream;
76
        if (isset($this->readListeners[$key])) {
77
            return;
78
        }
79
 
80
        $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::READ, $this->streamCallback);
81
        $event->add();
82
        $this->readEvents[$key] = $event;
83
        $this->readListeners[$key] = $listener;
84
 
85
        // ext-event does not increase refcount on stream resources for PHP 7+
86
        // manually keep track of stream resource to prevent premature garbage collection
87
        if (\PHP_VERSION_ID >= 70000) {
88
            $this->readRefs[$key] = $stream;
89
        }
90
    }
91
 
92
    public function addWriteStream($stream, $listener)
93
    {
94
        $key = (int) $stream;
95
        if (isset($this->writeListeners[$key])) {
96
            return;
97
        }
98
 
99
        $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::WRITE, $this->streamCallback);
100
        $event->add();
101
        $this->writeEvents[$key] = $event;
102
        $this->writeListeners[$key] = $listener;
103
 
104
        // ext-event does not increase refcount on stream resources for PHP 7+
105
        // manually keep track of stream resource to prevent premature garbage collection
106
        if (\PHP_VERSION_ID >= 70000) {
107
            $this->writeRefs[$key] = $stream;
108
        }
109
    }
110
 
111
    public function removeReadStream($stream)
112
    {
113
        $key = (int) $stream;
114
 
115
        if (isset($this->readEvents[$key])) {
116
            $this->readEvents[$key]->free();
117
            unset(
118
                $this->readEvents[$key],
119
                $this->readListeners[$key],
120
                $this->readRefs[$key]
121
            );
122
        }
123
    }
124
 
125
    public function removeWriteStream($stream)
126
    {
127
        $key = (int) $stream;
128
 
129
        if (isset($this->writeEvents[$key])) {
130
            $this->writeEvents[$key]->free();
131
            unset(
132
                $this->writeEvents[$key],
133
                $this->writeListeners[$key],
134
                $this->writeRefs[$key]
135
            );
136
        }
137
    }
138
 
139
    public function addTimer($interval, $callback)
140
    {
141
        $timer = new Timer($interval, $callback, false);
142
 
143
        $this->scheduleTimer($timer);
144
 
145
        return $timer;
146
    }
147
 
148
    public function addPeriodicTimer($interval, $callback)
149
    {
150
        $timer = new Timer($interval, $callback, true);
151
 
152
        $this->scheduleTimer($timer);
153
 
154
        return $timer;
155
    }
156
 
157
    public function cancelTimer(TimerInterface $timer)
158
    {
159
        if ($this->timerEvents->contains($timer)) {
160
            $this->timerEvents[$timer]->free();
161
            $this->timerEvents->detach($timer);
162
        }
163
    }
164
 
165
    public function futureTick($listener)
166
    {
167
        $this->futureTickQueue->add($listener);
168
    }
169
 
170
    public function addSignal($signal, $listener)
171
    {
172
        $this->signals->add($signal, $listener);
173
 
174
        if (!isset($this->signalEvents[$signal])) {
175
            $this->signalEvents[$signal] = Event::signal($this->eventBase, $signal, array($this->signals, 'call'));
176
            $this->signalEvents[$signal]->add();
177
        }
178
    }
179
 
180
    public function removeSignal($signal, $listener)
181
    {
182
        $this->signals->remove($signal, $listener);
183
 
184
        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
185
            $this->signalEvents[$signal]->free();
186
            unset($this->signalEvents[$signal]);
187
        }
188
    }
189
 
190
    public function run()
191
    {
192
        $this->running = true;
193
 
194
        while ($this->running) {
195
            $this->futureTickQueue->tick();
196
 
197
            $flags = EventBase::LOOP_ONCE;
198
            if (!$this->running || !$this->futureTickQueue->isEmpty()) {
199
                $flags |= EventBase::LOOP_NONBLOCK;
200
            } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) {
201
                break;
202
            }
203
 
204
            $this->eventBase->loop($flags);
205
        }
206
    }
207
 
208
    public function stop()
209
    {
210
        $this->running = false;
211
    }
212
 
213
    /**
214
     * Schedule a timer for execution.
215
     *
216
     * @param TimerInterface $timer
217
     */
218
    private function scheduleTimer(TimerInterface $timer)
219
    {
220
        $flags = Event::TIMEOUT;
221
 
222
        if ($timer->isPeriodic()) {
223
            $flags |= Event::PERSIST;
224
        }
225
 
226
        $event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
227
        $this->timerEvents[$timer] = $event;
228
 
229
        $event->add($timer->getInterval());
230
    }
231
 
232
    /**
233
     * Create a callback used as the target of timer events.
234
     *
235
     * A reference is kept to the callback for the lifetime of the loop
236
     * to prevent "Cannot destroy active lambda function" fatal error from
237
     * the event extension.
238
     */
239
    private function createTimerCallback()
240
    {
241
        $timers = $this->timerEvents;
242
        $this->timerCallback = function ($_, $__, $timer) use ($timers) {
243
            \call_user_func($timer->getCallback(), $timer);
244
 
245
            if (!$timer->isPeriodic() && $timers->contains($timer)) {
246
                $this->cancelTimer($timer);
247
            }
248
        };
249
    }
250
 
251
    /**
252
     * Create a callback used as the target of stream events.
253
     *
254
     * A reference is kept to the callback for the lifetime of the loop
255
     * to prevent "Cannot destroy active lambda function" fatal error from
256
     * the event extension.
257
     */
258
    private function createStreamCallback()
259
    {
260
        $read =& $this->readListeners;
261
        $write =& $this->writeListeners;
262
        $this->streamCallback = function ($stream, $flags) use (&$read, &$write) {
263
            $key = (int) $stream;
264
 
265
            if (Event::READ === (Event::READ & $flags) && isset($read[$key])) {
266
                \call_user_func($read[$key], $stream);
267
            }
268
 
269
            if (Event::WRITE === (Event::WRITE & $flags) && isset($write[$key])) {
270
                \call_user_func($write[$key], $stream);
271
            }
272
        };
273
    }
274
}