Blame | Last modification | View Log | RSS feed
<?phpnamespace React\Stream;use Evenement\EventEmitter;use InvalidArgumentException;/*** The `ThroughStream` implements the* [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data* you write to it through to its readable end.** ```php* $through = new ThroughStream();* $through->on('data', $this->expectCallableOnceWith('hello'));** $through->write('hello');* ```** Similarly, the [`end()` method](#end) will end the stream and emit an* [`end` event](#end-event) and then [`close()`](#close-1) the stream.* The [`close()` method](#close-1) will close the stream and emit a* [`close` event](#close-event).* Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this:** ```php* $through = new ThroughStream();* $source->pipe($through)->pipe($dest);* ```** Optionally, its constructor accepts any callable function which will then be* used to *filter* any data written to it. This function receives a single data* argument as passed to the writable side and must return the data as it will be* passed to its readable end:** ```php* $through = new ThroughStream('strtoupper');* $source->pipe($through)->pipe($dest);* ```** Note that this class makes no assumptions about any data types. This can be* used to convert data, for example for transforming any structured data into* a newline-delimited JSON (NDJSON) stream like this:** ```php* $through = new ThroughStream(function ($data) {* return json_encode($data) . PHP_EOL;* });* $through->on('data', $this->expectCallableOnceWith("[2, true]\n"));** $through->write(array(2, true));* ```** The callback function is allowed to throw an `Exception`. In this case,* the stream will emit an `error` event and then [`close()`](#close-1) the stream.** ```php* $through = new ThroughStream(function ($data) {* if (!is_string($data)) {* throw new \UnexpectedValueException('Only strings allowed');* }* return $data;* });* $through->on('error', $this->expectCallableOnce()));* $through->on('close', $this->expectCallableOnce()));* $through->on('data', $this->expectCallableNever()));** $through->write(2);* ```** @see WritableStreamInterface::write()* @see WritableStreamInterface::end()* @see DuplexStreamInterface::close()* @see WritableStreamInterface::pipe()*/final class ThroughStream extends EventEmitter implements DuplexStreamInterface{private $readable = true;private $writable = true;private $closed = false;private $paused = false;private $drain = false;private $callback;public function __construct($callback = null){if ($callback !== null && !\is_callable($callback)) {throw new InvalidArgumentException('Invalid transformation callback given');}$this->callback = $callback;}public function pause(){$this->paused = true;}public function resume(){if ($this->drain) {$this->drain = false;$this->emit('drain');}$this->paused = false;}public function pipe(WritableStreamInterface $dest, array $options = array()){return Util::pipe($this, $dest, $options);}public function isReadable(){return $this->readable;}public function isWritable(){return $this->writable;}public function write($data){if (!$this->writable) {return false;}if ($this->callback !== null) {try {$data = \call_user_func($this->callback, $data);} catch (\Exception $e) {$this->emit('error', array($e));$this->close();return false;}}$this->emit('data', array($data));if ($this->paused) {$this->drain = true;return false;}return true;}public function end($data = null){if (!$this->writable) {return;}if (null !== $data) {$this->write($data);// return if write() already caused the stream to closeif (!$this->writable) {return;}}$this->readable = false;$this->writable = false;$this->paused = true;$this->drain = false;$this->emit('end');$this->close();}public function close(){if ($this->closed) {return;}$this->readable = false;$this->writable = false;$this->closed = true;$this->paused = true;$this->drain = false;$this->callback = null;$this->emit('close');$this->removeAllListeners();}}