Blame | Last modification | View Log | RSS feed
<?phpnamespace React\EventLoop;use Event;use EventBase;use React\EventLoop\Tick\FutureTickQueue;use React\EventLoop\Tick\NextTickQueue;use React\EventLoop\Timer\Timer;use React\EventLoop\Timer\TimerInterface;use SplObjectStorage;/*** An ext-libevent based event-loop.*/class LibEventLoop implements LoopInterface{const MICROSECONDS_PER_SECOND = 1000000;private $eventBase;private $nextTickQueue;private $futureTickQueue;private $timerCallback;private $timerEvents;private $streamCallback;private $streamEvents = [];private $streamFlags = [];private $readListeners = [];private $writeListeners = [];private $running;public function __construct(){$this->eventBase = event_base_new();$this->nextTickQueue = new NextTickQueue($this);$this->futureTickQueue = new FutureTickQueue($this);$this->timerEvents = new SplObjectStorage();$this->createTimerCallback();$this->createStreamCallback();}/*** {@inheritdoc}*/public function addReadStream($stream, callable $listener){$key = (int) $stream;if (!isset($this->readListeners[$key])) {$this->readListeners[$key] = $listener;$this->subscribeStreamEvent($stream, EV_READ);}}/*** {@inheritdoc}*/public function addWriteStream($stream, callable $listener){$key = (int) $stream;if (!isset($this->writeListeners[$key])) {$this->writeListeners[$key] = $listener;$this->subscribeStreamEvent($stream, EV_WRITE);}}/*** {@inheritdoc}*/public function removeReadStream($stream){$key = (int) $stream;if (isset($this->readListeners[$key])) {unset($this->readListeners[$key]);$this->unsubscribeStreamEvent($stream, EV_READ);}}/*** {@inheritdoc}*/public function removeWriteStream($stream){$key = (int) $stream;if (isset($this->writeListeners[$key])) {unset($this->writeListeners[$key]);$this->unsubscribeStreamEvent($stream, EV_WRITE);}}/*** {@inheritdoc}*/public function removeStream($stream){$key = (int) $stream;if (isset($this->streamEvents[$key])) {$event = $this->streamEvents[$key];event_del($event);event_free($event);unset($this->streamFlags[$key],$this->streamEvents[$key],$this->readListeners[$key],$this->writeListeners[$key]);}}/*** {@inheritdoc}*/public function addTimer($interval, callable $callback){$timer = new Timer($this, $interval, $callback, false);$this->scheduleTimer($timer);return $timer;}/*** {@inheritdoc}*/public function addPeriodicTimer($interval, callable $callback){$timer = new Timer($this, $interval, $callback, true);$this->scheduleTimer($timer);return $timer;}/*** {@inheritdoc}*/public function cancelTimer(TimerInterface $timer){if ($this->isTimerActive($timer)) {$event = $this->timerEvents[$timer];event_del($event);event_free($event);$this->timerEvents->detach($timer);}}/*** {@inheritdoc}*/public function isTimerActive(TimerInterface $timer){return $this->timerEvents->contains($timer);}/*** {@inheritdoc}*/public function nextTick(callable $listener){$this->nextTickQueue->add($listener);}/*** {@inheritdoc}*/public function futureTick(callable $listener){$this->futureTickQueue->add($listener);}/*** {@inheritdoc}*/public function tick(){$this->nextTickQueue->tick();$this->futureTickQueue->tick();event_base_loop($this->eventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK);}/*** {@inheritdoc}*/public function run(){$this->running = true;while ($this->running) {$this->nextTickQueue->tick();$this->futureTickQueue->tick();$flags = EVLOOP_ONCE;if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {$flags |= EVLOOP_NONBLOCK;} elseif (!$this->streamEvents && !$this->timerEvents->count()) {break;}event_base_loop($this->eventBase, $flags);}}/*** {@inheritdoc}*/public function stop(){$this->running = false;}/*** Schedule a timer for execution.** @param TimerInterface $timer*/private function scheduleTimer(TimerInterface $timer){$this->timerEvents[$timer] = $event = event_timer_new();event_timer_set($event, $this->timerCallback, $timer);event_base_set($event, $this->eventBase);event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);}/*** Create a new ext-libevent event resource, or update the existing one.** @param resource $stream* @param integer $flag EV_READ or EV_WRITE*/private function subscribeStreamEvent($stream, $flag){$key = (int) $stream;if (isset($this->streamEvents[$key])) {$event = $this->streamEvents[$key];$flags = $this->streamFlags[$key] |= $flag;event_del($event);event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);} else {$event = event_new();event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback);event_base_set($event, $this->eventBase);$this->streamEvents[$key] = $event;$this->streamFlags[$key] = $flag;}event_add($event);}/*** Update the ext-libevent event resource for this stream to stop listening to* the given event type, or remove it entirely if it's no longer needed.** @param resource $stream* @param integer $flag EV_READ or EV_WRITE*/private function unsubscribeStreamEvent($stream, $flag){$key = (int) $stream;$flags = $this->streamFlags[$key] &= ~$flag;if (0 === $flags) {$this->removeStream($stream);return;}$event = $this->streamEvents[$key];event_del($event);event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);event_add($event);}/*** Create a callback used as the target of timer events.** A reference is kept to the callback for the lifetime of the loop* to prevent "Cannot destroy active lambda function" fatal error from* the event extension.*/private function createTimerCallback(){$this->timerCallback = function ($_, $__, $timer) {call_user_func($timer->getCallback(), $timer);// Timer already cancelled ...if (!$this->isTimerActive($timer)) {return;// Reschedule periodic timers ...} elseif ($timer->isPeriodic()) {event_add($this->timerEvents[$timer],$timer->getInterval() * self::MICROSECONDS_PER_SECOND);// Clean-up one shot timers ...} else {$this->cancelTimer($timer);}};}/*** Create a callback used as the target of stream events.** A reference is kept to the callback for the lifetime of the loop* to prevent "Cannot destroy active lambda function" fatal error from* the event extension.*/private function createStreamCallback(){$this->streamCallback = function ($stream, $flags) {$key = (int) $stream;if (EV_READ === (EV_READ & $flags) && isset($this->readListeners[$key])) {call_user_func($this->readListeners[$key], $stream, $this);}if (EV_WRITE === (EV_WRITE & $flags) && isset($this->writeListeners[$key])) {call_user_func($this->writeListeners[$key], $stream, $this);}};}}