Skip to content

Commit 22819aa

Browse files
mcollinajasnell
authored andcommitted
stream: prevent 'end' to be emitted after 'error'
This PR adds _readableState.errorEmitted and add the tracking of it. Fixes: #6083 PR-URL: #20104 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Ruben Bridgewater <[email protected]> Reviewed-By: Trivikram Kamat <[email protected]>
1 parent 4f68133 commit 22819aa

17 files changed

+76
-20
lines changed

lib/_stream_readable.js

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ function ReadableState(options, stream, isDuplex) {
9999
this.endEmitted = false;
100100
this.reading = false;
101101

102+
// Flipped if an 'error' is emitted.
103+
this.errorEmitted = false;
104+
102105
// a flag to be able to tell if the event 'readable'/'data' is emitted
103106
// immediately, or on a later tick. We set this to true at first, because
104107
// any actions that shouldn't happen until "later" should generally also
@@ -1069,20 +1072,23 @@ function fromList(n, state) {
10691072
function endReadable(stream) {
10701073
var state = stream._readableState;
10711074

1072-
debug('endReadable', state.endEmitted);
1073-
if (!state.endEmitted) {
1075+
debug('endReadable', state.endEmitted, state.errorEmitted);
1076+
if (!state.endEmitted && !state.errorEmitted) {
10741077
state.ended = true;
10751078
process.nextTick(endReadableNT, state, stream);
10761079
}
10771080
}
10781081

10791082
function endReadableNT(state, stream) {
1080-
debug('endReadableNT', state.endEmitted, state.length);
1083+
debug('endReadableNT', state.endEmitted, state.length, state.errorEmitted);
10811084

10821085
// Check that we didn't get one last unshift.
10831086
if (!state.endEmitted && state.length === 0) {
1084-
state.endEmitted = true;
10851087
stream.readable = false;
1086-
stream.emit('end');
1088+
1089+
if (!state.errorEmitted) {
1090+
state.endEmitted = true;
1091+
stream.emit('end');
1092+
}
10871093
}
10881094
}

lib/_stream_writable.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,12 +424,22 @@ function onwriteError(stream, state, sync, er, cb) {
424424
// this can emit finish, and it will always happen
425425
// after error
426426
process.nextTick(finishMaybe, stream, state);
427+
428+
// needed for duplex, fixes https://github.com/nodejs/node/issues/6083
429+
if (stream._readableState) {
430+
stream._readableState.errorEmitted = true;
431+
}
427432
stream._writableState.errorEmitted = true;
428433
stream.emit('error', er);
429434
} else {
430435
// the caller expect this to happen before if
431436
// it is async
432437
cb(er);
438+
439+
// needed for duplex, fixes https://github.com/nodejs/node/issues/6083
440+
if (stream._readableState) {
441+
stream._readableState.errorEmitted = true;
442+
}
433443
stream._writableState.errorEmitted = true;
434444
stream.emit('error', er);
435445
// this can emit finish, but finish must

lib/internal/streams/destroy.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ function destroy(err, cb) {
88
this._writableState.destroyed;
99

1010
if (readableDestroyed || writableDestroyed) {
11+
const readableErrored = this._readableState &&
12+
this._readableState.errorEmitted;
13+
const writableErrored = this._writableState &&
14+
this._writableState.errorEmitted;
15+
1116
if (cb) {
1217
cb(err);
13-
} else if (err &&
14-
(!this._writableState || !this._writableState.errorEmitted)) {
18+
} else if (err && !readableErrored && !writableErrored) {
1519
process.nextTick(emitErrorNT, this, err);
1620
}
1721
return this;
@@ -32,6 +36,11 @@ function destroy(err, cb) {
3236
this._destroy(err || null, (err) => {
3337
if (!cb && err) {
3438
process.nextTick(emitErrorAndCloseNT, this, err);
39+
40+
if (this._readableState) {
41+
this._readableState.errorEmitted = true;
42+
}
43+
3544
if (this._writableState) {
3645
this._writableState.errorEmitted = true;
3746
}
@@ -65,6 +74,7 @@ function undestroy() {
6574
this._readableState.reading = false;
6675
this._readableState.ended = false;
6776
this._readableState.endEmitted = false;
77+
this._readableState.errorEmitted = false;
6878
}
6979

7080
if (this._writableState) {

test/parallel/test-http2-client-destroy.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ const Countdown = require('../common/countdown');
9595
});
9696

9797
req.resume();
98-
req.on('end', common.mustCall());
9998
req.on('close', common.mustCall(() => server.close()));
10099
}));
101100
}

test/parallel/test-http2-client-onconnect-errors.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ function runTest(test) {
101101
});
102102
}
103103

104-
req.on('end', common.mustCall());
105104
req.on('close', common.mustCall(() => {
106105
client.destroy();
107106

test/parallel/test-http2-client-stream-destroy-before-connect.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,4 @@ server.listen(0, common.mustCall(() => {
4545

4646
req.on('response', common.mustNotCall());
4747
req.resume();
48-
req.on('end', common.mustCall());
4948
}));

test/parallel/test-http2-compat-serverresponse-destroy.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ server.listen(0, common.mustCall(() => {
6363
req.on('close', common.mustCall(() => countdown.dec()));
6464

6565
req.resume();
66-
req.on('end', common.mustCall());
6766
}
6867

6968
{
@@ -78,6 +77,5 @@ server.listen(0, common.mustCall(() => {
7877
req.on('close', common.mustCall(() => countdown.dec()));
7978

8079
req.resume();
81-
req.on('end', common.mustCall());
8280
}
8381
}));

test/parallel/test-http2-max-concurrent-streams.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ server.listen(0, common.mustCall(() => {
4545
req.on('aborted', common.mustCall());
4646
req.on('response', common.mustNotCall());
4747
req.resume();
48-
req.on('end', common.mustCall());
4948
req.on('close', common.mustCall(() => countdown.dec()));
5049
req.on('error', common.expectsError({
5150
code: 'ERR_HTTP2_STREAM_ERROR',

test/parallel/test-http2-misused-pseudoheaders.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ server.listen(0, common.mustCall(() => {
4141

4242
req.on('response', common.mustCall());
4343
req.resume();
44-
req.on('end', common.mustCall());
4544
req.on('close', common.mustCall(() => {
4645
server.close();
4746
client.close();

test/parallel/test-http2-multi-content-length.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ server.listen(0, common.mustCall(() => {
5353
// header to be set for non-payload bearing requests...
5454
const req = client.request({ 'content-length': 1 });
5555
req.resume();
56-
req.on('end', common.mustCall());
5756
req.on('close', common.mustCall(() => countdown.dec()));
5857
req.on('error', common.expectsError({
5958
code: 'ERR_HTTP2_STREAM_ERROR',

test/parallel/test-http2-respond-file-fd-invalid.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ server.listen(0, () => {
4040
req.on('response', common.mustCall());
4141
req.on('error', common.mustCall(errorCheck));
4242
req.on('data', common.mustNotCall());
43-
req.on('end', common.mustCall(() => {
43+
req.on('close', common.mustCall(() => {
4444
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
4545
client.close();
4646
server.close();

test/parallel/test-http2-respond-nghttperrors.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ function runTest(test) {
8787
req.resume();
8888
req.end();
8989

90-
req.on('end', common.mustCall(() => {
90+
req.on('close', common.mustCall(() => {
9191
client.close();
9292

9393
if (!tests.length) {

test/parallel/test-http2-respond-with-fd-errors.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ function runTest(test) {
9595
req.resume();
9696
req.end();
9797

98-
req.on('end', common.mustCall(() => {
98+
req.on('close', common.mustCall(() => {
9999
client.close();
100100

101101
if (!tests.length) {

test/parallel/test-http2-server-shutdown-before-respond.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,5 @@ server.on('listening', common.mustCall(() => {
3232
}));
3333
req.resume();
3434
req.on('data', common.mustNotCall());
35-
req.on('end', common.mustCall(() => server.close()));
35+
req.on('close', common.mustCall(() => server.close()));
3636
}));

test/parallel/test-http2-server-socket-destroy.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,4 @@ server.on('listening', common.mustCall(() => {
5252

5353
req.on('aborted', common.mustCall());
5454
req.resume();
55-
req.on('end', common.mustCall());
5655
}));
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Duplex } = require('stream');
5+
const { strictEqual } = require('assert');
6+
7+
const duplex = new Duplex({
8+
write(chunk, enc, cb) {
9+
cb(new Error('kaboom'));
10+
},
11+
read() {
12+
this.push(null);
13+
}
14+
});
15+
16+
duplex.on('error', common.mustCall(function() {
17+
strictEqual(this._readableState.errorEmitted, true);
18+
strictEqual(this._writableState.errorEmitted, true);
19+
}));
20+
21+
duplex.on('end', common.mustNotCall());
22+
23+
duplex.end('hello');
24+
duplex.resume();

test/parallel/test-stream-readable-destroy.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,18 @@ const { inherits } = require('util');
189189
read.push('hi');
190190
read.on('data', common.mustNotCall());
191191
}
192+
193+
{
194+
// double error case
195+
const read = new Readable({
196+
read() {}
197+
});
198+
199+
read.on('close', common.mustCall());
200+
read.on('error', common.mustCall());
201+
202+
read.destroy(new Error('kaboom 1'));
203+
read.destroy(new Error('kaboom 2'));
204+
assert.strictEqual(read._readableState.errorEmitted, true);
205+
assert.strictEqual(read.destroyed, true);
206+
}

0 commit comments

Comments
 (0)