Skip to content

Commit fd8de67

Browse files
authored
stream: catch and forward error from dest.write
PR-URL: #55270 Fixes: #54945 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 0675e05 commit fd8de67

File tree

2 files changed

+82
-4
lines changed

2 files changed

+82
-4
lines changed

lib/internal/streams/readable.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,10 +1004,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
10041004
src.on('data', ondata);
10051005
function ondata(chunk) {
10061006
debug('ondata');
1007-
const ret = dest.write(chunk);
1008-
debug('dest.write', ret);
1009-
if (ret === false) {
1010-
pause();
1007+
try {
1008+
const ret = dest.write(chunk);
1009+
debug('dest.write', ret);
1010+
1011+
if (ret === false) {
1012+
pause();
1013+
}
1014+
} catch (error) {
1015+
dest.destroy(error);
10111016
}
10121017
}
10131018

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('node:assert');
5+
const { Readable, Transform, Writable } = require('node:stream');
6+
7+
// Pipeine objects from object mode to non-object mode should throw an error and
8+
// catch by the consumer
9+
{
10+
const objectReadable = Readable.from([
11+
{ hello: 'hello' },
12+
{ world: 'world' },
13+
]);
14+
15+
const passThrough = new Transform({
16+
transform(chunk, _encoding, cb) {
17+
this.push(chunk);
18+
cb(null);
19+
},
20+
});
21+
22+
passThrough.on('error', common.mustCall());
23+
24+
objectReadable.pipe(passThrough);
25+
26+
assert.rejects(async () => {
27+
// eslint-disable-next-line no-unused-vars
28+
for await (const _ of passThrough);
29+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
30+
}
31+
32+
// The error should be properly forwarded when the readable stream is in object mode,
33+
// the writable stream is in non-object mode, and the data is string.
34+
{
35+
const stringReadable = Readable.from(['hello', 'world']);
36+
37+
const passThrough = new Transform({
38+
transform(chunk, _encoding, cb) {
39+
this.push(chunk);
40+
throw new Error('something went wrong');
41+
},
42+
});
43+
44+
passThrough.on('error', common.mustCall((err) => {
45+
assert.strictEqual(err.message, 'something went wrong');
46+
}));
47+
48+
stringReadable.pipe(passThrough);
49+
}
50+
51+
// The error should be properly forwarded when the readable stream is in object mode,
52+
// the writable stream is in non-object mode, and the data is buffer.
53+
{
54+
const binaryData = Buffer.from('binary data');
55+
56+
const binaryReadable = new Readable({
57+
read() {
58+
this.push(binaryData);
59+
this.push(null);
60+
}
61+
});
62+
63+
const binaryWritable = new Writable({
64+
write(chunk, _encoding, cb) {
65+
throw new Error('something went wrong');
66+
}
67+
});
68+
69+
binaryWritable.on('error', common.mustCall((err) => {
70+
assert.strictEqual(err.message, 'something went wrong');
71+
}));
72+
binaryReadable.pipe(binaryWritable);
73+
}

0 commit comments

Comments
 (0)