Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\Stream;
4
 
5
use Evenement\EventEmitter;
6
use React\EventLoop\LoopInterface;
7
 
8
/**
 * Buffered writer for a stream resource, driven by an event loop.
 *
 * Data handed to write() is appended to an in-memory string and flushed by
 * handleWrite(), which is registered with the loop as the write listener for
 * the stream while there is pending data.
 *
 * @event drain      Emitted with this buffer when the pending data drops from
 *                   at/above $softLimit to below it.
 * @event full-drain Emitted with this buffer when all pending data was written.
 * @event error      Emitted with an exception and this buffer when writing fails.
 * @event close      Emitted with this buffer, once, when it is closed.
 */
class Buffer extends EventEmitter implements WritableStreamInterface
{
    /** @var resource Stream resource the buffered data is written to. */
    public $stream;

    /** @var bool Whether handleWrite() is currently registered with the loop. */
    public $listening = false;

    /** @var int Pending byte count at which write() starts returning false. */
    public $softLimit = 2048;

    /** @var bool False once end() or close() has been called. */
    private $writable = true;

    /** @var bool True once close() has run; makes close() idempotent. */
    private $closed = false;

    /** @var LoopInterface Loop used to (un)register the write listener. */
    private $loop;

    /** @var string Bytes accepted by write() but not yet written to the stream. */
    private $data = '';

    /** @var array Details of the last PHP error captured while calling fwrite(). */
    private $lastError = array(
        'number'  => 0,
        'message' => '',
        'file'    => '',
        'line'    => 0,
    );

    /**
     * @param resource      $stream Stream resource to write to.
     * @param LoopInterface $loop   Event loop that signals when the stream is writable.
     */
    public function __construct($stream, LoopInterface $loop)
    {
        $this->stream = $stream;
        $this->loop = $loop;
    }

    /**
     * @return bool True until end() or close() has been called.
     */
    public function isWritable()
    {
        return $this->writable;
    }

    /**
     * Queue data for writing and make sure the loop is watching the stream.
     *
     * @param string $data Bytes to append to the pending buffer.
     *
     * @return bool True while the pending data is below $softLimit; false when
     *              the limit is reached or the buffer is no longer writable.
     */
    public function write($data)
    {
        if (!$this->writable) {
            // Explicit boolean instead of an implicit null, so both exits of
            // this method return the same type.
            return false;
        }

        $this->data .= $data;

        if (!$this->listening) {
            $this->listening = true;

            $this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
        }

        return strlen($this->data) < $this->softLimit;
    }

    /**
     * Stop accepting data and close once everything pending has been written.
     *
     * @param string|null $data Optional final chunk to queue before ending.
     */
    public function end($data = null)
    {
        if (null !== $data) {
            $this->write($data);
        }

        $this->writable = false;

        if ($this->listening) {
            // Data is still pending: defer the close until it has been flushed.
            $this->on('full-drain', array($this, 'close'));
        } else {
            $this->close();
        }
    }

    /**
     * Discard pending data, stop watching the stream and emit "close".
     *
     * Safe to call more than once; only the first call has any effect.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        $this->closed = true;
        $this->writable = false;
        $this->data = '';

        if ($this->listening) {
            // Unregister from the loop; otherwise handleWrite() would still be
            // invoked for a buffer that has already been closed.
            $this->listening = false;
            $this->loop->removeWriteStream($this->stream);
        }

        $this->emit('close', array($this));
    }

    /**
     * Loop callback: write as much pending data as the stream accepts.
     *
     * Emits "error" when the stream is invalid, fwrite() fails, or the stream
     * reports EOF without accepting data; emits "drain" / "full-drain" as the
     * pending data shrinks.
     */
    public function handleWrite()
    {
        if (!is_resource($this->stream)) {
            $this->emit('error', array(new \RuntimeException('Tried to write to invalid stream.'), $this));

            return;
        }

        // Forget any previously captured error so a failure that raises no
        // PHP error is not reported with stale details.
        $this->lastError = array(
            'number'  => 0,
            'message' => '',
            'file'    => '',
            'line'    => 0,
        );

        set_error_handler(array($this, 'errorHandler'));

        $sent = fwrite($this->stream, $this->data);

        restore_error_handler();

        if (false === $sent) {
            $this->emit('error', array(
                new \ErrorException(
                    $this->lastError['message'],
                    0,
                    $this->lastError['number'],
                    $this->lastError['file'],
                    $this->lastError['line']
                ),
                $this
            ));

            return;
        }

        if (0 === $sent && feof($this->stream)) {
            $this->emit('error', array(new \RuntimeException('Tried to write to closed stream.'), $this));

            return;
        }

        $len = strlen($this->data);
        // The cast keeps $data a string on PHP versions where substr() returns
        // false once the offset reaches the end of the string.
        $this->data = (string) substr($this->data, $sent);

        // Only signal "drain" when this write crossed the soft limit downwards.
        if ($len >= $this->softLimit && $len - $sent < $this->softLimit) {
            $this->emit('drain', array($this));
        }

        if (0 === strlen($this->data)) {
            $this->loop->removeWriteStream($this->stream);
            $this->listening = false;

            $this->emit('full-drain', array($this));
        }
    }

    /**
     * Temporary PHP error handler installed around fwrite(); records the error
     * so handleWrite() can wrap it in an ErrorException.
     *
     * @param int    $errno   Error level.
     * @param string $errstr  Error message.
     * @param string $errfile File in which the error was raised.
     * @param int    $errline Line at which the error was raised.
     */
    private function errorHandler($errno, $errstr, $errfile, $errline)
    {
        $this->lastError['number']  = $errno;
        $this->lastError['message'] = $errstr;
        $this->lastError['file']    = $errfile;
        $this->lastError['line']    = $errline;
    }
}