Skip to content

Commit 1428a92

Browse files
ronagaddaleax
authored andcommitted
stream: make pipeline try to wait for 'close'
Pipeline uses eos which will invoke the callback on 'finish' and 'end' before all streams have been fully destroyed. Fixes: #32032 PR-URL: #32158 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]>
1 parent cba9f2e commit 1428a92

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

lib/internal/streams/end-of-stream.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,22 +63,35 @@ function eos(stream, opts, callback) {
6363

6464
const wState = stream._writableState;
6565
const rState = stream._readableState;
66+
const state = wState || rState;
6667

6768
const onlegacyfinish = () => {
6869
if (!stream.writable) onfinish();
6970
};
7071

72+
// TODO (ronag): Improve soft detection to include core modules and
73+
// common ecosystem modules that do properly emit 'close' but fail
74+
// this generic check.
75+
const willEmitClose = (
76+
state &&
77+
state.autoDestroy &&
78+
state.emitClose &&
79+
state.closed === false
80+
);
81+
7182
let writableFinished = stream.writableFinished ||
7283
(wState && wState.finished);
7384
const onfinish = () => {
7485
writableFinished = true;
86+
if (willEmitClose && (!stream.readable || readable)) return;
7587
if (!readable || readableEnded) callback.call(stream);
7688
};
7789

7890
let readableEnded = stream.readableEnded ||
7991
(rState && rState.endEmitted);
8092
const onend = () => {
8193
readableEnded = true;
94+
if (willEmitClose && (!stream.writable || writable)) return;
8295
if (!writable || writableFinished) callback.call(stream);
8396
};
8497

test/parallel/test-stream-pipeline.js

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ const {
77
Readable,
88
Transform,
99
pipeline,
10-
PassThrough
10+
PassThrough,
11+
Duplex
1112
} = require('stream');
1213
const assert = require('assert');
1314
const http = require('http');
@@ -1077,3 +1078,43 @@ const { promisify } = require('util');
10771078
assert.ifError(err);
10781079
}));
10791080
}
1081+
1082+
{
1083+
let closed = false;
1084+
const src = new Readable({
1085+
read() {},
1086+
destroy(err, cb) {
1087+
process.nextTick(cb);
1088+
}
1089+
});
1090+
const dst = new Writable({
1091+
write(chunk, encoding, callback) {
1092+
callback();
1093+
}
1094+
});
1095+
src.on('close', () => {
1096+
closed = true;
1097+
});
1098+
src.push(null);
1099+
pipeline(src, dst, common.mustCall((err) => {
1100+
assert.strictEqual(closed, true);
1101+
}));
1102+
}
1103+
1104+
{
1105+
let closed = false;
1106+
const src = new Readable({
1107+
read() {},
1108+
destroy(err, cb) {
1109+
process.nextTick(cb);
1110+
}
1111+
});
1112+
const dst = new Duplex({});
1113+
src.on('close', common.mustCall(() => {
1114+
closed = true;
1115+
}));
1116+
src.push(null);
1117+
pipeline(src, dst, common.mustCall((err) => {
1118+
assert.strictEqual(closed, true);
1119+
}));
1120+
}

0 commit comments

Comments
 (0)