Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\Stream;
4
 
5
use Evenement\EventEmitter;
6
use React\EventLoop\LoopInterface;
7
use InvalidArgumentException;
8
 
9
final class ReadableResourceStream extends EventEmitter implements ReadableStreamInterface
{
    /**
     * The underlying stream resource that data is read from.
     *
     * @var resource
     */
    private $stream;

    /**
     * Event loop the read watcher for the stream resource is registered with.
     *
     * @var LoopInterface
     */
    private $loop;

    /**
     * Upper limit, in bytes, for a single read from the stream resource.
     *
     * Leave this value alone unless you are certain about the consequences.
     *
     * A positive number caps each read at that many bytes; fewer bytes may be
     * returned whenever the stream resource currently holds less than that.
     *
     * A value of `-1` reads everything available from the underlying stream
     * resource, i.e. until the resource is not readable anymore (underlying
     * buffer drained). Note that this does not necessarily mean that EOF has
     * been reached.
     *
     * @var int
     */
    private $bufferSize;

    /** @var bool becomes true once close() has run */
    private $closed = false;

    /** @var bool true while the read handler is registered with the loop */
    private $listening = false;

    /**
     * Validates and prepares the given stream resource, then starts reading.
     *
     * @param resource      $stream        stream resource opened in a readable mode
     * @param LoopInterface $loop          loop that will watch the resource
     * @param int|null      $readChunkSize maximum bytes per read, null selects 65536
     * @throws InvalidArgumentException if $stream is no stream resource or is not opened in read mode
     * @throws \RuntimeException if the resource can not be switched to non-blocking mode
     */
    public function __construct($stream, LoopInterface $loop, $readChunkSize = null)
    {
        $isStreamResource = \is_resource($stream) && \get_resource_type($stream) === 'stream';
        if (!$isStreamResource) {
            throw new InvalidArgumentException('First parameter must be a valid stream resource');
        }

        // A readable fopen mode contains "r" or "+"; reject the resource when
        // a mode is reported and neither character shows up in it.
        $meta = \stream_get_meta_data($stream);
        if (isset($meta['mode']) && $meta['mode'] !== '') {
            $mode = $meta['mode'];
            if (\strpos($mode, 'r') === false && \strpos($mode, '+') === false) {
                throw new InvalidArgumentException('Given stream resource is not opened in read mode');
            }
        }

        // Blocking reads would stall the event loop, so non-blocking mode is
        // mandatory (e.g. pipes on Windows do not support this:
        // https://bugs.php.net/bug.php?id=47918).
        $switched = \stream_set_blocking($stream, false);
        if ($switched !== true) {
            throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
        }

        // Read unbuffered from the underlying resource: chunked reads could
        // otherwise leave bytes behind in PHP's own stream buffer, which edge
        // triggered loop implementations would never report. The default loop
        // is level triggered and unaffected, hence platforms lacking the
        // function (HHVM) are simply skipped. Pipe streams (such as STDIN) do
        // not seem to need this and legacy PHP versions SEGFAULT on unbuffered
        // pipe streams, so those are skipped as well.
        if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
            \stream_set_read_buffer($stream, 0);
        }

        $this->stream = $stream;
        $this->loop = $loop;

        $this->bufferSize = 65536;
        if ($readChunkSize !== null) {
            $this->bufferSize = (int)$readChunkSize;
        }

        $this->resume();
    }

    /**
     * Reports whether this stream has not been closed yet.
     *
     * @return bool
     */
    public function isReadable()
    {
        return !$this->closed;
    }

    /**
     * Stops watching the stream resource; does nothing when not listening.
     *
     * @return void
     */
    public function pause()
    {
        if (!$this->listening) {
            return;
        }

        $this->loop->removeReadStream($this->stream);
        $this->listening = false;
    }

    /**
     * Starts watching the stream resource unless already listening or closed.
     *
     * @return void
     */
    public function resume()
    {
        if ($this->listening || $this->closed) {
            return;
        }

        $this->loop->addReadStream($this->stream, array($this, 'handleData'));
        $this->listening = true;
    }

    /**
     * Forwards this stream into the given destination by delegating to Util::pipe().
     *
     * @param WritableStreamInterface $dest    destination stream
     * @param array                   $options options handed through to Util::pipe()
     * @return mixed whatever Util::pipe() returns
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * Closes the stream once; subsequent calls are no-ops.
     *
     * Emits "close", unregisters from the loop, drops all listeners and
     * finally closes the underlying resource if it is still a resource.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        // flag first so that listeners observing "close" already see a closed stream
        $this->closed = true;

        $this->emit('close');
        $this->pause();
        $this->removeAllListeners();

        $handle = $this->stream;
        if (\is_resource($handle)) {
            \fclose($handle);
        }
    }

    /**
     * Read callback registered with the loop: reads one chunk and emits
     * "data", "end" or "error" accordingly.
     *
     * @internal
     * @return void
     */
    public function handleData()
    {
        // Temporarily turn any PHP error raised by the read into an exception
        // object so that it can be reported through the "error" event.
        $failure = null;
        \set_error_handler(function ($severity, $message, $file, $line) use (&$failure) {
            $failure = new \ErrorException($message, 0, $severity, $file, $line);
        });

        $chunk = \stream_get_contents($this->stream, $this->bufferSize);

        \restore_error_handler();

        if ($failure !== null) {
            $reason = new \RuntimeException('Unable to read from stream: ' . $failure->getMessage(), 0, $failure);
            $this->emit('error', array($reason));
            $this->close();
            return;
        }

        if ($chunk !== '') {
            $this->emit('data', array($chunk));
            return;
        }

        // nothing was read: only when EOF is reported do we end and close
        if (\feof($this->stream)) {
            $this->emit('end');
            $this->close();
        }
    }

    /**
     * Tells whether the given resource is a pipe on a legacy PHP version.
     *
     * Works around a legacy PHP bug (#61019) which got fixed in PHP 5.4.28+
     * and PHP 5.5.12+ and newer.
     *
     * @param resource $resource
     * @return bool
     * @link https://github.com/reactphp/child-process/issues/40
     *
     * @codeCoverageIgnore
     */
    private function isLegacyPipe($resource)
    {
        $affectedVersion = \PHP_VERSION_ID < 50428
            || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512);

        if (!$affectedVersion) {
            return false;
        }

        $meta = \stream_get_meta_data($resource);

        return isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO';
    }
}