Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\EventLoop;
4
 
5
use BadMethodCallException;
6
use libev\EventLoop;
7
use libev\IOEvent;
8
use libev\SignalEvent;
9
use libev\TimerEvent;
10
use React\EventLoop\Tick\FutureTickQueue;
11
use React\EventLoop\Timer\Timer;
12
use SplObjectStorage;
13
 
14
/**
 * An `ext-libev` based event loop.
 *
 * This uses an [unofficial `libev` extension](https://github.com/m4rw3r/php-libev).
 * It supports the same backends as libevent.
 *
 * This loop only works with PHP 5.
 * An update for PHP 7 is [unlikely](https://github.com/m4rw3r/php-libev/issues/8)
 * to happen any time soon.
 *
 * @see https://github.com/m4rw3r/php-libev
 * @see https://gist.github.com/1688204
 */
final class ExtLibevLoop implements LoopInterface
{
    /** @var EventLoop the underlying libev loop all events are registered with */
    private $loop;

    /** @var FutureTickQueue listeners queued via futureTick() */
    private $futureTickQueue;

    /** @var SplObjectStorage maps each timer object to its TimerEvent */
    private $timerEvents;

    /** @var IOEvent[] read events, keyed by the integer cast of the stream */
    private $readEvents = array();

    /** @var IOEvent[] write events, keyed by the integer cast of the stream */
    private $writeEvents = array();

    /** @var bool whether run() should keep iterating */
    private $running = false;

    /** @var SignalsHandler registry of signal listeners */
    private $signals;

    /** @var SignalEvent[] one event per signal number that has listeners */
    private $signalEvents = array();

    /**
     * Creates the loop and its internal bookkeeping structures.
     *
     * @throws BadMethodCallException if the `libev\EventLoop` class is not
     *                                already loaded (autoloading is not attempted)
     */
    public function __construct()
    {
        if (!\class_exists('libev\EventLoop', false)) {
            throw new BadMethodCallException('Cannot create ExtLibevLoop, ext-libev extension missing');
        }

        $this->loop = new EventLoop();
        $this->futureTickQueue = new FutureTickQueue();
        $this->timerEvents = new SplObjectStorage();
        $this->signals = new SignalsHandler();
    }

    /**
     * Registers a listener that is invoked with the stream as its only argument
     * whenever the read event for that stream fires.
     *
     * Does nothing if a read listener is already registered for this stream.
     *
     * @param resource $stream
     * @param callable $listener
     * @return void
     */
    public function addReadStream($stream, $listener)
    {
        $key = (int) $stream;

        if (isset($this->readEvents[$key])) {
            return;
        }

        $callback = function () use ($stream, $listener) {
            \call_user_func($listener, $stream);
        };

        $event = new IOEvent($callback, $stream, IOEvent::READ);
        $this->loop->add($event);

        $this->readEvents[$key] = $event;
    }

    /**
     * Registers a listener that is invoked with the stream as its only argument
     * whenever the write event for that stream fires.
     *
     * Does nothing if a write listener is already registered for this stream.
     *
     * @param resource $stream
     * @param callable $listener
     * @return void
     */
    public function addWriteStream($stream, $listener)
    {
        $key = (int) $stream;

        if (isset($this->writeEvents[$key])) {
            return;
        }

        $callback = function () use ($stream, $listener) {
            \call_user_func($listener, $stream);
        };

        $event = new IOEvent($callback, $stream, IOEvent::WRITE);
        $this->loop->add($event);

        $this->writeEvents[$key] = $event;
    }

    /**
     * Stops and removes the read event for the given stream, if one exists.
     *
     * @param resource $stream
     * @return void
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->readEvents[$key])) {
            $this->readEvents[$key]->stop();
            $this->loop->remove($this->readEvents[$key]);
            unset($this->readEvents[$key]);
        }
    }

    /**
     * Stops and removes the write event for the given stream, if one exists.
     *
     * @param resource $stream
     * @return void
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->writeEvents[$key])) {
            $this->writeEvents[$key]->stop();
            $this->loop->remove($this->writeEvents[$key]);
            unset($this->writeEvents[$key]);
        }
    }

    /**
     * Schedules a one-off timer.
     *
     * The callback receives the timer object as its only argument. After the
     * callback returns, the timer is cancelled unless the callback already
     * cancelled it itself.
     *
     * @param int|float $interval
     * @param callable  $callback
     * @return Timer the timer object, usable with cancelTimer()
     */
    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        // Closures cannot capture $this on older PHP versions, so the loop and
        // the timer registry are passed in explicitly.
        $that = $this;
        $timers = $this->timerEvents;
        $callback = function () use ($timer, $timers, $that) {
            \call_user_func($timer->getCallback(), $timer);

            // The user callback may have cancelled the timer already.
            if ($timers->contains($timer)) {
                $that->cancelTimer($timer);
            }
        };

        $event = new TimerEvent($callback, $timer->getInterval());
        $this->timerEvents->attach($timer, $event);
        $this->loop->add($event);

        return $timer;
    }

    /**
     * Schedules a repeating timer.
     *
     * The callback receives the timer object as its only argument each time
     * the timer fires. The timer stays registered until cancelTimer() is called.
     *
     * @param int|float $interval
     * @param callable  $callback
     * @return Timer the timer object, usable with cancelTimer()
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $callback = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);
        };

        // Schedule with the interval reported by the timer object rather than
        // the raw argument, exactly like addTimer() does, so the scheduled
        // period always matches what $timer->getInterval() tells callers.
        // NOTE(review): Timer may normalise the requested interval (not
        // visible from this file) - confirm against Timer's constructor.
        $period = $timer->getInterval();
        $event = new TimerEvent($callback, $period, $period);
        $this->timerEvents->attach($timer, $event);
        $this->loop->add($event);

        return $timer;
    }

    /**
     * Removes the event belonging to the given timer from the loop.
     *
     * Does nothing if the timer is not registered with this loop.
     *
     * @param TimerInterface $timer
     * @return void
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (isset($this->timerEvents[$timer])) {
            $this->loop->remove($this->timerEvents[$timer]);
            $this->timerEvents->detach($timer);
        }
    }

    /**
     * Queues a listener on the future tick queue, which run() processes at
     * the start of every iteration.
     *
     * @param callable $listener
     * @return void
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Registers a listener for the given signal.
     *
     * A single SignalEvent is created per signal number the first time a
     * listener is added; it dispatches to the signals handler.
     *
     * @param int      $signal
     * @param callable $listener
     * @return void
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            $signals = $this->signals;
            $this->signalEvents[$signal] = new SignalEvent(function () use ($signals, $signal) {
                $signals->call($signal);
            }, $signal);
            $this->loop->add($this->signalEvents[$signal]);
        }
    }

    /**
     * Unregisters a listener for the given signal and, once the handler
     * reports no listeners left for that signal, stops and removes its event.
     *
     * @param int      $signal
     * @param callable $listener
     * @return void
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            $this->signalEvents[$signal]->stop();
            $this->loop->remove($this->signalEvents[$signal]);
            unset($this->signalEvents[$signal]);
        }
    }

    /**
     * Runs the loop until stop() is called or nothing is left to wait for.
     *
     * Each iteration first processes the future tick queue and then runs the
     * underlying loop once.
     *
     * @return void
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $flags = EventLoop::RUN_ONCE;
            if (!$this->running || !$this->futureTickQueue->isEmpty()) {
                // stop() was called during the tick, or more ticks are queued:
                // add RUN_NOWAIT (presumably so the loop polls without blocking).
                $flags |= EventLoop::RUN_NOWAIT;
            } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) {
                // No streams, timers or signals registered: nothing to wait for.
                break;
            }

            $this->loop->run($flags);
        }
    }

    /**
     * Asks run() to return; the flag is checked at the next loop iteration.
     *
     * @return void
     */
    public function stop()
    {
        $this->running = false;
    }
}