Subversion Repositories qbpwcf-lib(archive)

Rev

Rev 915 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
1 liveuser 1
<?php
2
 
3
namespace React\Stream;
4
 
5
use Evenement\EventEmitter;
6
 
7
/**
 * Duplex stream assembled from one readable stream and one writable stream.
 *
 * Read-side state and flow control (isReadable, pause, resume) are delegated
 * to the readable half; write-side operations (isWritable, write, end) are
 * delegated to the writable half. A 'close' event from either half triggers
 * close() on the composite, which in turn closes both halves.
 */
final class CompositeStream extends EventEmitter implements DuplexStreamInterface
{
    /** Readable half; typed as ReadableStreamInterface by the constructor. */
    private $readable;

    /** Writable half; typed as WritableStreamInterface by the constructor. */
    private $writable;

    /** Set to true on the first close() call so later calls become no-ops. */
    private $closed = false;

    /**
     * Stores both halves and wires their events onto this composite.
     *
     * If the readable half is not readable or the writable half is not
     * writable at construction time, the composite closes itself immediately
     * and no event wiring is performed.
     *
     * @param ReadableStreamInterface $readable stream used for the read side
     * @param WritableStreamInterface $writable stream used for the write side
     */
    public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
    {
        $this->readable = $readable;
        $this->writable = $writable;

        // Both halves must be usable; otherwise the composite starts out closed.
        if (!($readable->isReadable() && $writable->isWritable())) {
            $this->close();
            return;
        }

        // Hand the listed event names to Util::forwardEvents with this object
        // as the target. Note that 'error' is registered for both halves.
        Util::forwardEvents($readable, $this, array('data', 'end', 'error'));
        Util::forwardEvents($writable, $this, array('drain', 'error', 'pipe'));

        // A 'close' on either half closes the whole composite.
        $onClose = array($this, 'close');
        $readable->on('close', $onClose);
        $writable->on('close', $onClose);
    }

    /**
     * Reports whether the readable half is readable.
     *
     * @return mixed whatever the readable half's isReadable() returns
     */
    public function isReadable()
    {
        return $this->readable->isReadable();
    }

    /**
     * Pauses the readable half.
     */
    public function pause()
    {
        $this->readable->pause();
    }

    /**
     * Resumes the readable half, but only while the writable half is still
     * writable; otherwise the call does nothing.
     */
    public function resume()
    {
        if ($this->writable->isWritable()) {
            $this->readable->resume();
        }
    }

    /**
     * Pipes this composite into the given destination via Util::pipe().
     *
     * @param WritableStreamInterface $dest    destination stream
     * @param array                   $options passed through to Util::pipe() unchanged
     *
     * @return mixed whatever Util::pipe() returns
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * Reports whether the writable half is writable.
     *
     * @return mixed whatever the writable half's isWritable() returns
     */
    public function isWritable()
    {
        return $this->writable->isWritable();
    }

    /**
     * Writes data to the writable half.
     *
     * @param mixed $data payload handed to the writable half as-is
     *
     * @return mixed whatever the writable half's write() returns
     */
    public function write($data)
    {
        return $this->writable->write($data);
    }

    /**
     * Pauses the readable half, then ends the writable half.
     *
     * @param mixed $data optional final payload passed to the writable half's end()
     */
    public function end($data = null)
    {
        $this->readable->pause();
        $this->writable->end($data);
    }

    /**
     * Closes both halves, emits 'close' on the composite and then removes all
     * of the composite's listeners. Only the first call has any effect.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        // The flag is raised before the halves are closed: their 'close'
        // events are wired back to this method by the constructor, and the
        // guard above turns any such re-entrant call into a no-op.
        $this->closed = true;

        $this->readable->close();
        $this->writable->close();

        $this->emit('close');
        $this->removeAllListeners();
    }
}