Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
namespace Ratchet\Wamp;
3
use Ratchet\ConnectionInterface;
4
use Ratchet\WebSocket\WsServerInterface;
5
 
6
class TopicManager implements WsServerInterface, WampServerInterface {
7
    /**
8
     * @var WampServerInterface
9
     */
10
    protected $app;
11
 
12
    /**
13
     * @var array
14
     */
15
    protected $topicLookup = array();
16
 
17
    public function __construct(WampServerInterface $app) {
18
        $this->app = $app;
19
    }
20
 
21
    /**
22
     * {@inheritdoc}
23
     */
24
    public function onOpen(ConnectionInterface $conn) {
25
        $conn->WAMP->subscriptions = new \SplObjectStorage;
26
        $this->app->onOpen($conn);
27
    }
28
 
29
    /**
30
     * {@inheritdoc}
31
     */
32
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
33
        $this->app->onCall($conn, $id, $this->getTopic($topic), $params);
34
    }
35
 
36
    /**
37
     * {@inheritdoc}
38
     */
39
    public function onSubscribe(ConnectionInterface $conn, $topic) {
40
        $topicObj = $this->getTopic($topic);
41
 
42
        if ($conn->WAMP->subscriptions->contains($topicObj)) {
43
            return;
44
        }
45
 
46
        $this->topicLookup[$topic]->add($conn);
47
        $conn->WAMP->subscriptions->attach($topicObj);
48
        $this->app->onSubscribe($conn, $topicObj);
49
    }
50
 
51
    /**
52
     * {@inheritdoc}
53
     */
54
    public function onUnsubscribe(ConnectionInterface $conn, $topic) {
55
        $topicObj = $this->getTopic($topic);
56
 
57
        if (!$conn->WAMP->subscriptions->contains($topicObj)) {
58
            return;
59
        }
60
 
61
        $this->cleanTopic($topicObj, $conn);
62
 
63
        $this->app->onUnsubscribe($conn, $topicObj);
64
    }
65
 
66
    /**
67
     * {@inheritdoc}
68
     */
69
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
70
        $this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible);
71
    }
72
 
73
    /**
74
     * {@inheritdoc}
75
     */
76
    public function onClose(ConnectionInterface $conn) {
77
        $this->app->onClose($conn);
78
 
79
        foreach ($this->topicLookup as $topic) {
80
            $this->cleanTopic($topic, $conn);
81
        }
82
    }
83
 
84
    /**
85
     * {@inheritdoc}
86
     */
87
    public function onError(ConnectionInterface $conn, \Exception $e) {
88
        $this->app->onError($conn, $e);
89
    }
90
 
91
    /**
92
     * {@inheritdoc}
93
     */
94
    public function getSubProtocols() {
95
        if ($this->app instanceof WsServerInterface) {
96
            return $this->app->getSubProtocols();
97
        }
98
 
99
        return array();
100
    }
101
 
102
    /**
103
     * @param string
104
     * @return Topic
105
     */
106
    protected function getTopic($topic) {
107
        if (!array_key_exists($topic, $this->topicLookup)) {
108
            $this->topicLookup[$topic] = new Topic($topic);
109
        }
110
 
111
        return $this->topicLookup[$topic];
112
    }
113
 
114
    protected function cleanTopic(Topic $topic, ConnectionInterface $conn) {
115
        if ($conn->WAMP->subscriptions->contains($topic)) {
116
            $conn->WAMP->subscriptions->detach($topic);
117
        }
118
 
119
        $this->topicLookup[$topic->getId()]->remove($conn);
120
 
121
        if (0 === $topic->count()) {
122
            unset($this->topicLookup[$topic->getId()]);
123
        }
124
    }
125
}