Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\EventLoop;
4
 
5
use React\EventLoop\Tick\FutureTickQueue;
6
use React\EventLoop\Timer\Timer;
7
use SplObjectStorage;
8
 
9
/**
 * An `ext-uv` based event loop.
 *
 * This loop uses the [`uv` PECL extension](https://pecl.php.net/package/uv),
 * which provides an interface to the `libuv` library.
 *
 * This loop is known to work with PHP 7+.
 *
 * @see https://github.com/bwoebi/php-uv
 */
final class ExtUvLoop implements LoopInterface
{
    /** @var resource underlying ext-uv loop handle, created by uv_loop_new() */
    private $uv;

    /** @var FutureTickQueue callbacks scheduled through futureTick() */
    private $futureTickQueue;

    /** @var SplObjectStorage maps each active Timer to its uv timer handle */
    private $timers;

    /** @var array poll handles keyed by (int) stream; an entry is false when handle creation failed */
    private $streamEvents = array();

    /** @var callable[] read listeners keyed by (int) stream */
    private $readStreams = array();

    /** @var callable[] write listeners keyed by (int) stream */
    private $writeStreams = array();

    /** @var bool whether run() should keep iterating */
    private $running;

    /** @var SignalsHandler registry of signal listeners */
    private $signals;

    /** @var array uv signal handles keyed by signal number */
    private $signalEvents = array();

    /** @var callable shared poll callback that dispatches to the read/write listeners */
    private $streamListener;

    /**
     * Creates the uv loop and the bookkeeping structures used by this class.
     *
     * @throws \BadMethodCallException when the ext-uv extension is not loaded
     */
    public function __construct()
    {
        if (!\function_exists('uv_loop_new')) {
            throw new \BadMethodCallException('Cannot create LibUvLoop, ext-uv extension missing');
        }

        $this->uv = \uv_loop_new();
        $this->futureTickQueue = new FutureTickQueue();
        $this->timers = new SplObjectStorage();
        $this->streamListener = $this->createStreamListener();
        $this->signals = new SignalsHandler();
    }

    /**
     * Returns the underlying ext-uv event loop. (Internal ReactPHP use only.)
     *
     * @internal
     *
     * @return resource
     */
    public function getUvLoop()
    {
        return $this->uv;
    }

    /**
     * {@inheritdoc}
     */
    public function addReadStream($stream, $listener)
    {
        $key = (int) $stream;

        // Only the first listener registered for a stream is kept.
        if (isset($this->readStreams[$key])) {
            return;
        }

        $this->readStreams[$key] = $listener;
        $this->addStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function addWriteStream($stream, $listener)
    {
        $key = (int) $stream;

        // Only the first listener registered for a stream is kept.
        if (isset($this->writeStreams[$key])) {
            return;
        }

        $this->writeStreams[$key] = $listener;
        $this->addStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            return;
        }

        unset($this->readStreams[$key]);
        $this->removeStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            return;
        }

        unset($this->writeStreams[$key]);
        $this->removeStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function addTimer($interval, $callback)
    {
        // Convert (and validate) the interval before anything is registered, so
        // an overflow exception cannot leave a never-started timer behind that
        // would keep run() from ever seeing an empty timer list.
        $milliseconds = $this->convertFloatSecondsToMilliseconds($interval);

        $timer = new Timer($interval, $callback, false);
        $timers = $this->timers;

        $callback = function () use ($timer, $timers) {
            \call_user_func($timer->getCallback(), $timer);

            // One-off timer: drop it after it fired, unless the callback
            // already cancelled it itself.
            if (isset($timers[$timer])) {
                $this->cancelTimer($timer);
            }
        };

        $event = \uv_timer_init($this->uv);
        $this->timers[$timer] = $event;

        // A repeat value of 0 makes this a non-repeating timer.
        \uv_timer_start($event, $milliseconds, 0, $callback);

        return $timer;
    }

    /**
     * {@inheritdoc}
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $callback = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);
        };

        $interval = $this->convertFloatSecondsToMilliseconds($interval);
        $event = \uv_timer_init($this->uv);
        $this->timers[$timer] = $event;

        \uv_timer_start(
            $event,
            $interval,
            // A repeat of 0 would mean "do not repeat", so use at least 1 ms.
            (int) $interval === 0 ? 1 : $interval,
            $callback
        );

        return $timer;
    }

    /**
     * {@inheritdoc}
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (isset($this->timers[$timer])) {
            // Errors from stopping are deliberately suppressed (best effort);
            // presumably the handle may already be inactive after firing.
            @\uv_timer_stop($this->timers[$timer]);
            unset($this->timers[$timer]);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Registers a listener for the given signal.
     *
     * A uv signal handle is created the first time a signal number is used;
     * further listeners for the same signal share that handle.
     *
     * @param int      $signal
     * @param callable $listener
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            $signals = $this->signals;
            $this->signalEvents[$signal] = \uv_signal_init($this->uv);
            \uv_signal_start($this->signalEvents[$signal], function () use ($signals, $signal) {
                $signals->call($signal);
            }, $signal);
        }
    }

    /**
     * Removes a listener for the given signal.
     *
     * The uv signal handle is stopped and forgotten once no listener is left
     * for that signal.
     *
     * @param int      $signal
     * @param callable $listener
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            \uv_signal_stop($this->signalEvents[$signal]);
            unset($this->signalEvents[$signal]);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
            $wasJustStopped = !$this->running;
            $nothingLeftToDo = !$this->readStreams
                && !$this->writeStreams
                && !$this->timers->count()
                && $this->signals->isEmpty();

            // Use UV::RUN_ONCE when there are only I/O events active in the loop and block until one of those triggers,
            // otherwise use UV::RUN_NOWAIT.
            // @link http://docs.libuv.org/en/v1.x/loop.html#c.uv_run
            $flags = \UV::RUN_ONCE;
            if ($wasJustStopped || $hasPendingCallbacks) {
                $flags = \UV::RUN_NOWAIT;
            } elseif ($nothingLeftToDo) {
                break;
            }

            \uv_run($this->uv, $flags);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Ensures a poll handle exists for the stream and (re)starts polling it.
     *
     * @param resource $stream
     */
    private function addStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            // May yield false when no handle can be created for the stream;
            // pollStream() and removeStream() both tolerate such an entry.
            $this->streamEvents[$key] = \uv_poll_init_socket($this->uv, $stream);
        }

        $this->pollStream($stream);
    }

    /**
     * Stops polling the stream once no listener is left for it, otherwise
     * updates the poll flags to match the remaining listener.
     *
     * @param resource $stream
     */
    private function removeStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            return;
        }

        if (isset($this->readStreams[$key]) || isset($this->writeStreams[$key])) {
            $this->pollStream($stream);
            return;
        }

        // isset() is true for a stored false, so make sure a real handle is
        // present before handing it to ext-uv.
        if ($this->streamEvents[$key] !== false) {
            \uv_poll_stop($this->streamEvents[$key]);
            \uv_close($this->streamEvents[$key]);
        }

        unset($this->streamEvents[$key]);
    }

    /**
     * (Re)starts polling the stream for the directions that currently have a
     * listener registered.
     *
     * @param resource $stream
     */
    private function pollStream($stream)
    {
        $key = (int) $stream;

        // Nothing to poll when there is no handle or its creation failed.
        if (!isset($this->streamEvents[$key]) || $this->streamEvents[$key] === false) {
            return;
        }

        $flags = 0;
        if (isset($this->readStreams[$key])) {
            $flags |= \UV::READABLE;
        }

        if (isset($this->writeStreams[$key])) {
            $flags |= \UV::WRITABLE;
        }

        \uv_poll_start($this->streamEvents[$key], $flags, $this->streamListener);
    }

    /**
     * Create a stream listener
     *
     * The returned callback is shared by all poll handles and forwards each
     * event to the read and/or write listener registered for the stream.
     *
     * @return callable Returns a callback
     */
    private function createStreamListener()
    {
        $callback = function ($event, $status, $events, $stream) {
            // libuv automatically stops polling on error, re-enable polling to match other loop implementations
            if ($status !== 0) {
                $this->pollStream($stream);

                // libuv may report no events on error, but this should still invoke stream listeners to report closed connections
                // re-enable both readable and writable, correct listeners will be checked below anyway
                if ($events === 0) {
                    $events = \UV::READABLE | \UV::WRITABLE;
                }
            }

            if (isset($this->readStreams[(int) $stream]) && ($events & \UV::READABLE)) {
                \call_user_func($this->readStreams[(int) $stream], $stream);
            }

            // Checked separately: the read listener above may have removed the write listener.
            if (isset($this->writeStreams[(int) $stream]) && ($events & \UV::WRITABLE)) {
                \call_user_func($this->writeStreams[(int) $stream], $stream);
            }
        };

        return $callback;
    }

    /**
     * Converts an interval in (fractional) seconds to whole milliseconds.
     *
     * Negative intervals are clamped to 0.
     *
     * @param float $interval
     * @return int
     * @throws \InvalidArgumentException when the interval is too large to be
     *     expressed as an integer number of milliseconds
     */
    private function convertFloatSecondsToMilliseconds($interval)
    {
        if ($interval < 0) {
            return 0;
        }

        $maxValue = (int) (\PHP_INT_MAX / 1000);

        // Compare against the integer range as a float first: casting an
        // out-of-range float to int has an undefined result, so a huge value
        // could otherwise wrap to a small number and slip past the limit.
        // The || short-circuits, so the cast only runs for in-range values.
        if ($interval >= (float) \PHP_INT_MAX || (int) $interval >= $maxValue) {
            throw new \InvalidArgumentException(
                "Interval overflow, value must be lower than '{$maxValue}', but '{$interval}' passed."
            );
        }

        return (int) \floor($interval * 1000);
    }
}