Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\Stream;
4
 
5
use Evenement\EventEmitter;
6
use React\EventLoop\LoopInterface;
7
use InvalidArgumentException;
8
 
9
/**
 * Duplex (readable and writable) stream backed by a PHP stream resource.
 *
 * Reading is driven by the event loop: while the stream is resumed, the loop
 * invokes handleData() whenever the resource is reported readable. Writing is
 * delegated to a WritableStreamInterface buffer, which by default is a
 * WritableResourceStream created on the same resource.
 *
 * Events emitted by this class:
 *  - `data`  with the chunk that was read
 *  - `end`   when a read returned no data and the resource is at EOF
 *  - `error` with an exception, for read errors and errors forwarded from the buffer
 *  - `drain` forwarded from the buffer
 *  - `close` once, when the stream is closed
 */
final class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
{
    /** @var resource underlying stream resource, closed by close() */
    private $stream;

    /** @var LoopInterface event loop used to watch the resource for readability */
    private $loop;

    /**
     * Controls the maximum buffer size in bytes to read at once from the stream.
     *
     * This can be a positive number which means that up to X bytes will be read
     * at once from the underlying stream resource. Note that the actual number
     * of bytes read may be lower if the stream resource has less than X bytes
     * currently available.
     *
     * This can be `-1` which means read everything available from the
     * underlying stream resource.
     * This should read until the stream resource is not readable anymore
     * (i.e. underlying buffer drained), note that this does not necessarily
     * mean it reached EOF.
     *
     * @var int
     */
    private $bufferSize;

    /** @var WritableStreamInterface write side; all writes are delegated to it */
    private $buffer;

    /** @var bool false once end() or close() has been called */
    private $readable = true;

    /** @var bool false once end() or close() has been called */
    private $writable = true;

    /** @var bool true between end() and the eventual close() */
    private $closing = false;

    /** @var bool whether the resource is currently registered with the loop for reading */
    private $listening = false;

    /**
     * @param resource                     $stream        stream resource opened in a read+write mode
     * @param LoopInterface                $loop          event loop used to watch the resource
     * @param int|null                     $readChunkSize maximum bytes per read, see $bufferSize;
     *                                                    null selects the default of 65536
     * @param WritableStreamInterface|null $buffer        write side to delegate to; null creates a
     *                                                    WritableResourceStream on the same resource
     * @throws InvalidArgumentException if $stream is not a stream resource, is not opened in
     *                                  read and write mode, or $buffer has an unsupported type
     * @throws \RuntimeException        if the resource can not be switched to non-blocking mode
     */
    public function __construct($stream, LoopInterface $loop, $readChunkSize = null, $buffer = null)
    {
        // Manual type check instead of declaring `WritableStreamInterface $buffer = null`:
        // implicitly nullable parameter types are deprecated as of PHP 8.4, and the explicit
        // `?Type` syntax is not available on the legacy PHP versions this file still supports.
        // Checked first so that invalid arguments are rejected before the resource is touched.
        if ($buffer !== null && !($buffer instanceof WritableStreamInterface)) {
            throw new InvalidArgumentException('Argument #4 ($buffer) expected null|React\Stream\WritableStreamInterface');
        }

        if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
            throw new InvalidArgumentException('First parameter must be a valid stream resource');
        }

        // ensure resource is opened for reading and writing (fopen mode must contain "+")
        $meta = \stream_get_meta_data($stream);
        if (isset($meta['mode']) && $meta['mode'] !== '' && \strpos($meta['mode'], '+') === false) {
            throw new InvalidArgumentException('Given stream resource is not opened in read and write mode');
        }

        // this class relies on non-blocking I/O in order to not interrupt the event loop
        // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
        if (\stream_set_blocking($stream, false) !== true) {
            throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
        }

        // Use unbuffered read operations on the underlying stream resource.
        // Reading chunks from the stream may otherwise leave unread bytes in
        // PHP's stream buffers which some event loop implementations do not
        // trigger events on (edge triggered).
        // This does not affect the default event loop implementation (level
        // triggered), so we can ignore platforms not supporting this (HHVM).
        // Pipe streams (such as STDIN) do not seem to require this and legacy
        // PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this.
        if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
            \stream_set_read_buffer($stream, 0);
        }

        if ($buffer === null) {
            $buffer = new WritableResourceStream($stream, $loop);
        }

        $this->stream = $stream;
        $this->loop = $loop;
        $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
        $this->buffer = $buffer;

        // closures capture $that rather than $this to remain compatible with legacy PHP
        $that = $this;

        // forward write-side errors to listeners of this stream
        $this->buffer->on('error', function ($error) use ($that) {
            $that->emit('error', array($error));
        });

        // once the write side closes, close this stream as well
        $this->buffer->on('close', array($this, 'close'));

        $this->buffer->on('drain', function () use ($that) {
            $that->emit('drain');
        });

        // start reading right away
        $this->resume();
    }

    /**
     * @return bool true until end() or close() has been called
     */
    public function isReadable()
    {
        return $this->readable;
    }

    /**
     * @return bool true until end() or close() has been called
     */
    public function isWritable()
    {
        return $this->writable;
    }

    /**
     * Stops watching the resource for readability; no-op if not currently watching.
     *
     * @return void
     */
    public function pause()
    {
        if ($this->listening) {
            $this->loop->removeReadStream($this->stream);
            $this->listening = false;
        }
    }

    /**
     * Starts watching the resource for readability; no-op if already watching
     * or if the stream is no longer readable.
     *
     * @return void
     */
    public function resume()
    {
        if (!$this->listening && $this->readable) {
            $this->loop->addReadStream($this->stream, array($this, 'handleData'));
            $this->listening = true;
        }
    }

    /**
     * Passes the given data on to the write buffer.
     *
     * @param mixed $data
     * @return bool false if this stream is no longer writable, otherwise
     *              whatever the buffer's write() returns
     */
    public function write($data)
    {
        if (!$this->writable) {
            return false;
        }

        return $this->buffer->write($data);
    }

    /**
     * Closes the stream: marks it unreadable and unwritable, emits `close`,
     * stops reading, closes the write buffer, removes all listeners and
     * finally closes the underlying resource if it is still open.
     *
     * @return void
     */
    public function close()
    {
        // Already closed: neither writable nor waiting for a pending end().
        // This also turns re-entrant calls into a no-op, e.g. via the buffer's
        // 'close' event which is wired to this method in the constructor.
        if (!$this->writable && !$this->closing) {
            return;
        }

        $this->closing = false;

        $this->readable = false;
        $this->writable = false;

        $this->emit('close');
        $this->pause();
        $this->buffer->close();
        $this->removeAllListeners();

        if (\is_resource($this->stream)) {
            \fclose($this->stream);
        }
    }

    /**
     * Ends the stream: stops reading immediately, rejects further writes and
     * hands the optional final data to the write buffer's end().
     *
     * The stream is marked as closing so that the close() call triggered by
     * the buffer's 'close' event still runs even though the stream is already
     * flagged as not writable.
     *
     * @param mixed|null $data optional final data passed through to the buffer
     * @return void
     */
    public function end($data = null)
    {
        if (!$this->writable) {
            return;
        }

        $this->closing = true;

        $this->readable = false;
        $this->writable = false;
        $this->pause();

        $this->buffer->end($data);
    }

    /**
     * Pipes this stream into the given destination via Util::pipe().
     *
     * @param WritableStreamInterface $dest
     * @param array                   $options passed through to Util::pipe()
     * @return mixed whatever Util::pipe() returns
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * Read callback registered with the event loop in resume().
     *
     * Reads up to $bufferSize bytes and emits `data` for a non-empty result.
     * An empty read at EOF emits `end` and closes the stream. Any PHP error
     * raised while reading is emitted as an `error` event wrapping the
     * original error, after which the stream is closed.
     *
     * @internal
     * @param resource $stream the resource reported readable by the loop
     * @return void
     */
    public function handleData($stream)
    {
        // temporarily capture PHP errors raised by the read as an exception object
        $error = null;
        \set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
            $error = new \ErrorException(
                $errstr,
                0,
                $errno,
                $errfile,
                $errline
            );
        });

        $data = \stream_get_contents($stream, $this->bufferSize);

        \restore_error_handler();

        if ($error !== null) {
            $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
            $this->close();
            return;
        }

        if ($data !== '') {
            $this->emit('data', array($data));
        } elseif (\feof($this->stream)) {
            // no data read => we reached the end and close the stream
            $this->emit('end');
            $this->close();
        }
    }

    /**
     * Returns whether this is a pipe resource in a legacy environment
     *
     * This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
     * and PHP 5.5.12+ and newer.
     *
     * @param resource $resource
     * @return bool
     * @link https://github.com/reactphp/child-process/issues/40
     *
     * @codeCoverageIgnore
     */
    private function isLegacyPipe($resource)
    {
        // only affected PHP versions need to inspect the resource at all
        if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
            $meta = \stream_get_meta_data($resource);

            if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
                return true;
            }
        }
        return false;
    }
}