diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 798745a110d51b..cb358cb00184df 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -24,8 +24,15 @@ function destroyer(stream, reading, writing, callback) { closed = true; }); + stream.on('error', () => { + // Pipeline should not allow uncaught stream errors to propagate. + // 1. eos can complete before 'close' (e.g. 'finish' or 'end') + // and therefore 'error' can be emitted after eos. + // 2. Buggy streams can emit 'error' after 'close' + }); + if (eos === undefined) eos = require('internal/streams/end-of-stream'); - eos(stream, { readable: reading, writable: writing }, (err) => { + eos(stream, { readable: reading, writable: writing, error: false }, (err) => { if (err) return callback(err); closed = true; callback(); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ef5a39fddd881f..a72b718241c4b1 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -477,3 +477,22 @@ const { promisify } = require('util'); { code: 'ERR_INVALID_CALLBACK' } ); } + +{ + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push(null); + pipeline(read, write, common.mustCall(err => { + // Should swallow unexpected errors. + read.emit('error', new Error()); + write.emit('error', new Error()); + })); +}