| 3 |
liveuser |
1 |
<?php
|
|
|
2 |
|
|
|
3 |
namespace React\Stream;
|
|
|
4 |
|
|
|
5 |
final class Util
{
    /**
     * Connects a readable stream to a writable stream.
     *
     * Nothing is wired up when $source is not readable. When $dest is not
     * writable, $source is paused and nothing else happens. Otherwise a
     * `pipe` event is emitted on $dest (with $source as its only argument)
     * and the following forwarding is installed:
     *
     * - every `data` event of $source is passed to $dest->write(); $source is
     *   paused whenever write() returns exactly `false`
     * - every `drain` event of $dest resumes $source
     * - unless disabled through $options, the `end` event of $source calls
     *   $dest->end()
     *
     * The forwarding listeners are detached again once the opposite side
     * emits `close`; $source is additionally paused when $dest closes.
     *
     * @param ReadableStreamInterface $source stream to read from
     * @param WritableStreamInterface $dest   stream to write to
     * @param array                   $options supports the key `end`; a falsy
     *     value keeps $dest open when $source ends (treated as `true` when
     *     the key is missing or null)
     * @return WritableStreamInterface the very same $dest instance
     * @see ReadableStreamInterface::pipe() for more details
     */
    public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
    {
        // nothing to read from: leave both streams untouched
        if (!$source->isReadable()) {
            return $dest;
        }

        // nowhere to write to: only stop the source from producing data
        if (!$dest->isWritable()) {
            $source->pause();

            return $dest;
        }

        $dest->emit('pipe', array($source));

        // source data => destination write, pausing the source on back-pressure
        $forwardData = function ($chunk) use ($source, $dest) {
            if ($dest->write($chunk) === false) {
                $source->pause();
            }
        };
        $source->on('data', $forwardData);
        $dest->on('close', function () use ($source, $forwardData) {
            $source->removeListener('data', $forwardData);
            $source->pause();
        });

        // destination drained => source may continue
        $resumeSource = function () use ($source) {
            $source->resume();
        };
        $dest->on('drain', $resumeSource);
        $source->on('close', function () use ($dest, $resumeSource) {
            $dest->removeListener('drain', $resumeSource);
        });

        // ending the destination together with the source is opt-out
        $shouldEnd = true;
        if (isset($options['end'])) {
            $shouldEnd = $options['end'];
        }

        if (!$shouldEnd) {
            return $dest;
        }

        $endDestination = function () use ($dest) {
            $dest->end();
        };
        $source->on('end', $endDestination);
        $dest->on('close', function () use ($source, $endDestination) {
            $source->removeListener('end', $endDestination);
        });

        return $dest;
    }

    /**
     * Re-emits the given events of $source on $target.
     *
     * For each entry of $events one listener is registered on $source via
     * on(); when it fires, it calls $target->emit() with the same event name
     * and all arguments the listener received. Nothing is returned.
     *
     * @param object $source emitter to listen on; must provide on()
     * @param object $target emitter to forward to; must provide emit()
     * @param array  $events names of the events to forward
     */
    public static function forwardEvents($source, $target, array $events)
    {
        foreach ($events as $name) {
            $relay = function () use ($name, $target) {
                $arguments = \func_get_args();
                $target->emit($name, $arguments);
            };

            $source->on($name, $relay);
        }
    }
}
|