Skip to content

Commit 2ad3ec3

Browse files
ronagMylesBorins
authored andcommitted
stream: destroy wrapped streams on error
Stream should be destroyed and update state accordingly when the wrapped stream emits error. Does some additional cleanup with future TODOs that might be worth looking into. PR-URL: #34102 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Anna Henningsen <[email protected]>
1 parent 4b5d531 commit 2ad3ec3

File tree

2 files changed

+55
-5
lines changed

2 files changed

+55
-5
lines changed

lib/_stream_readable.js

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
6666
ObjectSetPrototypeOf(Readable, Stream);
6767

6868
const { errorOrDestroy } = destroyImpl;
69-
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
7069

7170
function prependListener(emitter, event, fn) {
7271
// Sadly this is not cacheable as some libraries bundle their own
@@ -1034,10 +1033,29 @@ Readable.prototype.wrap = function(stream) {
10341033
}
10351034
}
10361035

1037-
// Proxy certain important events.
1038-
for (const kProxyEvent of kProxyEvents) {
1039-
stream.on(kProxyEvent, this.emit.bind(this, kProxyEvent));
1040-
}
1036+
stream.on('error', (err) => {
1037+
errorOrDestroy(this, err);
1038+
});
1039+
1040+
stream.on('close', () => {
1041+
// TODO(ronag): Update readable state?
1042+
this.emit('close');
1043+
});
1044+
1045+
stream.on('destroy', () => {
1046+
// TODO(ronag): this.destroy()?
1047+
this.emit('destroy');
1048+
});
1049+
1050+
stream.on('pause', () => {
1051+
// TODO(ronag): this.pause()?
1052+
this.emit('pause');
1053+
});
1054+
1055+
stream.on('resume', () => {
1056+
// TODO(ronag): this.resume()?
1057+
this.emit('resume');
1058+
});
10411059

10421060
// When we try to consume some more bytes, simply unpause the
10431061
// underlying stream.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
5+
const Readable = require('_stream_readable');
6+
const EE = require('events').EventEmitter;
7+
8+
const oldStream = new EE();
9+
oldStream.pause = () => {};
10+
oldStream.resume = () => {};
11+
12+
{
13+
const r = new Readable({ autoDestroy: true })
14+
.wrap(oldStream)
15+
.on('error', common.mustCall(() => {
16+
assert.strictEqual(r._readableState.errorEmitted, true);
17+
assert.strictEqual(r._readableState.errored, true);
18+
assert.strictEqual(r.destroyed, true);
19+
}));
20+
oldStream.emit('error', new Error());
21+
}
22+
23+
{
24+
const r = new Readable({ autoDestroy: false })
25+
.wrap(oldStream)
26+
.on('error', common.mustCall(() => {
27+
assert.strictEqual(r._readableState.errorEmitted, true);
28+
assert.strictEqual(r._readableState.errored, true);
29+
assert.strictEqual(r.destroyed, false);
30+
}));
31+
oldStream.emit('error', new Error());
32+
}

0 commit comments

Comments
 (0)