Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
namespace Ratchet\WebSocket;
3
use Ratchet\ComponentInterface;
4
use Ratchet\ConnectionInterface;
5
use Ratchet\MessageComponentInterface as DataComponentInterface;
6
use Ratchet\Http\HttpServerInterface;
7
use Ratchet\Http\CloseResponseTrait;
8
use Psr\Http\Message\RequestInterface;
9
use Ratchet\RFC6455\Messaging\MessageInterface;
10
use Ratchet\RFC6455\Messaging\FrameInterface;
11
use Ratchet\RFC6455\Messaging\Frame;
12
use Ratchet\RFC6455\Messaging\MessageBuffer;
13
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
14
use Ratchet\RFC6455\Handshake\ServerNegotiator;
15
use Ratchet\RFC6455\Handshake\RequestVerifier;
16
use React\EventLoop\LoopInterface;
17
use GuzzleHttp\Psr7 as gPsr;
18
 
19
/**
20
 * The adapter to handle WebSocket requests/responses
21
 * This is a mediator between the Server and your application to handle real-time messaging through a web browser
22
 * @link http://ca.php.net/manual/en/ref.http.php
23
 * @link http://dev.w3.org/html5/websockets/
24
 */
25
class WsServer implements HttpServerInterface {
26
    use CloseResponseTrait;
27
 
28
    /**
29
     * Decorated component
30
     * @var \Ratchet\ComponentInterface
31
     */
32
    private $delegate;
33
 
34
    /**
35
     * @var \SplObjectStorage
36
     */
37
    protected $connections;
38
 
39
    /**
40
     * @var \Ratchet\RFC6455\Messaging\CloseFrameChecker
41
     */
42
    private $closeFrameChecker;
43
 
44
    /**
45
     * @var \Ratchet\RFC6455\Handshake\ServerNegotiator
46
     */
47
    private $handshakeNegotiator;
48
 
49
    /**
50
     * @var \Closure
51
     */
52
    private $ueFlowFactory;
53
 
54
    /**
55
     * @var \Closure
56
     */
57
    private $pongReceiver;
58
 
59
    /**
60
     * @var \Closure
61
     */
62
    private $msgCb;
63
 
64
    /**
65
     * @param \Ratchet\WebSocket\MessageComponentInterface|\Ratchet\MessageComponentInterface $component Your application to run with WebSockets
66
     * @note If you want to enable sub-protocols have your component implement WsServerInterface as well
67
     */
68
    public function __construct(ComponentInterface $component) {
69
        if ($component instanceof MessageComponentInterface) {
70
            $this->msgCb = function(ConnectionInterface $conn, MessageInterface $msg) {
71
                $this->delegate->onMessage($conn, $msg);
72
            };
73
        } elseif ($component instanceof DataComponentInterface) {
74
            $this->msgCb = function(ConnectionInterface $conn, MessageInterface $msg) {
75
                $this->delegate->onMessage($conn, $msg->getPayload());
76
            };
77
        } else {
78
            throw new \UnexpectedValueException('Expected instance of \Ratchet\WebSocket\MessageComponentInterface or \Ratchet\MessageComponentInterface');
79
        }
80
 
81
        if (bin2hex('✓') !== 'e29c93') {
82
            throw new \DomainException('Bad encoding, unicode character ✓ did not match expected value. Ensure charset UTF-8 and check ini val mbstring.func_autoload');
83
        }
84
 
85
        $this->delegate    = $component;
86
        $this->connections = new \SplObjectStorage;
87
 
88
        $this->closeFrameChecker   = new CloseFrameChecker;
89
        $this->handshakeNegotiator = new ServerNegotiator(new RequestVerifier);
90
        $this->handshakeNegotiator->setStrictSubProtocolCheck(true);
91
 
92
        if ($component instanceof WsServerInterface) {
93
            $this->handshakeNegotiator->setSupportedSubProtocols($component->getSubProtocols());
94
        }
95
 
96
        $this->pongReceiver = function() {};
97
 
98
        $reusableUnderflowException = new \UnderflowException;
99
        $this->ueFlowFactory = function() use ($reusableUnderflowException) {
100
            return $reusableUnderflowException;
101
        };
102
    }
103
 
104
    /**
105
     * {@inheritdoc}
106
     */
107
    public function onOpen(ConnectionInterface $conn, RequestInterface $request = null) {
108
        if (null === $request) {
109
            throw new \UnexpectedValueException('$request can not be null');
110
        }
111
 
112
        $conn->httpRequest = $request;
113
 
114
        $conn->WebSocket            = new \StdClass;
115
        $conn->WebSocket->closing   = false;
116
 
117
        $response = $this->handshakeNegotiator->handshake($request)->withHeader('X-Powered-By', \Ratchet\VERSION);
118
 
119
        $conn->send(gPsr\str($response));
120
 
121
        if (101 !== $response->getStatusCode()) {
122
            return $conn->close();
123
        }
124
 
125
        $wsConn = new WsConnection($conn);
126
 
127
        $streamer = new MessageBuffer(
128
            $this->closeFrameChecker,
129
            function(MessageInterface $msg) use ($wsConn) {
130
                $cb = $this->msgCb;
131
                $cb($wsConn, $msg);
132
            },
133
            function(FrameInterface $frame) use ($wsConn) {
134
                $this->onControlFrame($frame, $wsConn);
135
            },
136
            true,
137
            $this->ueFlowFactory
138
        );
139
 
140
        $this->connections->attach($conn, new ConnContext($wsConn, $streamer));
141
 
142
        return $this->delegate->onOpen($wsConn);
143
    }
144
 
145
    /**
146
     * {@inheritdoc}
147
     */
148
    public function onMessage(ConnectionInterface $from, $msg) {
149
        if ($from->WebSocket->closing) {
150
            return;
151
        }
152
 
153
        $this->connections[$from]->buffer->onData($msg);
154
    }
155
 
156
    /**
157
     * {@inheritdoc}
158
     */
159
    public function onClose(ConnectionInterface $conn) {
160
        if ($this->connections->contains($conn)) {
161
            $context = $this->connections[$conn];
162
            $this->connections->detach($conn);
163
 
164
            $this->delegate->onClose($context->connection);
165
        }
166
    }
167
 
168
    /**
169
     * {@inheritdoc}
170
     */
171
    public function onError(ConnectionInterface $conn, \Exception $e) {
172
        if ($this->connections->contains($conn)) {
173
            $this->delegate->onError($this->connections[$conn]->connection, $e);
174
        } else {
175
            $conn->close();
176
        }
177
    }
178
 
179
    public function onControlFrame(FrameInterface $frame, WsConnection $conn) {
180
        switch ($frame->getOpCode()) {
181
            case Frame::OP_CLOSE:
182
                $conn->close($frame);
183
                break;
184
            case Frame::OP_PING:
185
                $conn->send(new Frame($frame->getPayload(), true, Frame::OP_PONG));
186
                break;
187
            case Frame::OP_PONG:
188
                $pongReceiver = $this->pongReceiver;
189
                $pongReceiver($frame, $conn);
190
            break;
191
        }
192
    }
193
 
194
    public function setStrictSubProtocolCheck($enable) {
195
        $this->handshakeNegotiator->setStrictSubProtocolCheck($enable);
196
    }
197
 
198
    public function enableKeepAlive(LoopInterface $loop, $interval = 30) {
199
        $lastPing = new Frame(uniqid(), true, Frame::OP_PING);
200
        $pingedConnections = new \SplObjectStorage;
201
        $splClearer = new \SplObjectStorage;
202
 
203
        $this->pongReceiver = function(FrameInterface $frame, $wsConn) use ($pingedConnections, &$lastPing) {
204
            if ($frame->getPayload() === $lastPing->getPayload()) {
205
                $pingedConnections->detach($wsConn);
206
            }
207
        };
208
 
209
        $loop->addPeriodicTimer((int)$interval, function() use ($pingedConnections, &$lastPing, $splClearer) {
210
            foreach ($pingedConnections as $wsConn) {
211
                $wsConn->close();
212
            }
213
            $pingedConnections->removeAllExcept($splClearer);
214
 
215
            $lastPing = new Frame(uniqid(), true, Frame::OP_PING);
216
 
217
            foreach ($this->connections as $key => $conn) {
218
                $wsConn  = $this->connections[$conn]->connection;
219
 
220
                $wsConn->send($lastPing);
221
                $pingedConnections->attach($wsConn);
222
            }
223
        });
224
   }
225
}