Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\EventLoop;
4
 
5
use Ev;
6
use EvIo;
7
use EvLoop;
8
use React\EventLoop\Tick\FutureTickQueue;
9
use React\EventLoop\Timer\Timer;
10
use SplObjectStorage;
11
 
12
/**
 * An `ext-ev` based event loop.
 *
 * This loop uses the [`ev` PECL extension](https://pecl.php.net/package/ev),
 * that provides an interface to `libev` library.
 *
 * This loop is known to work with PHP 5.4 through PHP 7+.
 *
 * @see http://php.net/manual/en/book.ev.php
 * @see https://bitbucket.org/osmanov/pecl-ev/overview
 */
class ExtEvLoop implements LoopInterface
{
    /**
     * Underlying libev loop that all watchers are registered on.
     *
     * @var EvLoop
     */
    private $loop;

    /**
     * Callbacks queued via futureTick(), drained at the start of every run() iteration.
     *
     * @var FutureTickQueue
     */
    private $futureTickQueue;

    /**
     * Maps each active TimerInterface object to its timer watcher.
     *
     * @var SplObjectStorage
     */
    private $timers;

    /**
     * Read watchers, keyed by the integer id of the stream resource.
     *
     * @var EvIo[]
     */
    private $readStreams = array();

    /**
     * Write watchers, keyed by the integer id of the stream resource.
     *
     * @var EvIo[]
     */
    private $writeStreams = array();

    /**
     * Whether run() should keep iterating; cleared by stop().
     *
     * @var bool
     */
    private $running = false;

    /**
     * Registry of user listeners per signal.
     *
     * @var SignalsHandler
     */
    private $signals;

    /**
     * Signal watchers, keyed by signal number (one watcher per signal).
     *
     * @var \EvSignal[]
     */
    private $signalEvents = array();

    public function __construct()
    {
        $this->loop = new EvLoop();
        $this->futureTickQueue = new FutureTickQueue();
        $this->timers = new SplObjectStorage();
        $this->signals = new SignalsHandler();
    }

    /**
     * Starts watching the stream for readability.
     *
     * Does nothing if a read watcher is already registered for this stream.
     *
     * @param resource $stream
     * @param callable $listener invoked with the stream as its only argument
     *
     * @return void
     */
    public function addReadStream($stream, $listener)
    {
        $key = (int)$stream;

        if (isset($this->readStreams[$key])) {
            return;
        }

        $callback = $this->getStreamListenerClosure($stream, $listener);
        $this->readStreams[$key] = $this->loop->io($stream, Ev::READ, $callback);
    }

    /**
     * Wraps a stream listener so that it is called with the stream itself
     * instead of whatever arguments the watcher passes to its callback.
     *
     * @param resource $stream
     * @param callable $listener
     *
     * @return \Closure
     */
    private function getStreamListenerClosure($stream, $listener)
    {
        return function () use ($stream, $listener) {
            \call_user_func($listener, $stream);
        };
    }

    /**
     * Starts watching the stream for writability.
     *
     * Does nothing if a write watcher is already registered for this stream.
     *
     * @param resource $stream
     * @param callable $listener invoked with the stream as its only argument
     *
     * @return void
     */
    public function addWriteStream($stream, $listener)
    {
        $key = (int)$stream;

        if (isset($this->writeStreams[$key])) {
            return;
        }

        $callback = $this->getStreamListenerClosure($stream, $listener);
        $this->writeStreams[$key] = $this->loop->io($stream, Ev::WRITE, $callback);
    }

    /**
     * Stops and forgets the read watcher of the given stream, if any.
     *
     * @param resource|int $stream stream resource (or its integer id)
     *
     * @return void
     */
    public function removeReadStream($stream)
    {
        $key = (int)$stream;

        if (!isset($this->readStreams[$key])) {
            return;
        }

        $this->readStreams[$key]->stop();
        unset($this->readStreams[$key]);
    }

    /**
     * Stops and forgets the write watcher of the given stream, if any.
     *
     * @param resource|int $stream stream resource (or its integer id)
     *
     * @return void
     */
    public function removeWriteStream($stream)
    {
        $key = (int)$stream;

        if (!isset($this->writeStreams[$key])) {
            return;
        }

        $this->writeStreams[$key]->stop();
        unset($this->writeStreams[$key]);
    }

    /**
     * Schedules a one-off timer.
     *
     * @param int|float $interval delay in seconds
     * @param callable  $callback invoked with the timer object
     *
     * @return Timer
     */
    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        $callback = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);

            // One-off timers clean up after themselves; cancelTimer() is a
            // no-op when the user callback already cancelled the timer.
            $this->cancelTimer($timer);
        };

        $event = $this->loop->timer($timer->getInterval(), 0.0, $callback);
        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Schedules a repeating timer.
     *
     * @param int|float $interval period in seconds
     * @param callable  $callback invoked with the timer object on every tick
     *
     * @return Timer
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $callback = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);
        };

        // Use the interval as stored by the Timer object, exactly like
        // addTimer() does, rather than the raw argument. In libev a repeat
        // value of 0 means "do not repeat", so a raw 0 would silently yield
        // a one-off timer.
        $period = $timer->getInterval();
        $event = $this->loop->timer($period, $period, $callback);
        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Stops the watcher of the given timer and forgets the timer.
     *
     * Does nothing if the timer is not (or no longer) registered.
     *
     * @param TimerInterface $timer
     *
     * @return void
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (!isset($this->timers[$timer])) {
            return;
        }

        $this->timers[$timer]->stop();
        $this->timers->detach($timer);
    }

    /**
     * Queues a callback to be run at the start of the next loop iteration.
     *
     * @param callable $listener
     *
     * @return void
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Runs the loop until stop() is called or nothing is left to wait for.
     *
     * @return void
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
            $wasJustStopped = !$this->running;
            $nothingLeftToDo = !$this->readStreams
                && !$this->writeStreams
                && !$this->timers->count()
                && $this->signals->isEmpty();

            $flags = Ev::RUN_ONCE;
            if ($wasJustStopped || $hasPendingCallbacks) {
                // Do not block: either we are about to exit, or more tick
                // callbacks are waiting to be processed.
                $flags |= Ev::RUN_NOWAIT;
            } elseif ($nothingLeftToDo) {
                break;
            }

            $this->loop->run($flags);
        }
    }

    /**
     * Asks a running loop to return after the current iteration.
     *
     * @return void
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Stops every timer and stream watcher that is still registered.
     */
    public function __destruct()
    {
        // cancelTimer() detaches from $this->timers; detaching the current
        // entry while iterating the storage itself can skip entries, so
        // iterate over a snapshot instead.
        /** @var TimerInterface $timer */
        foreach (\iterator_to_array($this->timers, false) as $timer) {
            $this->cancelTimer($timer);
        }

        foreach (\array_keys($this->readStreams) as $key) {
            $this->removeReadStream($key);
        }

        foreach (\array_keys($this->writeStreams) as $key) {
            $this->removeWriteStream($key);
        }
    }

    /**
     * Registers a listener for the given signal.
     *
     * A single watcher per signal is created lazily and dispatches to all
     * listeners registered for that signal.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @return void
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            $this->signalEvents[$signal] = $this->loop->signal($signal, function () use ($signal) {
                $this->signals->call($signal);
            });
        }
    }

    /**
     * Removes a listener for the given signal.
     *
     * The signal watcher is only stopped once the last listener for that
     * signal has been removed, so remaining listeners keep receiving it.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @return void
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        // NOTE(review): relies on SignalsHandler::count($signal) returning the
        // number of listeners still registered for the signal.
        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            $this->signalEvents[$signal]->stop();
            unset($this->signalEvents[$signal]);
        }
    }
}