Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\Stream;
4
 
5
use Evenement\EventEmitter;
6
use React\EventLoop\LoopInterface;
7
use InvalidArgumentException;
8
 
9
/**
 * Duplex stream wrapped around a PHP stream resource and driven by an event loop.
 *
 * Incoming bytes are read whenever the loop reports the resource as readable
 * and are forwarded to listeners as "data" events. Outgoing bytes are handed
 * to a Buffer instance created for the same resource and loop.
 *
 * Events emitted by this class: "data", "end", "close", "error" and "drain".
 */
class Stream extends EventEmitter implements DuplexStreamInterface
{
    /** @var int Maximum number of bytes requested from the resource per read. */
    public $bufferSize = 4096;

    /** @var resource The wrapped stream resource. */
    public $stream;

    /** @var bool Whether the stream may still produce "data" events. */
    protected $readable = true;

    /** @var bool Whether write() and end() are still accepted. */
    protected $writable = true;

    /** @var bool True between end() and the final close(). */
    protected $closing = false;

    /** @var LoopInterface Event loop the resource is registered with. */
    protected $loop;

    /** @var Buffer Receives everything passed to write() and end(). */
    protected $buffer;

    /**
     * Wraps the given resource, switches it to non-blocking mode and starts reading.
     *
     * @param resource      $stream Stream resource to wrap.
     * @param LoopInterface $loop   Event loop used to watch the resource.
     *
     * @throws InvalidArgumentException If $stream is not a resource of type "stream".
     */
    public function __construct($stream, LoopInterface $loop)
    {
        $this->stream = $stream;
        if (!is_resource($this->stream) || get_resource_type($this->stream) !== "stream") {
            throw new InvalidArgumentException('First parameter must be a valid stream resource');
        }

        stream_set_blocking($this->stream, 0);

        // Use unbuffered read operations on the underlying stream resource.
        // Reading chunks from the stream may otherwise leave unread bytes in
        // PHP's stream buffers which some event loop implementations do not
        // trigger events on (edge triggered).
        // This does not affect the default event loop implementation (level
        // triggered), so we can ignore platforms not supporting this (HHVM).
        if (function_exists('stream_set_read_buffer')) {
            stream_set_read_buffer($this->stream, 0);
        }

        $this->loop = $loop;
        $this->buffer = new Buffer($this->stream, $this->loop);

        // Closures capture an explicit alias of $this instead of $this itself.
        $that = $this;

        // A buffer error is re-emitted on this stream, which is then closed.
        $this->buffer->on('error', function ($error) use ($that) {
            $that->emit('error', array($error, $that));
            $that->close();
        });

        // Forward the buffer's "drain" event to listeners of this stream.
        $this->buffer->on('drain', function () use ($that) {
            $that->emit('drain', array($that));
        });

        $this->resume();
    }

    /**
     * @return bool True until close() or end() has been called.
     */
    public function isReadable()
    {
        return $this->readable;
    }

    /**
     * @return bool True until close() or end() has been called.
     */
    public function isWritable()
    {
        return $this->writable;
    }

    /**
     * Stops watching the resource for readability; no "data" events follow
     * until resume() is called.
     */
    public function pause()
    {
        $this->loop->removeReadStream($this->stream);
    }

    /**
     * Starts (or restarts) watching the resource for readability.
     *
     * Does nothing once the stream is no longer readable.
     */
    public function resume()
    {
        if ($this->readable) {
            $this->loop->addReadStream($this->stream, array($this, 'handleData'));
        }
    }

    /**
     * Passes data to the buffer.
     *
     * @param mixed $data Payload handed to the buffer unchanged.
     *
     * @return mixed Whatever the buffer's write() returns, or null when the
     *               stream is no longer writable.
     */
    public function write($data)
    {
        if (!$this->writable) {
            return;
        }

        return $this->buffer->write($data);
    }

    /**
     * Closes the stream immediately.
     *
     * Emits "end" and "close", unregisters the resource from the loop, drops
     * all listeners of the buffer and of this stream, and finally closes the
     * resource. Calling it on an already closed stream is a no-op.
     */
    public function close()
    {
        // Neither writable nor closing means close() has already run.
        if (!$this->writable && !$this->closing) {
            return;
        }

        $this->closing = false;

        $this->readable = false;
        $this->writable = false;

        $this->emit('end', array($this));
        $this->emit('close', array($this));
        $this->loop->removeStream($this->stream);
        $this->buffer->removeAllListeners();
        $this->removeAllListeners();

        $this->handleClose();
    }

    /**
     * Ends the stream: optionally hands a final chunk to the buffer and closes
     * this stream once the buffer emits "close".
     *
     * @param mixed $data Optional final payload, forwarded to the buffer's end().
     */
    public function end($data = null)
    {
        if (!$this->writable) {
            return;
        }

        $this->closing = true;

        $this->readable = false;
        $this->writable = false;

        // The stream no longer reports itself as readable, so stop watching the
        // resource. Otherwise the loop keeps invoking handleData() (repeatedly
        // at EOF) until the buffer has finished and close() unregisters it.
        $this->pause();

        $this->buffer->on('close', array($this, 'close'));

        $this->buffer->end($data);
    }

    /**
     * Pipes this stream into the given destination via Util::pipe().
     *
     * @param WritableStreamInterface $dest    Destination stream.
     * @param array                   $options Options forwarded to Util::pipe().
     *
     * @return WritableStreamInterface The destination, to allow chaining.
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        Util::pipe($this, $dest, $options);

        return $dest;
    }

    /**
     * Read callback registered with the loop by resume().
     *
     * Public only because it is registered as an array callable.
     *
     * @param resource $stream The resource reported as readable.
     */
    public function handleData($stream)
    {
        $data = fread($stream, $this->bufferSize);

        // fread() yields false on failure and '' when nothing could be read
        // (for example at EOF). Neither is payload, so listeners are only
        // notified about actual bytes.
        if ($data !== false && $data !== '') {
            $this->emit('data', array($data, $this));
        }

        // A "data" listener may have closed the stream in the meantime, hence
        // the resource check before asking for EOF.
        if (!is_resource($stream) || feof($stream)) {
            $this->end();
        }
    }

    /**
     * Closes the underlying resource if it is still open.
     */
    public function handleClose()
    {
        if (is_resource($this->stream)) {
            fclose($this->stream);
        }
    }

    /**
     * @return Buffer The buffer instance created in the constructor.
     */
    public function getBuffer()
    {
        return $this->buffer;
    }
}