| 3 |
liveuser |
1 |
<?php
|
|
|
2 |
|
|
|
3 |
namespace React\EventLoop;
|
|
|
4 |
|
|
|
5 |
use React\EventLoop\Tick\FutureTickQueue;
|
|
|
6 |
use React\EventLoop\Timer\Timer;
|
|
|
7 |
use React\EventLoop\Timer\Timers;
|
|
|
8 |
|
|
|
9 |
/**
|
|
|
10 |
* A `stream_select()` based event loop.
|
|
|
11 |
*
|
|
|
12 |
* This uses the [`stream_select()`](https://www.php.net/manual/en/function.stream-select.php)
|
|
|
13 |
* function and is the only implementation which works out of the box with PHP.
|
|
|
14 |
*
|
|
|
15 |
* This event loop works out of the box on PHP 5.4 through PHP 7+ and HHVM.
|
|
|
16 |
* This means that no installation is required and this library works on all
|
|
|
17 |
* platforms and supported PHP versions.
|
|
|
18 |
* Accordingly, the [`Factory`](#factory) will use this event loop by default if
|
|
|
19 |
* you do not install any of the event loop extensions listed below.
|
|
|
20 |
*
|
|
|
21 |
* Under the hood, it does a simple `select` system call.
|
|
|
22 |
* This system call is limited to the maximum file descriptor number of
|
|
|
23 |
* `FD_SETSIZE` (platform dependent, commonly 1024) and scales with `O(m)`
|
|
|
24 |
* (`m` being the maximum file descriptor number passed).
|
|
|
25 |
* This means that you may run into issues when handling thousands of streams
|
|
|
26 |
* concurrently and you may want to look into using one of the alternative
|
|
|
27 |
* event loop implementations listed below in this case.
|
|
|
28 |
* If your use case is among the many common use cases that involve handling only
|
|
|
29 |
* dozens or a few hundred streams at once, then this event loop implementation
|
|
|
30 |
* performs really well.
|
|
|
31 |
*
|
|
|
32 |
* If you want to use signal handling (see also [`addSignal()`](#addsignal) below),
|
|
|
33 |
* this event loop implementation requires `ext-pcntl`.
|
|
|
34 |
* This extension is only available for Unix-like platforms and does not support
|
|
|
35 |
* Windows.
|
|
|
36 |
* It is commonly installed as part of many PHP distributions.
|
|
|
37 |
* If this extension is missing (or you're running on Windows), signal handling is
|
|
|
38 |
* not supported and throws a `BadMethodCallException` instead.
|
|
|
39 |
*
|
|
|
40 |
* This event loop is known to rely on wall-clock time to schedule future timers
|
|
|
41 |
* when using any version before PHP 7.3, because a monotonic time source is
|
|
|
42 |
* only available as of PHP 7.3 (`hrtime()`).
|
|
|
43 |
* While this does not affect many common use cases, this is an important
|
|
|
44 |
* distinction for programs that rely on a high time precision or on systems
|
|
|
45 |
* that are subject to discontinuous time adjustments (time jumps).
|
|
|
46 |
* This means that if you schedule a timer to trigger in 30s on PHP < 7.3 and
|
|
|
47 |
* then adjust your system time forward by 20s, the timer may trigger in 10s.
|
|
|
48 |
* See also [`addTimer()`](#addtimer) for more details.
|
|
|
49 |
*
|
|
|
50 |
* @link https://www.php.net/manual/en/function.stream-select.php
|
|
|
51 |
*/
|
|
|
52 |
final class StreamSelectLoop implements LoopInterface
|
|
|
53 |
{
|
|
|
54 |
/** @internal */
|
|
|
55 |
const MICROSECONDS_PER_SECOND = 1000000;
|
|
|
56 |
|
|
|
57 |
private $futureTickQueue;
|
|
|
58 |
private $timers;
|
|
|
59 |
private $readStreams = array();
|
|
|
60 |
private $readListeners = array();
|
|
|
61 |
private $writeStreams = array();
|
|
|
62 |
private $writeListeners = array();
|
|
|
63 |
private $running;
|
|
|
64 |
private $pcntl = false;
|
|
|
65 |
private $pcntlPoll = false;
|
|
|
66 |
private $signals;
|
|
|
67 |
|
|
|
68 |
public function __construct()
|
|
|
69 |
{
|
|
|
70 |
$this->futureTickQueue = new FutureTickQueue();
|
|
|
71 |
$this->timers = new Timers();
|
|
|
72 |
$this->pcntl = \function_exists('pcntl_signal') && \function_exists('pcntl_signal_dispatch');
|
|
|
73 |
$this->pcntlPoll = $this->pcntl && !\function_exists('pcntl_async_signals');
|
|
|
74 |
$this->signals = new SignalsHandler();
|
|
|
75 |
|
|
|
76 |
// prefer async signals if available (PHP 7.1+) or fall back to dispatching on each tick
|
|
|
77 |
if ($this->pcntl && !$this->pcntlPoll) {
|
|
|
78 |
\pcntl_async_signals(true);
|
|
|
79 |
}
|
|
|
80 |
}
|
|
|
81 |
|
|
|
82 |
public function addReadStream($stream, $listener)
|
|
|
83 |
{
|
|
|
84 |
$key = (int) $stream;
|
|
|
85 |
|
|
|
86 |
if (!isset($this->readStreams[$key])) {
|
|
|
87 |
$this->readStreams[$key] = $stream;
|
|
|
88 |
$this->readListeners[$key] = $listener;
|
|
|
89 |
}
|
|
|
90 |
}
|
|
|
91 |
|
|
|
92 |
public function addWriteStream($stream, $listener)
|
|
|
93 |
{
|
|
|
94 |
$key = (int) $stream;
|
|
|
95 |
|
|
|
96 |
if (!isset($this->writeStreams[$key])) {
|
|
|
97 |
$this->writeStreams[$key] = $stream;
|
|
|
98 |
$this->writeListeners[$key] = $listener;
|
|
|
99 |
}
|
|
|
100 |
}
|
|
|
101 |
|
|
|
102 |
public function removeReadStream($stream)
|
|
|
103 |
{
|
|
|
104 |
$key = (int) $stream;
|
|
|
105 |
|
|
|
106 |
unset(
|
|
|
107 |
$this->readStreams[$key],
|
|
|
108 |
$this->readListeners[$key]
|
|
|
109 |
);
|
|
|
110 |
}
|
|
|
111 |
|
|
|
112 |
public function removeWriteStream($stream)
|
|
|
113 |
{
|
|
|
114 |
$key = (int) $stream;
|
|
|
115 |
|
|
|
116 |
unset(
|
|
|
117 |
$this->writeStreams[$key],
|
|
|
118 |
$this->writeListeners[$key]
|
|
|
119 |
);
|
|
|
120 |
}
|
|
|
121 |
|
|
|
122 |
public function addTimer($interval, $callback)
|
|
|
123 |
{
|
|
|
124 |
$timer = new Timer($interval, $callback, false);
|
|
|
125 |
|
|
|
126 |
$this->timers->add($timer);
|
|
|
127 |
|
|
|
128 |
return $timer;
|
|
|
129 |
}
|
|
|
130 |
|
|
|
131 |
public function addPeriodicTimer($interval, $callback)
|
|
|
132 |
{
|
|
|
133 |
$timer = new Timer($interval, $callback, true);
|
|
|
134 |
|
|
|
135 |
$this->timers->add($timer);
|
|
|
136 |
|
|
|
137 |
return $timer;
|
|
|
138 |
}
|
|
|
139 |
|
|
|
140 |
public function cancelTimer(TimerInterface $timer)
|
|
|
141 |
{
|
|
|
142 |
$this->timers->cancel($timer);
|
|
|
143 |
}
|
|
|
144 |
|
|
|
145 |
public function futureTick($listener)
|
|
|
146 |
{
|
|
|
147 |
$this->futureTickQueue->add($listener);
|
|
|
148 |
}
|
|
|
149 |
|
|
|
150 |
public function addSignal($signal, $listener)
|
|
|
151 |
{
|
|
|
152 |
if ($this->pcntl === false) {
|
|
|
153 |
throw new \BadMethodCallException('Event loop feature "signals" isn\'t supported by the "StreamSelectLoop"');
|
|
|
154 |
}
|
|
|
155 |
|
|
|
156 |
$first = $this->signals->count($signal) === 0;
|
|
|
157 |
$this->signals->add($signal, $listener);
|
|
|
158 |
|
|
|
159 |
if ($first) {
|
|
|
160 |
\pcntl_signal($signal, array($this->signals, 'call'));
|
|
|
161 |
}
|
|
|
162 |
}
|
|
|
163 |
|
|
|
164 |
public function removeSignal($signal, $listener)
|
|
|
165 |
{
|
|
|
166 |
if (!$this->signals->count($signal)) {
|
|
|
167 |
return;
|
|
|
168 |
}
|
|
|
169 |
|
|
|
170 |
$this->signals->remove($signal, $listener);
|
|
|
171 |
|
|
|
172 |
if ($this->signals->count($signal) === 0) {
|
|
|
173 |
\pcntl_signal($signal, \SIG_DFL);
|
|
|
174 |
}
|
|
|
175 |
}
|
|
|
176 |
|
|
|
177 |
public function run()
|
|
|
178 |
{
|
|
|
179 |
$this->running = true;
|
|
|
180 |
|
|
|
181 |
while ($this->running) {
|
|
|
182 |
$this->futureTickQueue->tick();
|
|
|
183 |
|
|
|
184 |
$this->timers->tick();
|
|
|
185 |
|
|
|
186 |
// Future-tick queue has pending callbacks ...
|
|
|
187 |
if (!$this->running || !$this->futureTickQueue->isEmpty()) {
|
|
|
188 |
$timeout = 0;
|
|
|
189 |
|
|
|
190 |
// There is a pending timer, only block until it is due ...
|
|
|
191 |
} elseif ($scheduledAt = $this->timers->getFirst()) {
|
|
|
192 |
$timeout = $scheduledAt - $this->timers->getTime();
|
|
|
193 |
if ($timeout < 0) {
|
|
|
194 |
$timeout = 0;
|
|
|
195 |
} else {
|
|
|
196 |
// Convert float seconds to int microseconds.
|
|
|
197 |
// Ensure we do not exceed maximum integer size, which may
|
|
|
198 |
// cause the loop to tick once every ~35min on 32bit systems.
|
|
|
199 |
$timeout *= self::MICROSECONDS_PER_SECOND;
|
|
|
200 |
$timeout = $timeout > \PHP_INT_MAX ? \PHP_INT_MAX : (int)$timeout;
|
|
|
201 |
}
|
|
|
202 |
|
|
|
203 |
// The only possible event is stream or signal activity, so wait forever ...
|
|
|
204 |
} elseif ($this->readStreams || $this->writeStreams || !$this->signals->isEmpty()) {
|
|
|
205 |
$timeout = null;
|
|
|
206 |
|
|
|
207 |
// There's nothing left to do ...
|
|
|
208 |
} else {
|
|
|
209 |
break;
|
|
|
210 |
}
|
|
|
211 |
|
|
|
212 |
$this->waitForStreamActivity($timeout);
|
|
|
213 |
}
|
|
|
214 |
}
|
|
|
215 |
|
|
|
216 |
public function stop()
|
|
|
217 |
{
|
|
|
218 |
$this->running = false;
|
|
|
219 |
}
|
|
|
220 |
|
|
|
221 |
/**
|
|
|
222 |
* Wait/check for stream activity, or until the next timer is due.
|
|
|
223 |
*
|
|
|
224 |
* @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
|
|
|
225 |
*/
|
|
|
226 |
private function waitForStreamActivity($timeout)
|
|
|
227 |
{
|
|
|
228 |
$read = $this->readStreams;
|
|
|
229 |
$write = $this->writeStreams;
|
|
|
230 |
|
|
|
231 |
$available = $this->streamSelect($read, $write, $timeout);
|
|
|
232 |
if ($this->pcntlPoll) {
|
|
|
233 |
\pcntl_signal_dispatch();
|
|
|
234 |
}
|
|
|
235 |
if (false === $available) {
|
|
|
236 |
// if a system call has been interrupted,
|
|
|
237 |
// we cannot rely on it's outcome
|
|
|
238 |
return;
|
|
|
239 |
}
|
|
|
240 |
|
|
|
241 |
foreach ($read as $stream) {
|
|
|
242 |
$key = (int) $stream;
|
|
|
243 |
|
|
|
244 |
if (isset($this->readListeners[$key])) {
|
|
|
245 |
\call_user_func($this->readListeners[$key], $stream);
|
|
|
246 |
}
|
|
|
247 |
}
|
|
|
248 |
|
|
|
249 |
foreach ($write as $stream) {
|
|
|
250 |
$key = (int) $stream;
|
|
|
251 |
|
|
|
252 |
if (isset($this->writeListeners[$key])) {
|
|
|
253 |
\call_user_func($this->writeListeners[$key], $stream);
|
|
|
254 |
}
|
|
|
255 |
}
|
|
|
256 |
}
|
|
|
257 |
|
|
|
258 |
/**
|
|
|
259 |
* Emulate a stream_select() implementation that does not break when passed
|
|
|
260 |
* empty stream arrays.
|
|
|
261 |
*
|
|
|
262 |
* @param array $read An array of read streams to select upon.
|
|
|
263 |
* @param array $write An array of write streams to select upon.
|
|
|
264 |
* @param int|null $timeout Activity timeout in microseconds, or null to wait forever.
|
|
|
265 |
*
|
|
|
266 |
* @return int|false The total number of streams that are ready for read/write.
|
|
|
267 |
* Can return false if stream_select() is interrupted by a signal.
|
|
|
268 |
*/
|
|
|
269 |
private function streamSelect(array &$read, array &$write, $timeout)
|
|
|
270 |
{
|
|
|
271 |
if ($read || $write) {
|
|
|
272 |
// We do not usually use or expose the `exceptfds` parameter passed to the underlying `select`.
|
|
|
273 |
// However, Windows does not report failed connection attempts in `writefds` passed to `select` like most other platforms.
|
|
|
274 |
// Instead, it uses `writefds` only for successful connection attempts and `exceptfds` for failed connection attempts.
|
|
|
275 |
// We work around this by adding all sockets that look like a pending connection attempt to `exceptfds` automatically on Windows and merge it back later.
|
|
|
276 |
// This ensures the public API matches other loop implementations across all platforms (see also test suite or rather test matrix).
|
|
|
277 |
// Lacking better APIs, every write-only socket that has not yet read any data is assumed to be in a pending connection attempt state.
|
|
|
278 |
// @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
|
|
|
279 |
$except = null;
|
|
|
280 |
if (\DIRECTORY_SEPARATOR === '\\') {
|
|
|
281 |
$except = array();
|
|
|
282 |
foreach ($write as $key => $socket) {
|
|
|
283 |
if (!isset($read[$key]) && @\ftell($socket) === 0) {
|
|
|
284 |
$except[$key] = $socket;
|
|
|
285 |
}
|
|
|
286 |
}
|
|
|
287 |
}
|
|
|
288 |
|
|
|
289 |
// suppress warnings that occur, when stream_select is interrupted by a signal
|
|
|
290 |
$ret = @\stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
|
|
|
291 |
|
|
|
292 |
if ($except) {
|
|
|
293 |
$write = \array_merge($write, $except);
|
|
|
294 |
}
|
|
|
295 |
return $ret;
|
|
|
296 |
}
|
|
|
297 |
|
|
|
298 |
if ($timeout > 0) {
|
|
|
299 |
\usleep($timeout);
|
|
|
300 |
} elseif ($timeout === null) {
|
|
|
301 |
// wait forever (we only reach this if we're only awaiting signals)
|
|
|
302 |
// this may be interrupted and return earlier when a signal is received
|
|
|
303 |
\sleep(PHP_INT_MAX);
|
|
|
304 |
}
|
|
|
305 |
|
|
|
306 |
return 0;
|
|
|
307 |
}
|
|
|
308 |
}
|