Skip to content

Commit f4d90ca

Browse files
authored
Merge pull request #89 from clue-labs/through-callback
Simplify ThroughStream by using data callback instead of inheritance
2 parents 148c979 + 62a939f commit f4d90ca

File tree

3 files changed

+219
-4
lines changed

3 files changed

+219
-4
lines changed

README.md

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ descriptor based implementation with an in-memory write buffer.
3737
* [ReadableResourceStream](#readableresourcestream)
3838
* [WritableResourceStream](#writableresourcestream)
3939
* [DuplexResourceStream](#duplexresourcestream)
40+
* [ThroughStream](#throughstream)
4041
* [Usage](#usage)
4142
* [Install](#install)
4243
* [Tests](#tests)
@@ -924,6 +925,70 @@ $buffer->softLimit = 8192;
924925

925926
See also [`write()`](#write) for more details.
926927

928+
### ThroughStream
929+
930+
The `ThroughStream` implements the
931+
[`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data
932+
you write to it through to its readable end.
933+
934+
```php
935+
$through = new ThroughStream();
936+
$through->on('data', $this->expectCallableOnceWith('hello'));
937+
938+
$through->write('hello');
939+
```
940+
941+
Similarly, the [`end()` method](#end) will end the stream and emit an
942+
[`end` event](#end-event) and then [`close()`](#close-1) the stream.
943+
The [`close()` method](#close-1) will close the stream and emit a
944+
[`close` event](#close-event).
945+
Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this:
946+
947+
```php
948+
$through = new ThroughStream();
949+
$source->pipe($through)->pipe($dest);
950+
```
951+
952+
Optionally, its constructor accepts any callable function which will then be
953+
used to *filter* any data written to it. This function receives a single data
954+
argument as passed to the writable side and must return the data as it will be
955+
passed to its readable end:
956+
957+
```php
958+
$through = new ThroughStream('strtoupper');
959+
$source->pipe($through)->pipe($dest);
960+
```
961+
962+
Note that this class makes no assumptions about any data types. This can be
963+
used to convert data, for example for transforming any structured data into
964+
a newline-delimited JSON (NDJSON) stream like this:
965+
966+
```php
967+
$through = new ThroughStream(function ($data) {
968+
return json_encode($data) . PHP_EOL;
969+
});
970+
$through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
971+
972+
$through->write(array(2, true));
973+
```
974+
975+
The callback function is allowed to throw an `Exception`. In this case,
976+
the stream will emit an `error` event and then [`close()`](#close-1) the stream.
977+
978+
```php
979+
$through = new ThroughStream(function ($data) {
980+
if (!is_string($data)) {
981+
throw new \UnexpectedValueException('Only strings allowed');
982+
}
983+
return $data;
984+
});
985+
$through->on('error', $this->expectCallableOnce()));
986+
$through->on('close', $this->expectCallableOnce()));
987+
$through->on('data', $this->expectCallableNever()));
988+
989+
$through->write(2);
990+
```
991+
927992
## Usage
928993
```php
929994
$loop = React\EventLoop\Factory::create();

src/ThroughStream.php

Lines changed: 95 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,92 @@
33
namespace React\Stream;
44

55
use Evenement\EventEmitter;
6-
6+
use InvalidArgumentException;
7+
8+
/**
9+
* The `ThroughStream` implements the
10+
* [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data
11+
* you write to it through to its readable end.
12+
*
13+
* ```php
14+
* $through = new ThroughStream();
15+
* $through->on('data', $this->expectCallableOnceWith('hello'));
16+
*
17+
* $through->write('hello');
18+
* ```
19+
*
20+
* Similarly, the [`end()` method](#end) will end the stream and emit an
21+
* [`end` event](#end-event) and then [`close()`](#close-1) the stream.
22+
* The [`close()` method](#close-1) will close the stream and emit a
23+
* [`close` event](#close-event).
24+
* Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this:
25+
*
26+
* ```php
27+
* $through = new ThroughStream();
28+
* $source->pipe($through)->pipe($dest);
29+
* ```
30+
*
31+
* Optionally, its constructor accepts any callable function which will then be
32+
* used to *filter* any data written to it. This function receives a single data
33+
* argument as passed to the writable side and must return the data as it will be
34+
* passed to its readable end:
35+
*
36+
* ```php
37+
* $through = new ThroughStream('strtoupper');
38+
* $source->pipe($through)->pipe($dest);
39+
* ```
40+
*
41+
* Note that this class makes no assumptions about any data types. This can be
42+
* used to convert data, for example for transforming any structured data into
43+
* a newline-delimited JSON (NDJSON) stream like this:
44+
*
45+
* ```php
46+
* $through = new ThroughStream(function ($data) {
47+
* return json_encode($data) . PHP_EOL;
48+
* });
49+
* $through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
50+
*
51+
* $through->write(array(2, true));
52+
* ```
53+
*
54+
* The callback function is allowed to throw an `Exception`. In this case,
55+
* the stream will emit an `error` event and then [`close()`](#close-1) the stream.
56+
*
57+
* ```php
58+
* $through = new ThroughStream(function ($data) {
59+
* if (!is_string($data)) {
60+
* throw new \UnexpectedValueException('Only strings allowed');
61+
* }
62+
* return $data;
63+
* });
64+
* $through->on('error', $this->expectCallableOnce()));
65+
* $through->on('close', $this->expectCallableOnce()));
66+
* $through->on('data', $this->expectCallableNever()));
67+
*
68+
* $through->write(2);
69+
* ```
70+
*
71+
* @see WritableStreamInterface::write()
72+
* @see WritableStreamInterface::end()
73+
* @see DuplexStreamInterface::close()
74+
* @see WritableStreamInterface::pipe()
75+
*/
776
class ThroughStream extends EventEmitter implements DuplexStreamInterface
877
{
978
private $readable = true;
1079
private $writable = true;
1180
private $closed = false;
1281
private $paused = false;
1382
private $drain = false;
83+
private $callback;
1484

15-
public function filter($data)
85+
public function __construct($callback = null)
1686
{
17-
return $data;
87+
if ($callback !== null && !is_callable($callback)) {
88+
throw new InvalidArgumentException('Invalid transformation callback given');
89+
}
90+
91+
$this->callback = $callback;
1892
}
1993

2094
public function pause()
@@ -52,7 +126,18 @@ public function write($data)
52126
return false;
53127
}
54128

55-
$this->emit('data', array($this->filter($data)));
129+
if ($this->callback !== null) {
130+
try {
131+
$data = call_user_func($this->callback, $data);
132+
} catch (\Exception $e) {
133+
$this->emit('error', array($e));
134+
$this->close();
135+
136+
return false;
137+
}
138+
}
139+
140+
$this->emit('data', array($data));
56141

57142
if ($this->paused) {
58143
$this->drain = true;
@@ -70,6 +155,11 @@ public function end($data = null)
70155

71156
if (null !== $data) {
72157
$this->write($data);
158+
159+
// return if write() already caused the stream to close
160+
if (!$this->writable) {
161+
return;
162+
}
73163
}
74164

75165
$this->readable = false;
@@ -92,6 +182,7 @@ public function close()
92182
$this->closed = true;
93183
$this->paused = true;
94184
$this->drain = false;
185+
$this->callback = null;
95186

96187
$this->emit('close');
97188
}

tests/ThroughStreamTest.php

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@
1010
*/
1111
class ThroughStreamTest extends TestCase
1212
{
13+
/**
14+
* @test
15+
* @expectedException InvalidArgumentException
16+
*/
17+
public function itShouldRejectInvalidCallback()
18+
{
19+
new ThroughStream(123);
20+
}
21+
1322
/** @test */
1423
public function itShouldReturnTrueForAnyDataWrittenToIt()
1524
{
@@ -27,6 +36,56 @@ public function itShouldEmitAnyDataWrittenToIt()
2736
$through->write('foo');
2837
}
2938

39+
/** @test */
40+
public function itShouldEmitAnyDataWrittenToItPassedThruFunction()
41+
{
42+
$through = new ThroughStream('strtoupper');
43+
$through->on('data', $this->expectCallableOnceWith('FOO'));
44+
$through->write('foo');
45+
}
46+
47+
/** @test */
48+
public function itShouldEmitAnyDataWrittenToItPassedThruCallback()
49+
{
50+
$through = new ThroughStream('strtoupper');
51+
$through->on('data', $this->expectCallableOnceWith('FOO'));
52+
$through->write('foo');
53+
}
54+
55+
/** @test */
56+
public function itShouldEmitErrorAndCloseIfCallbackThrowsException()
57+
{
58+
$through = new ThroughStream(function () {
59+
throw new \RuntimeException();
60+
});
61+
$through->on('error', $this->expectCallableOnce());
62+
$through->on('close', $this->expectCallableOnce());
63+
$through->on('data', $this->expectCallableNever());
64+
$through->on('end', $this->expectCallableNever());
65+
66+
$through->write('foo');
67+
68+
$this->assertFalse($through->isReadable());
69+
$this->assertFalse($through->isWritable());
70+
}
71+
72+
/** @test */
73+
public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd()
74+
{
75+
$through = new ThroughStream(function () {
76+
throw new \RuntimeException();
77+
});
78+
$through->on('error', $this->expectCallableOnce());
79+
$through->on('close', $this->expectCallableOnce());
80+
$through->on('data', $this->expectCallableNever());
81+
$through->on('end', $this->expectCallableNever());
82+
83+
$through->end('foo');
84+
85+
$this->assertFalse($through->isReadable());
86+
$this->assertFalse($through->isWritable());
87+
}
88+
3089
/** @test */
3190
public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused()
3291
{

0 commit comments

Comments
 (0)