From a46cd996fb56fc36485583f821bc4dc3bfdbe2cb Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Sun, 9 Nov 2014 20:24:15 -0800 Subject: [PATCH 01/10] re-igniting libuv support --- src/Factory.php | 2 + src/LibUvLoop.php | 303 ++++++++++++++++++++++ tests/LibUvLoopTest.php | 541 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 846 insertions(+) create mode 100644 src/LibUvLoop.php create mode 100644 tests/LibUvLoopTest.php diff --git a/src/Factory.php b/src/Factory.php index 9a481e35..36cd2d5c 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -13,6 +13,8 @@ public static function create() return new LibEvLoop; } elseif (class_exists('EventBase', false)) { return new ExtEventLoop; + } elseif (function_exists('uv_default_loop')) { + return new LibUvLoop(); } return new StreamSelectLoop(); diff --git a/src/LibUvLoop.php b/src/LibUvLoop.php new file mode 100644 index 00000000..f8799ade --- /dev/null +++ b/src/LibUvLoop.php @@ -0,0 +1,303 @@ +loop = uv_loop_new(); + $this->timers = new SplObjectStorage(); + $this->nextTickQueue = new NextTickQueue($this); + $this->futureTickQueue = new FutureTickQueue($this); + } + + /** + * {@inheritdoc} + */ + public function addReadStream($stream, callable $listener) + { + $this->addStream($stream, $listener, \UV::READABLE); + } + + /** + * {@inheritdoc} + */ + public function addWriteStream($stream, callable $listener) + { + $this->addStream($stream, $listener, \UV::WRITABLE); + } + + /** + * {@inheritdoc} + */ + public function removeReadStream($stream) + { + if (!isset($this->events[(int) $stream])) { + return; + } + + uv_poll_stop($this->events[(int) $stream]); + unset($this->listeners[(int) $stream]['read']); + + if (!isset($this->listeners[(int) $stream]['read']) + && !isset($this->listeners[(int) $stream]['write'])) { + unset($this->events[(int) $stream]); + } + } + + /** + * {@inheritdoc} + */ + public function removeWriteStream($stream) + { + if (!isset($this->events[(int) $stream])) { + return; + } + + uv_poll_stop($this->events[(int) $stream]); + unset($this->listeners[(int) $stream]['write']); + + if (!isset($this->listeners[(int) $stream]['read']) + && !isset($this->listeners[(int) $stream]['write'])) { + unset($this->events[(int) $stream]); + } + } + + /** + * {@inheritdoc} + */ + public function removeStream($stream) + { + if (isset($this->events[(int) $stream])) { + + uv_poll_stop($this->events[(int) $stream]); + + unset($this->listeners[(int) $stream]['read']); + unset($this->listeners[(int) $stream]['write']); + unset($this->events[(int) $stream]); + } + } + + /** + * {@inheritdoc} + */ + public function addTimer($interval, callable $callback) + { + return $this->createTimer($interval, $callback, 0); + } + + /** + * {@inheritdoc} + */ + public function addPeriodicTimer($interval, callable $callback) + { + return $this->createTimer($interval, $callback, 1); + } + + /** + * {@inheritdoc} + */ + public function cancelTimer(TimerInterface $timer) + { + uv_timer_stop($this->timers[$timer]); + uv_unref($this->timers[$timer]); + $this->timers->detach($timer); + } + + /** + * {@inheritdoc} + */ + public function isTimerActive(TimerInterface $timer) + { + return $this->timers->contains($timer); + } + + /** + * {@inheritdoc} + */ + public function nextTick(callable $listener) + { + $this->nextTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function futureTick(callable $listener) + { + $this->futureTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function tick() + { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + uv_run_once($this->loop); + } + + /** + * {@inheritdoc} + */ + public function run() + { + $this->running = true; + + while ($this->running) { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + $flags = \UV::RUN_ONCE; + if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) { + $flags = \UV::RUN_NOWAIT; + } elseif (empty($this->events) && !$this->timers->count()) { + break; + } + + uv_run($this->loop, $flags); + } + } + + /** + * {@inheritdoc} + */ + public function stop() + { + $this->running = false; + } + + /* PRIVATE */ + + private function addStream($stream, $listener, $flags) + { + $meta = stream_get_meta_data($stream); + if (get_resource_type($stream) == "Unknown" || !(strpos($meta['stream_type'], 'socket')) ) { + throw new \InvalidArgumentException("Stream must be a resource of type socket."); + + return false; + } + + $currentFlag = 0; + if (isset($this->listeners[(int) $stream]['read'])) { + $currentFlag |= \UV::READABLE; + } + + if (isset($this->listeners[(int) $stream]['write'])) { + $currentFlag |= \UV::WRITABLE; + } + + if (($flags & \UV::READABLE) === $flags) { + $this->listeners[(int) $stream]['read'] = $listener; + } elseif (($flags & \UV::WRITABLE) === $flags) { + $this->listeners[(int) $stream]['write'] = $listener; + } + + if (!isset($this->events[(int) $stream])) { + $event = uv_poll_init($this->loop, $stream); + $this->events[(int) $stream] = $event; + } else { + $event = $this->events[(int) $stream]; + } + + $listener = $this->createStreamListener(); + uv_poll_start($event, $currentFlag | $flags, $listener); + } + + /** + * Create a stream listener + * + * @return callable Returns a callback + */ + private function createStreamListener() + { + $loop = $this; + + $callback = function ($poll, $status, $event, $stream) use ($loop, &$callback) { + if ($status < 0) { + + if (isset($loop->listeners[(int) $stream]['read'])) { + call_user_func(array($this, 'removeReadStream'), $stream); + } + + if (isset($loop->writeListeners[(int) $stream]['write'])) { + call_user_func(array($this, 'removeWriteStream'), $stream); + } + + return; + } + + if (($event & \UV::READABLE) && isset($loop->listeners[(int) $stream]['read'])) { + call_user_func($loop->listeners[(int) $stream]['read'], $stream); + } + + if (($event & \UV::WRITABLE) && isset($loop->listeners[(int) $stream]['write'])) { + call_user_func($loop->listeners[(int) $stream]['write'], $stream); + } + }; + + return $callback; + } + + /** + * Add callback and configured a timer + * + * @param Int $interval The interval of the timer + * @param Callable $callback The callback to be executed + * @param int $periodic 0 = one-off, 1 = periodic + * @return Timer Returns a timer instance + */ + private function createTimer($interval, $callback, $periodic) + { + $timer = new Timer($this, $interval, $callback, $periodic); + $resource = uv_timer_init($this->loop); + + $timers = $this->timers; + $timers->attach($timer, $resource); + + $callback = $this->wrapTimerCallback($timer, $periodic); + uv_timer_start($resource, $interval * 1000, $interval * 1000, $callback); + + return $timer; + } + + /** + * Create a timer wrapper for periodic/one-off timers + * + * @param Timer $timer Timer object + * @param int $periodic 0 = one-off, 1 = periodic + * @return Callable wrapper + */ + private function wrapTimerCallback($timer, $periodic) + { + $callback = function () use ($timer, $periodic) { + + call_user_func($timer->getCallback(), $timer); + + if (!$periodic) { + $timer->cancel(); + } + }; + + return $callback; + } +} diff --git a/tests/LibUvLoopTest.php b/tests/LibUvLoopTest.php new file mode 100644 index 00000000..22e307a2 --- /dev/null +++ b/tests/LibUvLoopTest.php @@ -0,0 +1,541 @@ +markTestSkipped('libuv tests skipped because ext-uv is not installed.'); + } + + if(!is_null($this->loop)) { + return $this->loop; + } + + return new LibUvLoop(); + } + + public function testLibEventConstructor() + { + $loop = new LibUvLoop(); + } + + public function createStream() + { + $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + + stream_set_blocking($sockets[0], 0); + stream_set_blocking($sockets[1], 0); + + return $sockets; + } + + public function writeToStream($stream, $content) + { + fwrite($stream, $content); + } + + /** + * Make sure event loop throws exception, as libuv only supports + * network socket streams. + * @group socketonly + */ + public function testCanOnlyAddSocketStream() + { + $this->setExpectedException('InvalidArgumentException'); + + $fp = fopen("php://temp", "r+"); + $this->loop->addReadStream($fp, function(){}); + + } + + public function testAddReadStream() + { + $streams = $this->createStream(); + + $this->loop->addReadStream($streams[1], $this->expectCallableExactly(2)); + + $this->writeToStream($streams[0], "foo\n"); + $this->loop->tick(); + + $this->writeToStream($streams[0], "bar\n"); + $this->loop->tick(); + } + + public function testAddWriteStream() + { + $input = $this->createStream(); + + $this->loop->addWriteStream($input[0], $this->expectCallableExactly(2)); + $this->loop->tick(); + $this->loop->tick(); + } + + public function testRemoveReadStreamInstantly() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[1], $this->expectCallableNever()); + $this->loop->removeReadStream($input[1]); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + + //cleanup + $this->loop->removeStream($input[0]); + } + + public function testRemoveReadStreamAfterReading() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[1], $this->expectCallableOnce()); + + $this->writeToStream($input[0], "foo\n"); + $this->loop->tick(); + + $this->loop->removeReadStream($input[1]); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + + //cleanup + $this->loop->removeStream($input[0]); + } + + public function testRemoveWriteStreamInstantly() + { + $input = $this->createStream(); + + $this->loop->addWriteStream($input[0], $this->expectCallableNever()); + $this->loop->removeWriteStream($input[0]); + $this->loop->tick(); + } + + public function testRemoveWriteStreamAfterWriting() + { + $input = $this->createStream(); + + $this->loop->addWriteStream($input[0], $this->expectCallableOnce()); + $this->loop->tick(); + + $this->loop->removeWriteStream($input[0]); + $this->loop->tick(); + } + + public function testRemoveStreamInstantly() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[0], $this->expectCallableNever()); + $this->loop->addWriteStream($input[0], $this->expectCallableNever()); + $this->loop->removeStream($input[0]); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + } + + public function testRemoveStreamForReadOnly() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[1], $this->expectCallableNever()); + $this->loop->addWriteStream($input[0], $this->expectCallableOnce()); + $this->loop->removeReadStream($input[1]); + + $this->writeToStream($input[0], "foo\n"); + $this->loop->tick(); + + //cleanup + $this->loop->removeStream($input[0]); + } + + public function testRemoveStreamForWriteOnly() + { + $input = $this->createStream(); + + $this->writeToStream($input[0], "foo\n"); + + $this->loop->addReadStream($input[1], $this->expectCallableOnce()); + $this->loop->addWriteStream($input[0], $this->expectCallableNever()); + $this->loop->removeWriteStream($input[0]); + + $this->loop->tick(); + + //cleanup + $this->loop->removeStream($input[1]); + } + + public function testRemoveStream() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[1], $this->expectCallableOnce()); + $this->loop->addWriteStream($input[0], $this->expectCallableOnce()); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + + $this->loop->removeStream($input[0]); + $this->loop->removeStream($input[1]); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + } + + public function testRemoveInvalid() + { + $input = $this->createStream(); + + // remove a valid stream from the event loop that was never added in the first place + $this->loop->removeReadStream($input[0]); + $this->loop->removeWriteStream($input[0]); + $this->loop->removeStream($input[0]); + } + + /** @test */ + public function emptyRunShouldSimplyReturn() + { + $this->assertRunFasterThan(0.005); + } + + /** @test */ + public function runShouldReturnWhenNoMoreFds() + { + $input = $this->createStream(); + + $loop = $this->loop; + $this->loop->addReadStream($input[1], function ($stream) use ($input) { + $this->loop->removeStream($stream); + $this->loop->removeStream($input[0]); + }); + + $this->writeToStream($input[0], "foo\n"); + + $this->assertRunFasterThan(0.005); + } + + /** @test */ + public function stopShouldStopRunningLoop() + { + $input = $this->createStream(); + + $loop = $this->loop; + $this->loop->addReadStream($input[1], function ($stream) use ($loop) { + $loop->stop(); + }); + + $this->writeToStream($input[0], "foo\n"); + + $this->assertRunFasterThan(0.005); + } + + public function testStopShouldPreventRunFromBlocking() + { + $this->loop->addTimer( + 1, + function () { + $this->fail('Timer was executed.'); + } + ); + + $this->loop->nextTick( + function () { + $this->loop->stop(); + } + ); + + $this->assertRunFasterThan(0.005); + } + + public function testIgnoreRemovedCallback() + { + // two independent streams, both should be readable right away + $input = $this->createStream(); + // $stream2 = $this->createStream(); + + $loop = $this->loop; + $loop->addReadStream($input[1], function ($stream) use ($loop, $input) { + // stream1 is readable, remove stream2 as well => this will invalidate its callback + $loop->removeReadStream($stream); + $loop->removeReadStream($input[0]); + }); + + // this callback would have to be called as well, but the first stream already removed us + $loop->addReadStream($input[0], $this->expectCallableNever()); + + $this->writeToStream($input[0], "foo\n"); + $this->writeToStream($input[1], "foo\n"); + + $loop->run(); + } + + public function testNextTick() + { + $called = false; + + $callback = function ($loop) use (&$called) { + $this->assertSame($this->loop, $loop); + $called = true; + }; + + $this->loop->nextTick($callback); + + $this->assertFalse($called); + + $this->loop->tick(); + + $this->assertTrue($called); + } + + public function testNextTickFiresBeforeIO() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () { + echo 'stream' . PHP_EOL; + } + ); + + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL . 'stream' . PHP_EOL); + + $this->loop->tick(); + } + + public function testRecursiveNextTick() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () { + echo 'stream' . PHP_EOL; + } + ); + + $this->loop->nextTick( + function () { + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL . 'stream' . PHP_EOL); + + $this->loop->tick(); + } + + public function testRunWaitsForNextTickEvents() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () use ($stream) { + $this->loop->removeStream($stream[0]); + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL); + + $this->loop->run(); + + $this->writeToStream($stream[0], "foo\n"); + } + + public function testNextTickEventGeneratedByFutureTick() + { + $this->loop->futureTick( + function () { + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL); + + $this->loop->run(); + } + + public function testNextTickEventGeneratedByTimer() + { + $this->loop->addTimer( + 0.001, + function () { + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL); + + $this->loop->run(); + } + + public function testFutureTick() + { + $called = false; + + $callback = function ($loop) use (&$called) { + $this->assertSame($this->loop, $loop); + $called = true; + }; + + $this->loop->futureTick($callback); + + $this->assertFalse($called); + + $this->loop->tick(); + + $this->assertTrue($called); + } + + public function testFutureTickFiresBeforeIO() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () { + echo 'stream' . PHP_EOL; + } + ); + + $this->loop->futureTick( + function () { + echo 'future-tick' . PHP_EOL; + } + ); + + $this->writeToStream($stream[0], "foo\n"); + $this->expectOutputString('future-tick' . PHP_EOL . 'stream' . PHP_EOL); + + $this->loop->tick(); + } + + public function testRecursiveFutureTick() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () use ($stream) { + echo 'stream' . PHP_EOL; + $this->loop->removeWriteStream($stream[0]); + } + ); + + $this->loop->futureTick( + function () { + echo 'future-tick-1' . PHP_EOL; + $this->loop->futureTick( + function () { + echo 'future-tick-2' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('future-tick-1' . PHP_EOL . 'stream' . PHP_EOL . 'future-tick-2' . PHP_EOL); + + $this->loop->run(); + + $this->writeToStream($stream[0], "foo\n"); + } + + public function testRunWaitsForFutureTickEvents() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () use ($stream) { + $this->removeStream($stream[0]); + $this->loop->futureTick( + function () { + echo 'future-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('future-tick' . PHP_EOL); + + $this->loop->run(); + + $this->writeToStream($stream[0], "foo\n"); + } + + public function testFutureTickEventGeneratedByNextTick() + { + $this->loop->nextTick( + function () { + $this->loop->futureTick( + function () { + echo 'future-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('future-tick' . PHP_EOL); + + $this->loop->run(); + } + + public function testFutureTickEventGeneratedByTimer() + { + $this->loop->addTimer( + 0.001, + function () { + $this->loop->futureTick( + function () { + echo 'future-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('future-tick' . PHP_EOL); + + $this->loop->run(); + } + + private function assertRunFasterThan($maxInterval) + { + $start = microtime(true); + + $this->loop->run(); + + $end = microtime(true); + $interval = $end - $start; + + $this->assertLessThan($maxInterval, $interval); + } +} From f65b11ff3123ccf8d2d4040ef69815eac393ac35 Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Mon, 10 Nov 2014 08:44:18 -0800 Subject: [PATCH 02/10] add travis support for libuv --- travis-init.sh | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/travis-init.sh b/travis-init.sh index 63deeb85..61edb641 100755 --- a/travis-init.sh +++ b/travis-init.sh @@ -31,6 +31,26 @@ if [[ "$TRAVIS_PHP_VERSION" != "hhvm" && popd echo "extension=libev.so" >> "$(php -r 'echo php_ini_loaded_file();')" + # install 'libuv' + git clone --recursive --branch v1.0.0-rc2 --depth 1 https://github.com/joyent/libuv + pushd libuv + ./autogen.sh && ./configure && make && make install + popd + + #install 'php-uv' + phpize && ./configure --with-uv --enable-httpparser && make && sudo make install + echo "extension=uv.so" >> `php --ini | grep "Loaded Configuration" | sed -e "s|.*:\s*||"` + + # install 'php-uv' PHP extension + git clone --recursive https://github.com/m4rw3r/php-libev + pushd php-libev + phpize + ./configure --with-libev + make + make install + popd + echo "extension=libev.so" >> "$(php -r 'echo php_ini_loaded_file();')" + fi composer install --dev --prefer-source From 5818a48bb846a6bc863f2063c2330494609f7ba6 Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Mon, 10 Nov 2014 08:53:50 -0800 Subject: [PATCH 03/10] fix permission issue with libuv install --- travis-init.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/travis-init.sh b/travis-init.sh index 61edb641..363427a4 100755 --- a/travis-init.sh +++ b/travis-init.sh @@ -34,7 +34,7 @@ if [[ "$TRAVIS_PHP_VERSION" != "hhvm" && # install 'libuv' git clone --recursive --branch v1.0.0-rc2 --depth 1 https://github.com/joyent/libuv pushd libuv - ./autogen.sh && ./configure && make && make install + ./autogen.sh && ./configure && make && sudo make install popd #install 'php-uv' From 29b8bef31bc6a2e4dba55396a44b28959093fcd5 Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Mon, 10 Nov 2014 09:02:08 -0800 Subject: [PATCH 04/10] fix permission issue with libuv install --- travis-init.sh | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/travis-init.sh b/travis-init.sh index 363427a4..3d9841a2 100755 --- a/travis-init.sh +++ b/travis-init.sh @@ -41,16 +41,6 @@ if [[ "$TRAVIS_PHP_VERSION" != "hhvm" && phpize && ./configure --with-uv --enable-httpparser && make && sudo make install echo "extension=uv.so" >> `php --ini | grep "Loaded Configuration" | sed -e "s|.*:\s*||"` - # install 'php-uv' PHP extension - git clone --recursive https://github.com/m4rw3r/php-libev - pushd php-libev - phpize - ./configure --with-libev - make - make install - popd - echo "extension=libev.so" >> "$(php -r 'echo php_ini_loaded_file();')" - fi composer install --dev --prefer-source From 1b1e8ee2de724a6cd315bae9f16bc2b93f1f5012 Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Mon, 10 Nov 2014 09:16:31 -0800 Subject: [PATCH 05/10] fix permission issue with libuv install --- travis-init.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/travis-init.sh b/travis-init.sh index 3d9841a2..59166a22 100755 --- a/travis-init.sh +++ b/travis-init.sh @@ -38,8 +38,11 @@ if [[ "$TRAVIS_PHP_VERSION" != "hhvm" && popd #install 'php-uv' + git clone --recursive --branch libuv-1.0 --depth 1 https://github.com/steverhoades/php-uv + pushd php-uv phpize && ./configure --with-uv --enable-httpparser && make && sudo make install - echo "extension=uv.so" >> `php --ini | grep "Loaded Configuration" | sed -e "s|.*:\s*||"` + echo "extension=uv.so" >> "$(php -r 'echo php_ini_loaded_file();')" + popd fi From 8fa5c8e1296a8ef84ad84e36e0f1d0ac9407e7ec Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Mon, 10 Nov 2014 09:26:12 -0800 Subject: [PATCH 06/10] fix typo --- tests/LibUvLoopTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/LibUvLoopTest.php b/tests/LibUvLoopTest.php index 22e307a2..5d341edd 100644 --- a/tests/LibUvLoopTest.php +++ b/tests/LibUvLoopTest.php @@ -476,7 +476,7 @@ public function testRunWaitsForFutureTickEvents() $this->loop->addWriteStream( $stream[0], function () use ($stream) { - $this->removeStream($stream[0]); + $this->loop->removeStream($stream[0]); $this->loop->futureTick( function () { echo 'future-tick' . PHP_EOL; From 25b52c8ffbd53bb9393d4ae426882582cae951c1 Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Mon, 10 Nov 2014 09:38:11 -0800 Subject: [PATCH 07/10] remove deprecated call to uv_run_once --- src/LibUvLoop.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/LibUvLoop.php b/src/LibUvLoop.php index f8799ade..10acde3e 100644 --- a/src/LibUvLoop.php +++ b/src/LibUvLoop.php @@ -152,7 +152,7 @@ public function tick() $this->futureTickQueue->tick(); - uv_run_once($this->loop); + uv_run($this->loop, \UV::RUN_ONCE); } /** From e1aca991f7cd27cbdb826a5d5044986192a71344 Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Wed, 12 Nov 2014 15:02:41 -0800 Subject: [PATCH 08/10] added support for filesystem to event-loop --- src/LibUvLoop.php | 57 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/src/LibUvLoop.php b/src/LibUvLoop.php index 10acde3e..9b9f45fe 100644 --- a/src/LibUvLoop.php +++ b/src/LibUvLoop.php @@ -18,6 +18,22 @@ class LibUvLoop implements LoopInterface private $nextTickQueue; private $futureTickQueue; + public $tasks = 0; + + /** + * @todo FIXME - this is temporary to allow filesystem to work + * @param string $name name of get param + * @return string + */ + public function __get($name) + { + if($name === 'loop') { + return $this->loop; + } + + return NULL; + } + public function __construct() { $this->loop = uv_loop_new(); @@ -152,7 +168,33 @@ public function tick() $this->futureTickQueue->tick(); - uv_run($this->loop, \UV::RUN_ONCE); + $flags = \UV::RUN_ONCE; + if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) { + $flags = \UV::RUN_NOWAIT; + } elseif (empty($this->events) && !$this->timers->count() && !$this->tasks) { + $this->running = false; + return; + } + + uv_run($this->loop, $flags); + } + + /** + * Wrap a task related callback, this will keep track of pending tasks + * and keep the event loop running accordingly. + * + * @param callable $task the callback you want executed when task completes + * @return callable wrapper to task tracking + */ + public function taskCallback($task) + { + $callback = function() use ($task) { + $this->tasks--; + call_user_func_array($task, func_get_args()); + }; + $this->tasks++; + + return $callback; } /** @@ -163,18 +205,7 @@ public function run() $this->running = true; while ($this->running) { - $this->nextTickQueue->tick(); - - $this->futureTickQueue->tick(); - - $flags = \UV::RUN_ONCE; - if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) { - $flags = \UV::RUN_NOWAIT; - } elseif (empty($this->events) && !$this->timers->count()) { - break; - } - - uv_run($this->loop, $flags); + $this->tick(); } } From dd2e11e6b458c856d6f52507b0a4c56d7ef0bb17 Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Wed, 12 Nov 2014 16:32:10 -0800 Subject: [PATCH 09/10] change visibility of tasks --- src/LibUvLoop.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/LibUvLoop.php b/src/LibUvLoop.php index 9b9f45fe..1293d583 100644 --- a/src/LibUvLoop.php +++ b/src/LibUvLoop.php @@ -18,7 +18,7 @@ class LibUvLoop implements LoopInterface private $nextTickQueue; private $futureTickQueue; - public $tasks = 0; + private $tasks = 0; /** * @todo FIXME - this is temporary to allow filesystem to work From c11ec04f346a679f8713c3c7e81d624ead4ac9a1 Mon Sep 17 00:00:00 2001 From: Steve Rhoades Date: Fri, 14 Nov 2014 15:00:13 -0800 Subject: [PATCH 10/10] remove uv_unref call to timer as this causes a SIGABRT on php shutdown --- src/LibUvLoop.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/LibUvLoop.php b/src/LibUvLoop.php index 1293d583..df19aba2 100644 --- a/src/LibUvLoop.php +++ b/src/LibUvLoop.php @@ -131,7 +131,7 @@ public function addPeriodicTimer($interval, callable $callback) public function cancelTimer(TimerInterface $timer) { uv_timer_stop($this->timers[$timer]); - uv_unref($this->timers[$timer]); + $this->timers->detach($timer); }