From 904da979e655a5abf0852cdf770d22a095de74fe Mon Sep 17 00:00:00 2001 From: Jan Sorgalla Date: Sat, 21 Mar 2015 11:31:26 +0100 Subject: [PATCH 01/10] Add global task queue for reducing stack size when chaining promises --- src/Promise.php | 25 +++++++++++++---------- src/Queue.php | 45 ++++++++++++++++++++++++++++++++++++++++++ src/QueueInterface.php | 8 ++++++++ src/functions.php | 21 ++++++++++++++++++++ 4 files changed, 89 insertions(+), 10 deletions(-) create mode 100644 src/Queue.php create mode 100644 src/QueueInterface.php diff --git a/src/Promise.php b/src/Promise.php index 27d66387..9db035a2 100644 --- a/src/Promise.php +++ b/src/Promise.php @@ -143,22 +143,27 @@ private function notify($update = null) return; } - foreach ($this->progressHandlers as $handler) { - $handler($update); - } + $handlers = $this->progressHandlers; + + queue()->enqueue(function () use ($handlers, $update) { + foreach ($handlers as $handler) { + $handler($update); + } + }); } - private function settle(ExtendedPromiseInterface $promise) + private function settle(ExtendedPromiseInterface $result) { - $result = $promise; - - foreach ($this->handlers as $handler) { - $handler($result); - } + $handlers = $this->handlers; $this->progressHandlers = $this->handlers = []; - $this->result = $result; + + queue()->enqueue(function () use ($handlers, $result) { + foreach ($handlers as $handler) { + $handler($result); + } + }); } private function call(callable $callback) diff --git a/src/Queue.php b/src/Queue.php new file mode 100644 index 00000000..e50f3b26 --- /dev/null +++ b/src/Queue.php @@ -0,0 +1,45 @@ +queue, $task); + + if (null === $this->resumeAt && 1 !== $length) { + return; + } + + $this->drain(); + } + + private function drain() + { + $start = null !== $this->resumeAt ? $this->resumeAt : 0; + + for ($i = $start; isset($this->queue[$i]); $i++) { + $task = $this->queue[$i]; + + try { + $task(); + } catch (\Exception $e) { + if (isset($this->queue[$i + 1])) { + $this->resumeAt = $i + 1; + } else { + $this->resumeAt = null; + $this->queue = []; + } + + throw $e; + } + } + + $this->resumeAt = null; + $this->queue = []; + } +} diff --git a/src/QueueInterface.php b/src/QueueInterface.php new file mode 100644 index 00000000..0a0c2844 --- /dev/null +++ b/src/QueueInterface.php @@ -0,0 +1,8 @@ + Date: Wed, 8 Apr 2015 16:57:44 +0200 Subject: [PATCH 02/10] Optimize queue memory usage --- src/Queue.php | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index e50f3b26..fd687a9a 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -5,41 +5,33 @@ class Queue implements QueueInterface { private $queue = []; - private $resumeAt = null; public function enqueue(callable $task) { - $length = array_push($this->queue, $task); - - if (null === $this->resumeAt && 1 !== $length) { - return; + if (1 === array_push($this->queue, $task)) { + $this->drain(); } - - $this->drain(); } private function drain() { - $start = null !== $this->resumeAt ? $this->resumeAt : 0; - - for ($i = $start; isset($this->queue[$i]); $i++) { + for ($i = key($this->queue); isset($this->queue[$i]); $i++) { $task = $this->queue[$i]; + $exception = null; + try { $task(); - } catch (\Exception $e) { - if (isset($this->queue[$i + 1])) { - $this->resumeAt = $i + 1; - } else { - $this->resumeAt = null; - $this->queue = []; - } - - throw $e; + } catch (\Exception $exception) { + } + + unset($this->queue[$i]); + + if ($exception) { + throw $exception; } } - $this->resumeAt = null; $this->queue = []; } } From 28eb199d5083948d114740d101346d28a055fee0 Mon Sep 17 00:00:00 2001 From: Jan Sorgalla Date: Tue, 5 May 2015 16:09:19 +0200 Subject: [PATCH 03/10] Move queue classes to subnamespace --- src/{ => Queue}/Queue.php | 2 +- src/{ => Queue}/QueueInterface.php | 2 +- src/functions.php | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) rename src/{ => Queue}/Queue.php (95%) rename src/{ => Queue}/QueueInterface.php (72%) diff --git a/src/Queue.php b/src/Queue/Queue.php similarity index 95% rename from src/Queue.php rename to src/Queue/Queue.php index fd687a9a..9405b0c2 100644 --- a/src/Queue.php +++ b/src/Queue/Queue.php @@ -1,6 +1,6 @@ Date: Tue, 5 May 2015 16:10:03 +0200 Subject: [PATCH 04/10] Rename Queue to SynchronousQueue --- src/Queue/{Queue.php => SynchronousQueue.php} | 2 +- src/functions.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename src/Queue/{Queue.php => SynchronousQueue.php} (93%) diff --git a/src/Queue/Queue.php b/src/Queue/SynchronousQueue.php similarity index 93% rename from src/Queue/Queue.php rename to src/Queue/SynchronousQueue.php index 9405b0c2..34c0842e 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/SynchronousQueue.php @@ -2,7 +2,7 @@ namespace React\Promise\Queue; -class Queue implements QueueInterface +class SynchronousQueue implements QueueInterface { private $queue = []; diff --git a/src/functions.php b/src/functions.php index a6ca923f..4612d36a 100644 --- a/src/functions.php +++ b/src/functions.php @@ -180,7 +180,7 @@ function queue(Queue\QueueInterface $queue = null) $globalQueueUsed = true; if (!$globalQueue) { - $globalQueue = new Queue\Queue(); + $globalQueue = new Queue\SynchronousQueue(); } return $globalQueue; From cc6f1c3846a77f42c3d5cab013dd012bab2a682a Mon Sep 17 00:00:00 2001 From: Jan Sorgalla Date: Mon, 13 Jul 2015 10:33:41 +0200 Subject: [PATCH 05/10] Simplify queue draining code --- src/Queue/SynchronousQueue.php | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/src/Queue/SynchronousQueue.php b/src/Queue/SynchronousQueue.php index 34c0842e..b2d42e9f 100644 --- a/src/Queue/SynchronousQueue.php +++ b/src/Queue/SynchronousQueue.php @@ -15,23 +15,9 @@ public function enqueue(callable $task) private function drain() { - for ($i = key($this->queue); isset($this->queue[$i]); $i++) { - $task = $this->queue[$i]; - - $exception = null; - - try { - $task(); - } catch (\Exception $exception) { - } - - unset($this->queue[$i]); - - if ($exception) { - throw $exception; - } + /** @var callable $task */ + while ($task = array_shift($this->queue)) { + $task(); } - - $this->queue = []; } } From 239c075f3f49e52ae565aae2ff50864f7f3a8c41 Mon Sep 17 00:00:00 2001 From: Jan Sorgalla Date: Mon, 14 Sep 2015 13:22:37 +0200 Subject: [PATCH 06/10] Revert "Simplify queue draining code" This reverts commit cc6f1c3846a77f42c3d5cab013dd012bab2a682a. Note: The task must be kept on the queue until after it is called. Otherwise drain() will be called recursively. --- src/Queue/SynchronousQueue.php | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/Queue/SynchronousQueue.php b/src/Queue/SynchronousQueue.php index b2d42e9f..34c0842e 100644 --- a/src/Queue/SynchronousQueue.php +++ b/src/Queue/SynchronousQueue.php @@ -15,9 +15,23 @@ public function enqueue(callable $task) private function drain() { - /** @var callable $task */ - while ($task = array_shift($this->queue)) { - $task(); + for ($i = key($this->queue); isset($this->queue[$i]); $i++) { + $task = $this->queue[$i]; + + $exception = null; + + try { + $task(); + } catch (\Exception $exception) { + } + + unset($this->queue[$i]); + + if ($exception) { + throw $exception; + } } + + $this->queue = []; } } From f12dfa42dd6671ae1f9782a676319618d311d652 Mon Sep 17 00:00:00 2001 From: Jan Sorgalla Date: Wed, 16 Sep 2015 08:24:24 +0200 Subject: [PATCH 07/10] Move resolution callback enqueuing to resolved promise classes --- src/FulfilledPromise.php | 24 +++++++++++++++--------- src/Promise.php | 8 +++----- src/RejectedPromise.php | 36 +++++++++++++++++++++--------------- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/FulfilledPromise.php b/src/FulfilledPromise.php index bc8714ce..8afb0486 100644 --- a/src/FulfilledPromise.php +++ b/src/FulfilledPromise.php @@ -21,11 +21,15 @@ public function then(callable $onFulfilled = null, callable $onRejected = null, return $this; } - try { - return resolve($onFulfilled($this->value)); - } catch (\Exception $exception) { - return new RejectedPromise($exception); - } + return new Promise(function (callable $resolve, callable $reject) use ($onFulfilled) { + queue()->enqueue(function () use ($resolve, $reject, $onFulfilled) { + try { + $resolve($onFulfilled($this->value)); + } catch (\Exception $exception) { + $reject($exception); + } + }); + }); } public function done(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null) @@ -34,11 +38,13 @@ public function done(callable $onFulfilled = null, callable $onRejected = null, return; } - $result = $onFulfilled($this->value); + queue()->enqueue(function () use ($onFulfilled) { + $result = $onFulfilled($this->value); - if ($result instanceof ExtendedPromiseInterface) { - $result->done(); - } + if ($result instanceof ExtendedPromiseInterface) { + $result->done(); + } + }); } public function otherwise(callable $onRejected) diff --git a/src/Promise.php b/src/Promise.php index 8ee8f547..8ad63584 100644 --- a/src/Promise.php +++ b/src/Promise.php @@ -159,11 +159,9 @@ private function settle(ExtendedPromiseInterface $result) $this->progressHandlers = $this->handlers = []; $this->result = $result; - queue()->enqueue(function () use ($handlers, $result) { - foreach ($handlers as $handler) { - $handler($result); - } - }); + foreach ($handlers as $handler) { + $handler($result); + } } private function result() diff --git a/src/RejectedPromise.php b/src/RejectedPromise.php index fcb9063a..dc68c514 100644 --- a/src/RejectedPromise.php +++ b/src/RejectedPromise.php @@ -21,28 +21,34 @@ public function then(callable $onFulfilled = null, callable $onRejected = null, return $this; } - try { - return resolve($onRejected($this->reason)); - } catch (\Exception $exception) { - return new RejectedPromise($exception); - } + return new Promise(function (callable $resolve, callable $reject) use ($onRejected) { + queue()->enqueue(function () use ($resolve, $reject, $onRejected) { + try { + $resolve($onRejected($this->reason)); + } catch (\Exception $exception) { + $reject($exception); + } + }); + }); } public function done(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null) { - if (null === $onRejected) { - throw UnhandledRejectionException::resolve($this->reason); - } + queue()->enqueue(function () use ($onRejected) { + if (null === $onRejected) { + throw UnhandledRejectionException::resolve($this->reason); + } - $result = $onRejected($this->reason); + $result = $onRejected($this->reason); - if ($result instanceof self) { - throw UnhandledRejectionException::resolve($result->reason); - } + if ($result instanceof self) { + throw UnhandledRejectionException::resolve($result->reason); + } - if ($result instanceof ExtendedPromiseInterface) { - $result->done(); - } + if ($result instanceof ExtendedPromiseInterface) { + $result->done(); + } + }); } public function otherwise(callable $onRejected) From 76e04a43bd93ab81cdc6281d7057d1385f6eb91f Mon Sep 17 00:00:00 2001 From: Jan Sorgalla Date: Thu, 28 Apr 2016 20:44:46 +0200 Subject: [PATCH 08/10] Simplify queue() --- src/functions.php | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/functions.php b/src/functions.php index fd899554..cede2bfe 100644 --- a/src/functions.php +++ b/src/functions.php @@ -225,18 +225,12 @@ function reduce($promisesOrValues, callable $reduceFunc, $initialValue = null) function queue(Queue\QueueInterface $queue = null) { - static $globalQueue, $globalQueueUsed = false; + static $globalQueue; if ($queue) { - if ($globalQueueUsed) { - throw new \RuntimeException('Cannot set global queue instance because there is already an instance running.'); - } - - return $globalQueue = $queue; + return ($globalQueue = $queue); } - $globalQueueUsed = true; - if (!$globalQueue) { $globalQueue = new Queue\SynchronousQueue(); } From a5cac0191844d93b6266119612cda8706f9a0503 Mon Sep 17 00:00:00 2001 From: Jan Sorgalla Date: Thu, 28 Apr 2016 20:48:08 +0200 Subject: [PATCH 09/10] Increase deep nested promise count in test --- tests/FunctionResolveTest.php | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/FunctionResolveTest.php b/tests/FunctionResolveTest.php index 216cbb0c..ed34212c 100644 --- a/tests/FunctionResolveTest.php +++ b/tests/FunctionResolveTest.php @@ -126,13 +126,12 @@ public function shouldSupportVeryDeepNestedPromises() { $deferreds = []; - // @TODO Increase count once global-queue is merged - for ($i = 0; $i < 10; $i++) { + for ($i = 0; $i < 250; $i++) { $deferreds[] = $d = new Deferred(); $p = $d->promise(); $last = $p; - for ($j = 0; $j < 10; $j++) { + for ($j = 0; $j < 250; $j++) { $last = $last->then(function($result) { return $result; }); From 0865eac793d8dc3f3958374c3afb0d721a410948 Mon Sep 17 00:00:00 2001 From: Jan Sorgalla Date: Fri, 29 Apr 2016 21:04:10 +0200 Subject: [PATCH 10/10] Catch also \Throwable --- src/Queue/SynchronousQueue.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Queue/SynchronousQueue.php b/src/Queue/SynchronousQueue.php index 34c0842e..ec7a3d36 100644 --- a/src/Queue/SynchronousQueue.php +++ b/src/Queue/SynchronousQueue.php @@ -22,6 +22,7 @@ private function drain() try { $task(); + } catch (\Throwable $exception) { } catch (\Exception $exception) { }