Skip to content

Commit de9f68a

Browse files
committed
stream: catch and forward error from dest.write
1 parent 7ae193d commit de9f68a

File tree

2 files changed

+80
-4
lines changed

2 files changed

+80
-4
lines changed

lib/internal/streams/readable.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,10 +1004,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
10041004
src.on('data', ondata);
10051005
function ondata(chunk) {
10061006
debug('ondata');
1007-
const ret = dest.write(chunk);
1008-
debug('dest.write', ret);
1009-
if (ret === false) {
1010-
pause();
1007+
try {
1008+
const ret = dest.write(chunk);
1009+
debug('dest.write', ret);
1010+
1011+
if (ret === false) {
1012+
pause();
1013+
}
1014+
} catch (error) {
1015+
dest.destroy(error);
10111016
}
10121017
}
10131018

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
'use strict';
2+
3+
/**
4+
* source is an object mode stream that only flows strings/buffer and the error is unrelated
5+
* source is a binary stream and dest is a binary stream
6+
*/
7+
const common = require('../common');
8+
const assert = require('node:assert');
9+
const { Readable, Transform, Writable } = require('node:stream');
10+
11+
{
12+
const objectReadable = Readable.from([
13+
{ hello: 'hello' },
14+
{ world: 'world' },
15+
]);
16+
17+
const passThrough = new Transform({
18+
transform(chunk, _encoding, cb) {
19+
this.push(chunk);
20+
cb(null);
21+
},
22+
});
23+
24+
passThrough.on('error', common.mustCall());
25+
26+
objectReadable.pipe(passThrough);
27+
28+
assert.rejects(async () => {
29+
// eslint-disable-next-line no-unused-vars
30+
for await (const _ of passThrough);
31+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
32+
}
33+
34+
{
35+
const stringReadable = Readable.from(['hello', 'world']);
36+
37+
const passThrough = new Transform({
38+
transform(chunk, _encoding, cb) {
39+
this.push(chunk);
40+
throw new Error('something went wrong');
41+
},
42+
});
43+
44+
passThrough.on('error', common.mustCall((err) => {
45+
assert.strictEqual(err.message, 'something went wrong');
46+
}));
47+
48+
stringReadable.pipe(passThrough);
49+
}
50+
51+
{
52+
const binaryData = Buffer.from('binary data');
53+
54+
const binaryReadable = new Readable({
55+
read() {
56+
this.push(binaryData);
57+
this.push(null);
58+
}
59+
});
60+
61+
const binaryWritable = new Writable({
62+
write(chunk, _encoding, cb) {
63+
throw new Error('something went wrong');
64+
}
65+
});
66+
67+
binaryWritable.on('error', common.mustCall((err) => {
68+
assert.strictEqual(err.message, 'something went wrong');
69+
}));
70+
binaryReadable.pipe(binaryWritable);
71+
}

0 commit comments

Comments
 (0)