Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\Stream;
4
 
5
use Evenement\EventEmitter;
6
use InvalidArgumentException;
7
 
8
/**
9
 * The `ThroughStream` implements the
10
 * [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data
11
 * you write to it through to its readable end.
12
 *
13
 * ```php
14
 * $through = new ThroughStream();
15
 * $through->on('data', $this->expectCallableOnceWith('hello'));
16
 *
17
 * $through->write('hello');
18
 * ```
19
 *
20
 * Similarly, the [`end()` method](#end) will end the stream and emit an
21
 * [`end` event](#end-event) and then [`close()`](#close-1) the stream.
22
 * The [`close()` method](#close-1) will close the stream and emit a
23
 * [`close` event](#close-event).
24
 * Accordingly, this can also be used in a [`pipe()`](#pipe) context like this:
25
 *
26
 * ```php
27
 * $through = new ThroughStream();
28
 * $source->pipe($through)->pipe($dest);
29
 * ```
30
 *
31
 * Optionally, its constructor accepts any callable function which will then be
32
 * used to *filter* any data written to it. This function receives a single data
33
 * argument as passed to the writable side and must return the data as it will be
34
 * passed to its readable end:
35
 *
36
 * ```php
37
 * $through = new ThroughStream('strtoupper');
38
 * $source->pipe($through)->pipe($dest);
39
 * ```
40
 *
41
 * Note that this class makes no assumptions about any data types. This can be
42
 * used to convert data, for example for transforming any structured data into
43
 * a newline-delimited JSON (NDJSON) stream like this:
44
 *
45
 * ```php
46
 * $through = new ThroughStream(function ($data) {
47
 *     return json_encode($data) . PHP_EOL;
48
 * });
49
 * $through->on('data', $this->expectCallableOnceWith("[2,true]\n"));
50
 *
51
 * $through->write(array(2, true));
52
 * ```
53
 *
54
 * The callback function is allowed to throw an `Exception`. In this case,
55
 * the stream will emit an `error` event and then [`close()`](#close-1) the stream.
56
 *
57
 * ```php
58
 * $through = new ThroughStream(function ($data) {
59
 *     if (!is_string($data)) {
60
 *         throw new \UnexpectedValueException('Only strings allowed');
61
 *     }
62
 *     return $data;
63
 * });
64
 * $through->on('error', $this->expectCallableOnce());
65
 * $through->on('close', $this->expectCallableOnce());
66
 * $through->on('data', $this->expectCallableNever());
67
 *
68
 * $through->write(2);
69
 * ```
70
 *
71
 * @see WritableStreamInterface::write()
72
 * @see WritableStreamInterface::end()
73
 * @see DuplexStreamInterface::close()
74
 * @see ReadableStreamInterface::pipe()
75
 */
76
final class ThroughStream extends EventEmitter implements DuplexStreamInterface
{
    /** @var bool value reported by isReadable(); cleared by end() and close() */
    private $readable = true;

    /** @var bool value reported by isWritable(); cleared by end() and close() */
    private $writable = true;

    /** @var bool set once close() has run so that repeated calls are no-ops */
    private $closed = false;

    /** @var bool set by pause(); while set, write() reports back-pressure */
    private $paused = false;

    /** @var bool set when a write() was throttled, i.e. a `drain` event is owed on resume() */
    private $drain = false;

    /** @var callable|null optional transformation applied to every written chunk */
    private $callback;

    /**
     * @param callable|null $callback Optional function that receives each
     *     written chunk and returns the value to emit as `data`.
     * @throws InvalidArgumentException if a non-null, non-callable value is given
     */
    public function __construct($callback = null)
    {
        if ($callback !== null && !\is_callable($callback)) {
            throw new InvalidArgumentException('Invalid transformation callback given');
        }

        $this->callback = $callback;
    }

    /**
     * Marks the stream as paused so that subsequent write() calls return
     * false and a `drain` event is scheduled for the next resume().
     *
     * @return void
     */
    public function pause()
    {
        $this->paused = true;
    }

    /**
     * Clears the paused flag and emits `drain` if a write() was throttled
     * while the stream was paused.
     *
     * @return void
     */
    public function resume()
    {
        // Clear the paused flag *before* emitting `drain`: a drain listener
        // may write() synchronously, and that write must see an unpaused
        // stream. Otherwise it would be throttled again and the `drain` it
        // re-arms would never be emitted, stalling the producer.
        $this->paused = false;

        if ($this->drain) {
            $this->drain = false;
            $this->emit('drain');
        }
    }

    /**
     * Pipes this stream into the given destination by delegating to
     * Util::pipe().
     *
     * @param WritableStreamInterface $dest
     * @param array                   $options forwarded unchanged to Util::pipe()
     * @return mixed whatever Util::pipe() returns
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * @return bool true until the stream has been ended or closed
     */
    public function isReadable()
    {
        return $this->readable;
    }

    /**
     * @return bool true until the stream has been ended or closed
     */
    public function isWritable()
    {
        return $this->writable;
    }

    /**
     * Passes the given data (optionally transformed by the callback) on as a
     * `data` event.
     *
     * If the callback throws an `\Exception`, an `error` event carrying that
     * exception is emitted and the stream is closed.
     *
     * @param mixed $data
     * @return bool true if the caller may keep writing; false if the stream
     *     is not writable, the callback failed, or the stream is paused
     */
    public function write($data)
    {
        if (!$this->writable) {
            return false;
        }

        if ($this->callback !== null) {
            try {
                $data = \call_user_func($this->callback, $data);
            } catch (\Exception $e) {
                $this->emit('error', array($e));
                $this->close();

                return false;
            }
        }

        $this->emit('data', array($data));

        // A `data` listener may have ended or closed the stream. Report that
        // to the caller, but do not schedule a `drain` on a dead stream.
        if (!$this->writable) {
            return false;
        }

        // Throttled: remember that a `drain` event is owed on the next resume().
        if ($this->paused) {
            $this->drain = true;

            return false;
        }

        return true;
    }

    /**
     * Optionally writes a final chunk, then emits `end` and closes the stream.
     *
     * Does nothing if the stream is no longer writable.
     *
     * @param mixed|null $data optional final chunk; null means "no data"
     * @return void
     */
    public function end($data = null)
    {
        if (!$this->writable) {
            return;
        }

        if (null !== $data) {
            $this->write($data);

            // return if write() already caused the stream to close
            if (!$this->writable) {
                return;
            }
        }

        $this->readable = false;
        $this->writable = false;
        $this->paused = true;
        $this->drain = false;

        $this->emit('end');
        $this->close();
    }

    /**
     * Closes the stream: resets all state flags, drops the callback, emits
     * `close` and then removes every registered listener.
     *
     * Calling close() again after the first call has no effect.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        $this->readable = false;
        $this->writable = false;
        $this->closed = true;
        $this->paused = true;
        $this->drain = false;
        $this->callback = null;

        $this->emit('close');
        $this->removeAllListeners();
    }
}