Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\Stream;
4
 
5
use Evenement\EventEmitter;
6
use React\EventLoop\LoopInterface;
7
 
8
final class WritableResourceStream extends EventEmitter implements WritableStreamInterface
9
{
10
    private $stream;
11
    private $loop;
12
 
13
    /**
14
     * @var int
15
     */
16
    private $softLimit;
17
 
18
    /**
19
     * @var int
20
     */
21
    private $writeChunkSize;
22
 
23
    private $listening = false;
24
    private $writable = true;
25
    private $closed = false;
26
    private $data = '';
27
 
28
    public function __construct($stream, LoopInterface $loop, $writeBufferSoftLimit = null, $writeChunkSize = null)
29
    {
30
        if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
31
            throw new \InvalidArgumentException('First parameter must be a valid stream resource');
32
        }
33
 
34
        // ensure resource is opened for writing (fopen mode must contain either of "waxc+")
35
        $meta = \stream_get_meta_data($stream);
36
        if (isset($meta['mode']) && $meta['mode'] !== '' && \strtr($meta['mode'], 'waxc+', '.....') === $meta['mode']) {
37
            throw new \InvalidArgumentException('Given stream resource is not opened in write mode');
38
        }
39
 
40
        // this class relies on non-blocking I/O in order to not interrupt the event loop
41
        // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
42
        if (\stream_set_blocking($stream, false) !== true) {
43
            throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
44
        }
45
 
46
        $this->stream = $stream;
47
        $this->loop = $loop;
48
        $this->softLimit = ($writeBufferSoftLimit === null) ? 65536 : (int)$writeBufferSoftLimit;
49
        $this->writeChunkSize = ($writeChunkSize === null) ? -1 : (int)$writeChunkSize;
50
    }
51
 
52
    public function isWritable()
53
    {
54
        return $this->writable;
55
    }
56
 
57
    public function write($data)
58
    {
59
        if (!$this->writable) {
60
            return false;
61
        }
62
 
63
        $this->data .= $data;
64
 
65
        if (!$this->listening && $this->data !== '') {
66
            $this->listening = true;
67
 
68
            $this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
69
        }
70
 
71
        return !isset($this->data[$this->softLimit - 1]);
72
    }
73
 
74
    public function end($data = null)
75
    {
76
        if (null !== $data) {
77
            $this->write($data);
78
        }
79
 
80
        $this->writable = false;
81
 
82
        // close immediately if buffer is already empty
83
        // otherwise wait for buffer to flush first
84
        if ($this->data === '') {
85
            $this->close();
86
        }
87
    }
88
 
89
    public function close()
90
    {
91
        if ($this->closed) {
92
            return;
93
        }
94
 
95
        if ($this->listening) {
96
            $this->listening = false;
97
            $this->loop->removeWriteStream($this->stream);
98
        }
99
 
100
        $this->closed = true;
101
        $this->writable = false;
102
        $this->data = '';
103
 
104
        $this->emit('close');
105
        $this->removeAllListeners();
106
 
107
        if (\is_resource($this->stream)) {
108
            \fclose($this->stream);
109
        }
110
    }
111
 
112
    /** @internal */
113
    public function handleWrite()
114
    {
115
        $error = null;
116
        \set_error_handler(function ($_, $errstr) use (&$error) {
117
            $error = $errstr;
118
        });
119
 
120
        if ($this->writeChunkSize === -1) {
121
            $sent = \fwrite($this->stream, $this->data);
122
        } else {
123
            $sent = \fwrite($this->stream, $this->data, $this->writeChunkSize);
124
        }
125
 
126
        \restore_error_handler();
127
 
128
        // Only report errors if *nothing* could be sent and an error has been raised.
129
        // Ignore non-fatal warnings if *some* data could be sent.
130
        // Any hard (permanent) error will fail to send any data at all.
131
        // Sending excessive amounts of data will only flush *some* data and then
132
        // report a temporary error (EAGAIN) which we do not raise here in order
133
        // to keep the stream open for further tries to write.
134
        // Should this turn out to be a permanent error later, it will eventually
135
        // send *nothing* and we can detect this.
136
        if (($sent === 0 || $sent === false) && $error !== null) {
137
            $this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error)));
138
            $this->close();
139
 
140
            return;
141
        }
142
 
143
        $exceeded = isset($this->data[$this->softLimit - 1]);
144
        $this->data = (string) \substr($this->data, $sent);
145
 
146
        // buffer has been above limit and is now below limit
147
        if ($exceeded && !isset($this->data[$this->softLimit - 1])) {
148
            $this->emit('drain');
149
        }
150
 
151
        // buffer is now completely empty => stop trying to write
152
        if ($this->data === '') {
153
            // stop waiting for resource to be writable
154
            if ($this->listening) {
155
                $this->loop->removeWriteStream($this->stream);
156
                $this->listening = false;
157
            }
158
 
159
            // buffer is end()ing and now completely empty => close buffer
160
            if (!$this->writable) {
161
                $this->close();
162
            }
163
        }
164
    }
165
}