Blame | Last modification | View Log | RSS feed
<?php
namespace Ratchet\WebSocket;
use Ratchet\ComponentInterface;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface as DataComponentInterface;
use Ratchet\Http\HttpServerInterface;
use Ratchet\Http\CloseResponseTrait;
use Psr\Http\Message\RequestInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;
use Ratchet\RFC6455\Messaging\FrameInterface;
use Ratchet\RFC6455\Messaging\Frame;
use Ratchet\RFC6455\Messaging\MessageBuffer;
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
use Ratchet\RFC6455\Handshake\ServerNegotiator;
use Ratchet\RFC6455\Handshake\RequestVerifier;
use React\EventLoop\LoopInterface;
use GuzzleHttp\Psr7 as gPsr;

/**
 * The adapter to handle WebSocket requests/responses
 * This is a mediator between the Server and your application to handle real-time messaging through a web browser
 *
 * Each accepted connection is wrapped in a WsConnection and paired with a MessageBuffer;
 * raw data received from the HTTP layer is fed into that buffer, which calls back into
 * this class with complete messages and control frames.
 *
 * @link http://ca.php.net/manual/en/ref.http.php
 * @link http://dev.w3.org/html5/websockets/
 */
class WsServer implements HttpServerInterface {
    use CloseResponseTrait;

    /**
     * Decorated component
     * @var \Ratchet\ComponentInterface
     */
    private $delegate;

    /**
     * Maps each raw connection to its ConnContext (decorated connection + message buffer)
     * @var \SplObjectStorage
     */
    protected $connections;

    /**
     * @var \Ratchet\RFC6455\Messaging\CloseFrameChecker
     */
    private $closeFrameChecker;

    /**
     * @var \Ratchet\RFC6455\Handshake\ServerNegotiator
     */
    private $handshakeNegotiator;

    /**
     * Factory handed to every MessageBuffer; returns a shared \UnderflowException instance
     * @var \Closure
     */
    private $ueFlowFactory;

    /**
     * Invoked with (FrameInterface, WsConnection) whenever a PONG frame arrives
     * @var \Closure
     */
    private $pongReceiver;

    /**
     * Invoked with (ConnectionInterface, MessageInterface) for every complete message
     * @var \Closure
     */
    private $msgCb;

    /**
     * @param \Ratchet\WebSocket\MessageComponentInterface|\Ratchet\MessageComponentInterface $component Your application to run with WebSockets
     * @throws \UnexpectedValueException If $component implements neither message component interface
     * @throws \DomainException If this source file is not being interpreted as UTF-8
     * @note If you want to enable sub-protocols have your component implement WsServerInterface as well
     */
    public function __construct(ComponentInterface $component) {
        if ($component instanceof MessageComponentInterface) {
            // WebSocket-aware components receive the full message object
            $this->msgCb = function(ConnectionInterface $conn, MessageInterface $msg) {
                $this->delegate->onMessage($conn, $msg);
            };
        } elseif ($component instanceof DataComponentInterface) {
            // Generic components only receive the message payload
            $this->msgCb = function(ConnectionInterface $conn, MessageInterface $msg) {
                $this->delegate->onMessage($conn, $msg->getPayload());
            };
        } else {
            throw new \UnexpectedValueException('Expected instance of \Ratchet\WebSocket\MessageComponentInterface or \Ratchet\MessageComponentInterface');
        }

        // Sanity check: the literal below must be the three UTF-8 bytes of U+2713
        if (bin2hex('✓') !== 'e29c93') {
            throw new \DomainException('Bad encoding, unicode character ✓ did not match expected value. Ensure charset UTF-8 and check ini val mbstring.func_autoload');
        }

        $this->delegate    = $component;
        $this->connections = new \SplObjectStorage;

        $this->closeFrameChecker   = new CloseFrameChecker;
        $this->handshakeNegotiator = new ServerNegotiator(new RequestVerifier);
        $this->handshakeNegotiator->setStrictSubProtocolCheck(true);

        if ($component instanceof WsServerInterface) {
            $this->handshakeNegotiator->setSupportedSubProtocols($component->getSubProtocols());
        }

        // No-op until enableKeepAlive() installs a real receiver
        $this->pongReceiver = function() {};

        // A single exception instance is created once and returned by the factory on every call
        $reusableUnderflowException = new \UnderflowException;
        $this->ueFlowFactory = function() use ($reusableUnderflowException) {
            return $reusableUnderflowException;
        };
    }

    /**
     * Performs the WebSocket handshake and, on success, registers the connection
     * and notifies the decorated component.
     *
     * {@inheritdoc}
     * @throws \UnexpectedValueException If $request is null
     */
    public function onOpen(ConnectionInterface $conn, RequestInterface $request = null) {
        if (null === $request) {
            throw new \UnexpectedValueException('$request can not be null');
        }

        $conn->httpRequest = $request;

        $conn->WebSocket = new \StdClass;
        $conn->WebSocket->closing = false;

        $response = $this->handshakeNegotiator->handshake($request)->withHeader('X-Powered-By', \Ratchet\VERSION);

        // guzzlehttp/psr7 2.x removed the namespaced str() function; Message::toString()
        // (available since 1.7) is its replacement. Fall back to str() for older installs.
        if (class_exists('GuzzleHttp\Psr7\Message')) {
            $rawResponse = gPsr\Message::toString($response);
        } else {
            $rawResponse = gPsr\str($response);
        }

        $conn->send($rawResponse);

        // Anything other than "101 Switching Protocols" means the handshake was rejected
        if (101 !== $response->getStatusCode()) {
            return $conn->close();
        }

        $wsConn = new WsConnection($conn);

        $streamer = new MessageBuffer(
            $this->closeFrameChecker,
            function(MessageInterface $msg) use ($wsConn) {
                $cb = $this->msgCb;
                $cb($wsConn, $msg);
            },
            function(FrameInterface $frame) use ($wsConn) {
                $this->onControlFrame($frame, $wsConn);
            },
            true, // NOTE(review): presumably MessageBuffer's "expect masked frames" flag - confirm against its constructor
            $this->ueFlowFactory
        );

        $this->connections->attach($conn, new ConnContext($wsConn, $streamer));

        return $this->delegate->onOpen($wsConn);
    }

    /**
     * Feeds raw incoming data into the connection's message buffer.
     * Data is ignored once the connection has been flagged as closing.
     *
     * {@inheritdoc}
     */
    public function onMessage(ConnectionInterface $from, $msg) {
        if ($from->WebSocket->closing) {
            return;
        }

        $this->connections[$from]->buffer->onData($msg);
    }

    /**
     * Forgets the connection and notifies the decorated component.
     * Connections that were never registered are ignored.
     *
     * {@inheritdoc}
     */
    public function onClose(ConnectionInterface $conn) {
        if ($this->connections->contains($conn)) {
            $context = $this->connections[$conn];
            $this->connections->detach($conn);

            $this->delegate->onClose($context->connection);
        }
    }

    /**
     * Forwards the error to the decorated component for registered connections;
     * unregistered connections are simply closed.
     *
     * {@inheritdoc}
     */
    public function onError(ConnectionInterface $conn, \Exception $e) {
        if ($this->connections->contains($conn)) {
            $this->delegate->onError($this->connections[$conn]->connection, $e);
        } else {
            $conn->close();
        }
    }

    /**
     * Handles a control frame received on a connection.
     *
     * CLOSE closes the connection with the received frame, PING is answered with a
     * PONG carrying the same payload, and PONG is passed to the pong receiver.
     * Any other opcode is ignored.
     *
     * @param \Ratchet\RFC6455\Messaging\FrameInterface $frame The control frame
     * @param \Ratchet\WebSocket\WsConnection           $conn  The connection it arrived on
     */
    public function onControlFrame(FrameInterface $frame, WsConnection $conn) {
        switch ($frame->getOpCode()) {
            case Frame::OP_CLOSE:
                $conn->close($frame);
                break;
            case Frame::OP_PING:
                $conn->send(new Frame($frame->getPayload(), true, Frame::OP_PONG));
                break;
            case Frame::OP_PONG:
                $pongReceiver = $this->pongReceiver;
                $pongReceiver($frame, $conn);
                break;
        }
    }

    /**
     * Toggle the strict sub-protocol check on the handshake negotiator
     * (it is enabled by the constructor).
     *
     * @param bool $enable
     */
    public function setStrictSubProtocolCheck($enable) {
        $this->handshakeNegotiator->setStrictSubProtocolCheck($enable);
    }

    /**
     * Periodically ping every open connection and close those that did not answer
     * the previous ping.
     *
     * @param \React\EventLoop\LoopInterface $loop     Event loop to register the periodic timer on
     * @param int                            $interval Seconds between pings (cast to int)
     */
    public function enableKeepAlive(LoopInterface $loop, $interval = 30) {
        $lastPing = new Frame(uniqid(), true, Frame::OP_PING);
        $pingedConnections = new \SplObjectStorage;
        // Always-empty storage: removeAllExcept() with it empties $pingedConnections
        $splClearer = new \SplObjectStorage;

        // $lastPing is captured by reference so both closures see the most recent ping frame
        $this->pongReceiver = function(FrameInterface $frame, $wsConn) use ($pingedConnections, &$lastPing) {
            // Only a pong echoing the latest ping payload counts as an answer
            if ($frame->getPayload() === $lastPing->getPayload()) {
                $pingedConnections->detach($wsConn);
            }
        };

        $loop->addPeriodicTimer((int)$interval, function() use ($pingedConnections, &$lastPing, $splClearer) {
            // Whoever is still listed here never answered the previous ping
            foreach ($pingedConnections as $wsConn) {
                $wsConn->close();
            }
            $pingedConnections->removeAllExcept($splClearer);

            $lastPing = new Frame(uniqid(), true, Frame::OP_PING);

            foreach ($this->connections as $conn) {
                $wsConn = $this->connections[$conn]->connection;

                $wsConn->send($lastPing);
                $pingedConnections->attach($wsConn);
            }
        });
    }
}