Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\Stream;
4
 
5
final class Util
{
    /**
     * Connects a readable stream to a writable stream so that whatever the
     * source emits as "data" is handed to the destination's write().
     *
     * Behaviour, in order:
     * - If the source is not readable, nothing is wired up.
     * - If the destination is not writable, the source is paused and nothing
     *   else is wired up.
     * - Otherwise a "pipe" event carrying the source is emitted on the
     *   destination and the listeners below are registered:
     *   - source "data"  => $dest->write(); the source is paused whenever
     *     write() returns exactly false.
     *   - dest "drain"   => $source->resume().
     *   - dest "close"   => the "data" listener is detached and the source
     *     is paused.
     *   - source "close" => the "drain" listener is detached.
     *   - source "end"   => $dest->end(), unless $options['end'] is set to a
     *     falsy value; this listener is detached again on dest "close".
     *
     * @param ReadableStreamInterface $source stream to read from
     * @param WritableStreamInterface $dest   stream to write to
     * @param array                   $options supports the key "end"
     *                                         (treated as true when not set)
     * @return WritableStreamInterface the very same $dest that was passed in
     * @see ReadableStreamInterface::pipe() for more details
     */
    public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
    {
        // Nothing can be read from the source: leave both streams untouched.
        if (!$source->isReadable()) {
            return $dest;
        }

        // Nothing can be written to the destination: only pause the source.
        if (!$dest->isWritable()) {
            $source->pause();

            return $dest;
        }

        $dest->emit('pipe', array($source));

        // Every chunk from the source goes to the destination; a strict
        // false from write() is the signal to stop reading for now.
        $onSourceData = function ($chunk) use ($source, $dest) {
            if ($dest->write($chunk) === false) {
                $source->pause();
            }
        };
        $source->on('data', $onSourceData);

        // Once the destination closes, stop feeding it and halt the source.
        $dest->on('close', function () use ($source, $onSourceData) {
            $source->removeListener('data', $onSourceData);
            $source->pause();
        });

        // A drained destination lets the source continue reading.
        $onDestDrain = function () use ($source) {
            $source->resume();
        };
        $dest->on('drain', $onDestDrain);

        // Once the source closes, the drain hook has nothing left to resume.
        $source->on('close', function () use ($dest, $onDestDrain) {
            $dest->removeListener('drain', $onDestDrain);
        });

        // Ending the destination together with the source is the default;
        // it is skipped only when the "end" option is set and falsy.
        if (!isset($options['end']) || $options['end']) {
            $onSourceEnd = function () use ($dest) {
                $dest->end();
            };
            $source->on('end', $onSourceEnd);

            // Kept as its own "close" listener, registered after the one above.
            $dest->on('close', function () use ($source, $onSourceEnd) {
                $source->removeListener('end', $onSourceEnd);
            });
        }

        return $dest;
    }

    /**
     * Re-emits the listed events of $source on $target.
     *
     * For each entry in $events one listener is registered on $source; when
     * it fires, the same event name is emitted on $target with exactly the
     * arguments the listener received.
     *
     * @param object $source object providing on($event, $listener)
     * @param object $target object providing emit($event, array $arguments)
     * @param array  $events names of the events to forward
     * @return void
     */
    public static function forwardEvents($source, $target, array $events)
    {
        foreach ($events as $name) {
            $relay = function () use ($name, $target) {
                // Pass along whatever arguments the source event carried.
                $arguments = \func_get_args();
                $target->emit($name, $arguments);
            };

            $source->on($name, $relay);
        }
    }
}