Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\EventLoop;
4
 
5
use libev\EventLoop;
6
use libev\IOEvent;
7
use libev\TimerEvent;
8
use React\EventLoop\Tick\FutureTickQueue;
9
use React\EventLoop\Tick\NextTickQueue;
10
use React\EventLoop\Timer\Timer;
11
use React\EventLoop\Timer\TimerInterface;
12
use SplObjectStorage;
13
 
14
/**
15
 * @see https://github.com/m4rw3r/php-libev
16
 * @see https://gist.github.com/1688204
17
 */
18
class LibEvLoop implements LoopInterface
19
{
20
    private $loop;
21
    private $nextTickQueue;
22
    private $futureTickQueue;
23
    private $timerEvents;
24
    private $readEvents = [];
25
    private $writeEvents = [];
26
    private $running;
27
 
28
    public function __construct()
29
    {
30
        $this->loop = new EventLoop();
31
        $this->nextTickQueue = new NextTickQueue($this);
32
        $this->futureTickQueue = new FutureTickQueue($this);
33
        $this->timerEvents = new SplObjectStorage();
34
    }
35
 
36
    /**
37
     * {@inheritdoc}
38
     */
39
    public function addReadStream($stream, callable $listener)
40
    {
41
        $callback = function () use ($stream, $listener) {
42
            call_user_func($listener, $stream, $this);
43
        };
44
 
45
        $event = new IOEvent($callback, $stream, IOEvent::READ);
46
        $this->loop->add($event);
47
 
48
        $this->readEvents[(int) $stream] = $event;
49
    }
50
 
51
    /**
52
     * {@inheritdoc}
53
     */
54
    public function addWriteStream($stream, callable $listener)
55
    {
56
        $callback = function () use ($stream, $listener) {
57
            call_user_func($listener, $stream, $this);
58
        };
59
 
60
        $event = new IOEvent($callback, $stream, IOEvent::WRITE);
61
        $this->loop->add($event);
62
 
63
        $this->writeEvents[(int) $stream] = $event;
64
    }
65
 
66
    /**
67
     * {@inheritdoc}
68
     */
69
    public function removeReadStream($stream)
70
    {
71
        $key = (int) $stream;
72
 
73
        if (isset($this->readEvents[$key])) {
74
            $this->readEvents[$key]->stop();
75
            unset($this->readEvents[$key]);
76
        }
77
    }
78
 
79
    /**
80
     * {@inheritdoc}
81
     */
82
    public function removeWriteStream($stream)
83
    {
84
        $key = (int) $stream;
85
 
86
        if (isset($this->writeEvents[$key])) {
87
            $this->writeEvents[$key]->stop();
88
            unset($this->writeEvents[$key]);
89
        }
90
    }
91
 
92
    /**
93
     * {@inheritdoc}
94
     */
95
    public function removeStream($stream)
96
    {
97
        $this->removeReadStream($stream);
98
        $this->removeWriteStream($stream);
99
    }
100
 
101
    /**
102
     * {@inheritdoc}
103
     */
104
    public function addTimer($interval, callable $callback)
105
    {
106
        $timer = new Timer($this, $interval, $callback, false);
107
 
108
        $callback = function () use ($timer) {
109
            call_user_func($timer->getCallback(), $timer);
110
 
111
            if ($this->isTimerActive($timer)) {
112
                $this->cancelTimer($timer);
113
            }
114
        };
115
 
116
        $event = new TimerEvent($callback, $timer->getInterval());
117
        $this->timerEvents->attach($timer, $event);
118
        $this->loop->add($event);
119
 
120
        return $timer;
121
    }
122
 
123
    /**
124
     * {@inheritdoc}
125
     */
126
    public function addPeriodicTimer($interval, callable $callback)
127
    {
128
        $timer = new Timer($this, $interval, $callback, true);
129
 
130
        $callback = function () use ($timer) {
131
            call_user_func($timer->getCallback(), $timer);
132
        };
133
 
134
        $event = new TimerEvent($callback, $interval, $interval);
135
        $this->timerEvents->attach($timer, $event);
136
        $this->loop->add($event);
137
 
138
        return $timer;
139
    }
140
 
141
    /**
142
     * {@inheritdoc}
143
     */
144
    public function cancelTimer(TimerInterface $timer)
145
    {
146
        if (isset($this->timerEvents[$timer])) {
147
            $this->loop->remove($this->timerEvents[$timer]);
148
            $this->timerEvents->detach($timer);
149
        }
150
    }
151
 
152
    /**
153
     * {@inheritdoc}
154
     */
155
    public function isTimerActive(TimerInterface $timer)
156
    {
157
        return $this->timerEvents->contains($timer);
158
    }
159
 
160
    /**
161
     * {@inheritdoc}
162
     */
163
    public function nextTick(callable $listener)
164
    {
165
        $this->nextTickQueue->add($listener);
166
    }
167
 
168
    /**
169
     * {@inheritdoc}
170
     */
171
    public function futureTick(callable $listener)
172
    {
173
        $this->futureTickQueue->add($listener);
174
    }
175
 
176
    /**
177
     * {@inheritdoc}
178
     */
179
    public function tick()
180
    {
181
        $this->nextTickQueue->tick();
182
 
183
        $this->futureTickQueue->tick();
184
 
185
        $this->loop->run(EventLoop::RUN_ONCE | EventLoop::RUN_NOWAIT);
186
    }
187
 
188
    /**
189
     * {@inheritdoc}
190
     */
191
    public function run()
192
    {
193
        $this->running = true;
194
 
195
        while ($this->running) {
196
            $this->nextTickQueue->tick();
197
 
198
            $this->futureTickQueue->tick();
199
 
200
            $flags = EventLoop::RUN_ONCE;
201
            if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
202
                $flags |= EventLoop::RUN_NOWAIT;
203
            } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) {
204
                break;
205
            }
206
 
207
            $this->loop->run($flags);
208
        }
209
    }
210
 
211
    /**
212
     * {@inheritdoc}
213
     */
214
    public function stop()
215
    {
216
        $this->running = false;
217
    }
218
}