| 3 |
liveuser |
1 |
<?php
|
|
|
2 |
|
|
|
3 |
namespace React\Stream;
|
|
|
4 |
|
|
|
5 |
use Evenement\EventEmitter;
|
|
|
6 |
use React\EventLoop\LoopInterface;
|
|
|
7 |
use InvalidArgumentException;
|
|
|
8 |
|
|
|
9 |
/**
 * Readable stream that wraps a PHP stream resource and is driven by an event loop.
 *
 * The given resource is switched to non-blocking mode and registered with the
 * loop as a read stream. Each time the loop invokes the read callback, one
 * chunk is read from the resource and forwarded to listeners:
 *
 * - `data`  is emitted with the chunk (a non-empty string) that was read
 * - `end`   is emitted when nothing was read and the resource reports EOF
 * - `error` is emitted with a `RuntimeException` when reading fails
 * - `close` is emitted once when the stream is closed
 */
final class ReadableResourceStream extends EventEmitter implements ReadableStreamInterface
{
    /**
     * The underlying stream resource that is read from.
     *
     * @var resource
     */
    private $stream;

    /**
     * The event loop the stream resource is registered with.
     *
     * @var LoopInterface
     */
    private $loop;

    /**
     * Controls the maximum buffer size in bytes to read at once from the stream.
     *
     * This value SHOULD NOT be changed unless you know what you're doing.
     *
     * This can be a positive number which means that up to X bytes will be read
     * at once from the underlying stream resource. Note that the actual number
     * of bytes read may be lower if the stream resource has less than X bytes
     * currently available.
     *
     * This can be `-1` which means read everything available from the
     * underlying stream resource.
     * This should read until the stream resource is not readable anymore
     * (i.e. underlying buffer drained), note that this does not necessarily
     * mean it reached EOF.
     *
     * @var int
     */
    private $bufferSize;

    /**
     * Whether close() has already been called on this stream.
     *
     * @var bool
     */
    private $closed = false;

    /**
     * Whether the stream resource is currently registered with the loop.
     *
     * @var bool
     */
    private $listening = false;

    /**
     * Validates the resource, prepares it for non-blocking reads and starts listening.
     *
     * @param resource      $stream        stream resource opened in a readable mode
     * @param LoopInterface $loop          event loop used to watch the resource for readability
     * @param int|null      $readChunkSize maximum number of bytes to read at once;
     *                                     `null` selects the default of 65536, any
     *                                     other value is cast to int
     * @throws InvalidArgumentException if $stream is not a stream resource or is not opened for reading
     * @throws \RuntimeException if the resource can not be switched to non-blocking mode
     */
    public function __construct($stream, LoopInterface $loop, $readChunkSize = null)
    {
        if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
            throw new InvalidArgumentException('First parameter must be a valid stream resource');
        }

        // ensure resource is opened for reading (fopen mode must contain "r" or "+")
        // strpos() yields `false` for both lookups only if neither character is
        // present; two found positions can never be identical to each other.
        $meta = \stream_get_meta_data($stream);
        if (isset($meta['mode']) && $meta['mode'] !== '' && \strpos($meta['mode'], 'r') === \strpos($meta['mode'], '+')) {
            throw new InvalidArgumentException('Given stream resource is not opened in read mode');
        }

        // this class relies on non-blocking I/O in order to not interrupt the event loop
        // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
        if (\stream_set_blocking($stream, false) !== true) {
            throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
        }

        // Use unbuffered read operations on the underlying stream resource.
        // Reading chunks from the stream may otherwise leave unread bytes in
        // PHP's stream buffers which some event loop implementations do not
        // trigger events on (edge triggered).
        // This does not affect the default event loop implementation (level
        // triggered), so we can ignore platforms not supporting this (HHVM).
        // Pipe streams (such as STDIN) do not seem to require this and legacy
        // PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this.
        if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
            \stream_set_read_buffer($stream, 0);
        }

        $this->stream = $stream;
        $this->loop = $loop;
        $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;

        // start listening for readable events right away
        $this->resume();
    }

    /**
     * Reports whether this stream has not been closed yet.
     *
     * @return bool `true` until close() has been called, `false` afterwards
     */
    public function isReadable()
    {
        return !$this->closed;
    }

    /**
     * Stops watching the stream resource for readability.
     *
     * Does nothing if the resource is not currently registered with the loop.
     *
     * @return void
     */
    public function pause()
    {
        if ($this->listening) {
            $this->loop->removeReadStream($this->stream);
            $this->listening = false;
        }
    }

    /**
     * Starts (or restarts) watching the stream resource for readability.
     *
     * Does nothing if the resource is already registered or the stream is closed.
     *
     * @return void
     */
    public function resume()
    {
        if (!$this->listening && !$this->closed) {
            $this->loop->addReadStream($this->stream, array($this, 'handleData'));
            $this->listening = true;
        }
    }

    /**
     * Pipes this stream into the given destination by delegating to Util::pipe().
     *
     * @param WritableStreamInterface $dest    destination stream
     * @param array                   $options options passed through to Util::pipe() unchanged
     * @return mixed whatever Util::pipe() returns
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * Closes the stream and releases the underlying resource.
     *
     * Emits `close`, unregisters the resource from the loop, removes all event
     * listeners and finally closes the resource if it is still valid.
     * Subsequent calls are no-ops.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        // flag first so that re-entrant close()/resume() calls from listeners are no-ops
        $this->closed = true;

        $this->emit('close');
        $this->pause();
        $this->removeAllListeners();

        if (\is_resource($this->stream)) {
            \fclose($this->stream);
        }
    }

    /**
     * Read callback registered with the loop: reads one chunk and emits the matching event.
     *
     * PHP errors raised while reading are captured through a temporary error
     * handler and reported as an `error` event, after which the stream is closed.
     *
     * @internal
     * @return void
     */
    public function handleData()
    {
        $error = null;
        \set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
            $error = new \ErrorException(
                $errstr,
                0,
                $errno,
                $errfile,
                $errline
            );
        });

        $data = \stream_get_contents($this->stream, $this->bufferSize);

        \restore_error_handler();

        if ($error !== null) {
            $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
            $this->close();
            return;
        }

        if ($data === false) {
            // the read failed without raising a PHP error: report it instead
            // of forwarding boolean `false` to listeners as a data chunk
            $this->emit('error', array(new \RuntimeException('Unable to read from stream')));
            $this->close();
            return;
        }

        if ($data !== '') {
            $this->emit('data', array($data));
        } elseif (\feof($this->stream)) {
            // no data read => we reached the end and close the stream
            $this->emit('end');
            $this->close();
        }
    }

    /**
     * Returns whether this is a pipe resource in a legacy environment
     *
     * This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
     * and PHP 5.5.12+ and newer.
     *
     * @param resource $resource
     * @return bool
     * @link https://github.com/reactphp/child-process/issues/40
     *
     * @codeCoverageIgnore
     */
    private function isLegacyPipe($resource)
    {
        if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
            $meta = \stream_get_meta_data($resource);

            if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
                return true;
            }
        }
        return false;
    }
}
|