diff --git a/src/DuplexResourceStream.php b/src/DuplexResourceStream.php index 7869eda..46f7a21 100644 --- a/src/DuplexResourceStream.php +++ b/src/DuplexResourceStream.php @@ -33,6 +33,7 @@ final class DuplexResourceStream extends EventEmitter implements DuplexStreamInt private $readable = true; private $writable = true; private $closing = false; + private $listening = false; public function __construct($stream, LoopInterface $loop, $readChunkSize = null, WritableStreamInterface $buffer = null) { @@ -100,13 +101,17 @@ public function isWritable() public function pause() { - $this->loop->removeReadStream($this->stream); + if ($this->listening) { + $this->loop->removeReadStream($this->stream); + $this->listening = false; + } } public function resume() { - if ($this->readable) { + if (!$this->listening && $this->readable) { $this->loop->addReadStream($this->stream, array($this, 'handleData')); + $this->listening = true; } } @@ -131,11 +136,13 @@ public function close() $this->writable = false; $this->emit('close'); - $this->loop->removeStream($this->stream); + $this->pause(); $this->buffer->close(); $this->removeAllListeners(); - $this->handleClose(); + if (is_resource($this->stream)) { + fclose($this->stream); + } } public function end($data = null) @@ -191,14 +198,6 @@ public function handleData($stream) } } - /** @internal */ - public function handleClose() - { - if (is_resource($this->stream)) { - fclose($this->stream); - } - } - /** * Returns whether this is a pipe resource in a legacy environment * diff --git a/src/ReadableResourceStream.php b/src/ReadableResourceStream.php index 6a9cd65..a0a0339 100644 --- a/src/ReadableResourceStream.php +++ b/src/ReadableResourceStream.php @@ -36,6 +36,7 @@ final class ReadableResourceStream extends EventEmitter implements ReadableStrea private $bufferSize; private $closed = false; + private $listening = false; public function __construct($stream, LoopInterface $loop, $readChunkSize = null) { @@ -81,13 +82,17 @@ public function isReadable() public function pause() { - $this->loop->removeReadStream($this->stream); + if ($this->listening) { + $this->loop->removeReadStream($this->stream); + $this->listening = false; + } } public function resume() { - if (!$this->closed) { + if (!$this->listening && !$this->closed) { $this->loop->addReadStream($this->stream, array($this, 'handleData')); + $this->listening = true; } } @@ -105,10 +110,12 @@ public function close() $this->closed = true; $this->emit('close'); - $this->loop->removeStream($this->stream); + $this->pause(); $this->removeAllListeners(); - $this->handleClose(); + if (is_resource($this->stream)) { + fclose($this->stream); + } } /** @internal */ @@ -144,14 +151,6 @@ public function handleData() } } - /** @internal */ - public function handleClose() - { - if (is_resource($this->stream)) { - fclose($this->stream); - } - } - /** * Returns whether this is a pipe resource in a legacy environment * diff --git a/tests/DuplexResourceStreamTest.php b/tests/DuplexResourceStreamTest.php index 9ea43f9..6b63e4c 100644 --- a/tests/DuplexResourceStreamTest.php +++ b/tests/DuplexResourceStreamTest.php @@ -242,12 +242,85 @@ public function testEndRemovesReadStreamFromLoop() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); - $loop->expects($this->once())->method('removeReadStream'); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); $conn = new DuplexResourceStream($stream, $loop); $conn->end('bye'); } + /** + * @covers React\Stream\DuplexResourceStream::pause + */ + public function testPauseRemovesReadStreamFromLoop() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->pause(); + $conn->pause(); + } + + /** + * @covers React\Stream\DuplexResourceStream::pause + */ + public function testResumeDoesAddStreamToLoopOnlyOnce() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->resume(); + $conn->resume(); + } + + /** + * @covers React\Stream\DuplexResourceStream::close + */ + public function testCloseRemovesReadStreamFromLoop() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->close(); + } + + /** + * @covers React\Stream\DuplexResourceStream::close + */ + public function testCloseAfterPauseRemovesReadStreamFromLoopOnlyOnce() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->pause(); + $conn->close(); + } + + /** + * @covers React\Stream\DuplexResourceStream::close + */ + public function testResumeAfterCloseDoesAddReadStreamToLoopOnlyOnce() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + + $conn = new DuplexResourceStream($stream, $loop); + $conn->close(); + $conn->resume(); + } + public function testEndedStreamsShouldNotWrite() { $file = tempnam(sys_get_temp_dir(), 'reactphptest_'); diff --git a/tests/ReadableResourceStreamTest.php b/tests/ReadableResourceStreamTest.php index a6909ba..71bbba0 100644 --- a/tests/ReadableResourceStreamTest.php +++ b/tests/ReadableResourceStreamTest.php @@ -204,6 +204,78 @@ public function testClosingStreamInDataEventShouldNotTriggerError() $conn->handleData($stream); } + /** + * @covers React\Stream\ReadableResourceStream::pause + */ + public function testPauseRemovesReadStreamFromLoop() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->pause(); + $conn->pause(); + } + + /** + * @covers React\Stream\ReadableResourceStream::pause + */ + public function testResumeDoesAddStreamToLoopOnlyOnce() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->resume(); + $conn->resume(); + } + + /** + * @covers React\Stream\ReadableResourceStream::close + */ + public function testCloseRemovesReadStreamFromLoop() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->close(); + } + + /** + * @covers React\Stream\ReadableResourceStream::close + */ + public function testCloseAfterPauseRemovesReadStreamFromLoopOnce() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + $loop->expects($this->once())->method('removeReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->pause(); + $conn->close(); + } + + /** + * @covers React\Stream\ReadableResourceStream::close + */ + public function testResumeAfterCloseDoesAddReadStreamToLoopOnlyOnce() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $loop->expects($this->once())->method('addReadStream')->with($stream); + + $conn = new ReadableResourceStream($stream, $loop); + $conn->close(); + $conn->resume(); + } + /** * @covers React\Stream\ReadableResourceStream::handleData */