From 01297de7b33790f0811e93cba5441ebfbdfcdcd7 Mon Sep 17 00:00:00 2001 From: James Lucas Date: Fri, 17 Mar 2023 11:35:54 +1100 Subject: [PATCH] Improve memory usage by removing listener reference on stream close and not triggering garbage collection cycles --- src/functions.php | 8 ++++++-- tests/FirstTest.php | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/src/functions.php b/src/functions.php index 12acf9a..c923f1d 100644 --- a/src/functions.php +++ b/src/functions.php @@ -141,6 +141,7 @@ function first(EventEmitterInterface $stream, $event = 'data') return new Promise\Promise(function ($resolve, $reject) use ($stream, $event, &$listener) { $listener = function ($data = null) use ($stream, $event, &$listener, $resolve) { $stream->removeListener($event, $listener); + $listener = null; $resolve($data); }; $stream->on($event, $listener); @@ -156,8 +157,11 @@ function first(EventEmitterInterface $stream, $event = 'data') }); } - $stream->on('close', function () use ($stream, $event, $listener, $reject) { - $stream->removeListener($event, $listener); + $stream->on('close', function () use ($stream, $event, &$listener, $reject) { + if ($listener !== null) { + $stream->removeListener($event, $listener); + $listener = null; + } $reject(new \RuntimeException('Stream closed')); }); }, function ($_, $reject) use ($stream, $event, &$listener) { diff --git a/tests/FirstTest.php b/tests/FirstTest.php index 1ce643c..c40e64f 100644 --- a/tests/FirstTest.php +++ b/tests/FirstTest.php @@ -112,4 +112,43 @@ public function testCancelPendingStreamWillReject() $this->expectPromiseReject($promise); } + + public function testNoGarbageCollectionCyclesAfterClosingStream() + { + \gc_collect_cycles(); + $stream = new ThroughStream(); + $promise = Stream\first($stream); + + $stream->close(); + + $this->assertSame(0, \gc_collect_cycles()); + } + + public function testShouldResolveWithoutCreatingGarbageCyclesAfterDataThenClose() + { + \gc_collect_cycles(); + + $stream = new ThroughStream(); + + $promise = Stream\first($stream); + + $stream->emit('data', array('hello', $stream)); + $stream->close(); + + $this->expectPromiseResolve($promise); + $this->assertSame(0, \gc_collect_cycles()); + } + + public function testCancelPendingStreamWillRejectWithoutCreatingGarbageCycles() + { + \gc_collect_cycles(); + $stream = new ThroughStream(); + + $promise = Stream\first($stream); + + $promise->cancel(); + + $this->expectPromiseReject($promise); + $this->assertSame(0, \gc_collect_cycles()); + } }