Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\EventLoop;
4
 
5
use React\EventLoop\Tick\FutureTickQueue;
6
use React\EventLoop\Timer\Timer;
7
use React\EventLoop\Timer\Timers;
8
 
9
/**
10
 * A `stream_select()` based event loop.
11
 *
12
 * This uses the [`stream_select()`](https://www.php.net/manual/en/function.stream-select.php)
13
 * function and is the only implementation which works out of the box with PHP.
14
 *
15
 * This event loop works out of the box on PHP 5.4 through PHP 7+ and HHVM.
16
 * This means that no installation is required and this library works on all
17
 * platforms and supported PHP versions.
18
 * Accordingly, the [`Factory`](#factory) will use this event loop by default if
19
 * you do not install any of the event loop extensions listed below.
20
 *
21
 * Under the hood, it does a simple `select` system call.
22
 * This system call is limited to the maximum file descriptor number of
23
 * `FD_SETSIZE` (platform dependent, commonly 1024) and scales with `O(m)`
24
 * (`m` being the maximum file descriptor number passed).
25
 * This means that you may run into issues when handling thousands of streams
26
 * concurrently and you may want to look into using one of the alternative
27
 * event loop implementations listed below in this case.
28
 * If your use case is among the many common use cases that involve handling only
29
 * dozens or a few hundred streams at once, then this event loop implementation
30
 * performs really well.
31
 *
32
 * If you want to use signal handling (see also [`addSignal()`](#addsignal) below),
33
 * this event loop implementation requires `ext-pcntl`.
34
 * This extension is only available for Unix-like platforms and does not support
35
 * Windows.
36
 * It is commonly installed as part of many PHP distributions.
37
 * If this extension is missing (or you're running on Windows), signal handling is
38
 * not supported and throws a `BadMethodCallException` instead.
39
 *
40
 * This event loop is known to rely on wall-clock time to schedule future timers
41
 * when using any version before PHP 7.3, because a monotonic time source is
42
 * only available as of PHP 7.3 (`hrtime()`).
43
 * While this does not affect many common use cases, this is an important
44
 * distinction for programs that rely on a high time precision or on systems
45
 * that are subject to discontinuous time adjustments (time jumps).
46
 * This means that if you schedule a timer to trigger in 30s on PHP < 7.3 and
47
 * then adjust your system time forward by 20s, the timer may trigger in 10s.
48
 * See also [`addTimer()`](#addtimer) for more details.
49
 *
50
 * @link https://www.php.net/manual/en/function.stream-select.php
51
 */
52
final class StreamSelectLoop implements LoopInterface
53
{
54
    /** @internal */
55
    const MICROSECONDS_PER_SECOND = 1000000;
56
 
57
    private $futureTickQueue;
58
    private $timers;
59
    private $readStreams = array();
60
    private $readListeners = array();
61
    private $writeStreams = array();
62
    private $writeListeners = array();
63
    private $running;
64
    private $pcntl = false;
65
    private $pcntlPoll = false;
66
    private $signals;
67
 
68
    public function __construct()
69
    {
70
        $this->futureTickQueue = new FutureTickQueue();
71
        $this->timers = new Timers();
72
        $this->pcntl = \function_exists('pcntl_signal') && \function_exists('pcntl_signal_dispatch');
73
        $this->pcntlPoll = $this->pcntl && !\function_exists('pcntl_async_signals');
74
        $this->signals = new SignalsHandler();
75
 
76
        // prefer async signals if available (PHP 7.1+) or fall back to dispatching on each tick
77
        if ($this->pcntl && !$this->pcntlPoll) {
78
            \pcntl_async_signals(true);
79
        }
80
    }
81
 
82
    public function addReadStream($stream, $listener)
83
    {
84
        $key = (int) $stream;
85
 
86
        if (!isset($this->readStreams[$key])) {
87
            $this->readStreams[$key] = $stream;
88
            $this->readListeners[$key] = $listener;
89
        }
90
    }
91
 
92
    public function addWriteStream($stream, $listener)
93
    {
94
        $key = (int) $stream;
95
 
96
        if (!isset($this->writeStreams[$key])) {
97
            $this->writeStreams[$key] = $stream;
98
            $this->writeListeners[$key] = $listener;
99
        }
100
    }
101
 
102
    public function removeReadStream($stream)
103
    {
104
        $key = (int) $stream;
105
 
106
        unset(
107
            $this->readStreams[$key],
108
            $this->readListeners[$key]
109
        );
110
    }
111
 
112
    public function removeWriteStream($stream)
113
    {
114
        $key = (int) $stream;
115
 
116
        unset(
117
            $this->writeStreams[$key],
118
            $this->writeListeners[$key]
119
        );
120
    }
121
 
122
    public function addTimer($interval, $callback)
123
    {
124
        $timer = new Timer($interval, $callback, false);
125
 
126
        $this->timers->add($timer);
127
 
128
        return $timer;
129
    }
130
 
131
    public function addPeriodicTimer($interval, $callback)
132
    {
133
        $timer = new Timer($interval, $callback, true);
134
 
135
        $this->timers->add($timer);
136
 
137
        return $timer;
138
    }
139
 
140
    public function cancelTimer(TimerInterface $timer)
141
    {
142
        $this->timers->cancel($timer);
143
    }
144
 
145
    public function futureTick($listener)
146
    {
147
        $this->futureTickQueue->add($listener);
148
    }
149
 
150
    public function addSignal($signal, $listener)
151
    {
152
        if ($this->pcntl === false) {
153
            throw new \BadMethodCallException('Event loop feature "signals" isn\'t supported by the "StreamSelectLoop"');
154
        }
155
 
156
        $first = $this->signals->count($signal) === 0;
157
        $this->signals->add($signal, $listener);
158
 
159
        if ($first) {
160
            \pcntl_signal($signal, array($this->signals, 'call'));
161
        }
162
    }
163
 
164
    public function removeSignal($signal, $listener)
165
    {
166
        if (!$this->signals->count($signal)) {
167
            return;
168
        }
169
 
170
        $this->signals->remove($signal, $listener);
171
 
172
        if ($this->signals->count($signal) === 0) {
173
            \pcntl_signal($signal, \SIG_DFL);
174
        }
175
    }
176
 
177
    public function run()
178
    {
179
        $this->running = true;
180
 
181
        while ($this->running) {
182
            $this->futureTickQueue->tick();
183
 
184
            $this->timers->tick();
185
 
186
            // Future-tick queue has pending callbacks ...
187
            if (!$this->running || !$this->futureTickQueue->isEmpty()) {
188
                $timeout = 0;
189
 
190
            // There is a pending timer, only block until it is due ...
191
            } elseif ($scheduledAt = $this->timers->getFirst()) {
192
                $timeout = $scheduledAt - $this->timers->getTime();
193
                if ($timeout < 0) {
194
                    $timeout = 0;
195
                } else {
196
                    // Convert float seconds to int microseconds.
197
                    // Ensure we do not exceed maximum integer size, which may
198
                    // cause the loop to tick once every ~35min on 32bit systems.
199
                    $timeout *= self::MICROSECONDS_PER_SECOND;
200
                    $timeout = $timeout > \PHP_INT_MAX ? \PHP_INT_MAX : (int)$timeout;
201
                }
202
 
203
            // The only possible event is stream or signal activity, so wait forever ...
204
            } elseif ($this->readStreams || $this->writeStreams || !$this->signals->isEmpty()) {
205
                $timeout = null;
206
 
207
            // There's nothing left to do ...
208
            } else {
209
                break;
210
            }
211
 
212
            $this->waitForStreamActivity($timeout);
213
        }
214
    }
215
 
216
    public function stop()
217
    {
218
        $this->running = false;
219
    }
220
 
221
    /**
222
     * Wait/check for stream activity, or until the next timer is due.
223
     *
224
     * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
225
     */
226
    private function waitForStreamActivity($timeout)
227
    {
228
        $read  = $this->readStreams;
229
        $write = $this->writeStreams;
230
 
231
        $available = $this->streamSelect($read, $write, $timeout);
232
        if ($this->pcntlPoll) {
233
            \pcntl_signal_dispatch();
234
        }
235
        if (false === $available) {
236
            // if a system call has been interrupted,
237
            // we cannot rely on it's outcome
238
            return;
239
        }
240
 
241
        foreach ($read as $stream) {
242
            $key = (int) $stream;
243
 
244
            if (isset($this->readListeners[$key])) {
245
                \call_user_func($this->readListeners[$key], $stream);
246
            }
247
        }
248
 
249
        foreach ($write as $stream) {
250
            $key = (int) $stream;
251
 
252
            if (isset($this->writeListeners[$key])) {
253
                \call_user_func($this->writeListeners[$key], $stream);
254
            }
255
        }
256
    }
257
 
258
    /**
259
     * Emulate a stream_select() implementation that does not break when passed
260
     * empty stream arrays.
261
     *
262
     * @param array    $read    An array of read streams to select upon.
263
     * @param array    $write   An array of write streams to select upon.
264
     * @param int|null $timeout Activity timeout in microseconds, or null to wait forever.
265
     *
266
     * @return int|false The total number of streams that are ready for read/write.
267
     *     Can return false if stream_select() is interrupted by a signal.
268
     */
269
    private function streamSelect(array &$read, array &$write, $timeout)
270
    {
271
        if ($read || $write) {
272
            // We do not usually use or expose the `exceptfds` parameter passed to the underlying `select`.
273
            // However, Windows does not report failed connection attempts in `writefds` passed to `select` like most other platforms.
274
            // Instead, it uses `writefds` only for successful connection attempts and `exceptfds` for failed connection attempts.
275
            // We work around this by adding all sockets that look like a pending connection attempt to `exceptfds` automatically on Windows and merge it back later.
276
            // This ensures the public API matches other loop implementations across all platforms (see also test suite or rather test matrix).
277
            // Lacking better APIs, every write-only socket that has not yet read any data is assumed to be in a pending connection attempt state.
278
            // @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
279
            $except = null;
280
            if (\DIRECTORY_SEPARATOR === '\\') {
281
                $except = array();
282
                foreach ($write as $key => $socket) {
283
                    if (!isset($read[$key]) && @\ftell($socket) === 0) {
284
                        $except[$key] = $socket;
285
                    }
286
                }
287
            }
288
 
289
            // suppress warnings that occur, when stream_select is interrupted by a signal
290
            $ret = @\stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
291
 
292
            if ($except) {
293
                $write = \array_merge($write, $except);
294
            }
295
            return $ret;
296
        }
297
 
298
        if ($timeout > 0) {
299
            \usleep($timeout);
300
        } elseif ($timeout === null) {
301
            // wait forever (we only reach this if we're only awaiting signals)
302
            // this may be interrupted and return earlier when a signal is received
303
            \sleep(PHP_INT_MAX);
304
        }
305
 
306
        return 0;
307
    }
308
}