Skip to content

Commit 311e12b

Browse files
committed
stream: fix multiple destroy calls
Previously destroy could be called multiple times causing inconsistent and hard to predict behavior. Furthermore, since the stream _destroy implementation can only be called once, the behavior of applying destroy multiple times becomes unclear. This changes so that only the first destroy() call is executed and any subsequent calls are noops. PR-URL: #29197 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Anna Henningsen <[email protected]>
1 parent 8b1efe0 commit 311e12b

19 files changed

+77
-73
lines changed

doc/api/stream.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,10 @@ This is a destructive and immediate way to destroy a stream. Previous calls to
385385
`write()` may not have drained, and may trigger an `ERR_STREAM_DESTROYED` error.
386386
Use `end()` instead of destroy if data should flush before close, or wait for
387387
the `'drain'` event before destroying the stream.
388+
389+
Once `destroy()` has been called any further calls will be a noop and no
390+
further errors except from `_destroy` may be emitted as `'error'`.
391+
388392
Implementors should not override this method,
389393
but instead implement [`writable._destroy()`][writable-_destroy].
390394

@@ -953,6 +957,10 @@ Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`
953957
event (unless `emitClose` is set to `false`). After this call, the readable
954958
stream will release any internal resources and subsequent calls to `push()`
955959
will be ignored.
960+
961+
Once `destroy()` has been called any further calls will be a noop and no
962+
further errors except from `_destroy` may be emitted as `'error'`.
963+
956964
Implementors should not override this method, but instead implement
957965
[`readable._destroy()`][readable-_destroy].
958966

@@ -1484,6 +1492,9 @@ Implementors should not override this method, but instead implement
14841492
The default implementation of `_destroy()` for `Transform` also emit `'close'`
14851493
unless `emitClose` is set in false.
14861494

1495+
Once `destroy()` has been called any further calls will be a noop and no
1496+
further errors except from `_destroy` may be emitted as `'error'`.
1497+
14871498
### `stream.finished(stream[, options], callback)`
14881499
<!-- YAML
14891500
added: v10.0.0

lib/_tls_wrap.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1628,7 +1628,7 @@ exports.connect = function connect(...args) {
16281628
tlssock._start();
16291629

16301630
tlssock.on('secure', onConnectSecure);
1631-
tlssock.once('end', onConnectEnd);
1631+
tlssock.prependListener('end', onConnectEnd);
16321632

16331633
return tlssock;
16341634
};

lib/internal/fs/streams.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ WriteStream.prototype._writev = function(data, cb) {
458458

459459
if (er) {
460460
if (this.autoClose) {
461-
this.destroy();
461+
this.destroy(er);
462462
}
463463
return cb(er);
464464
}

lib/internal/streams/destroy.js

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ function destroy(err, cb) {
77
const r = this._readableState;
88
const w = this._writableState;
99

10+
if ((w && w.destroyed) || (r && r.destroyed)) {
11+
if (typeof cb === 'function') {
12+
// TODO(ronag): Invoke with `'close'`/`'error'`.
13+
cb();
14+
}
15+
16+
return this;
17+
}
18+
1019
if (err) {
1120
if (w) {
1221
w.errored = true;
@@ -16,16 +25,6 @@ function destroy(err, cb) {
1625
}
1726
}
1827

19-
if ((w && w.destroyed) || (r && r.destroyed)) {
20-
if (cb) {
21-
cb(err);
22-
} else if (err) {
23-
process.nextTick(emitErrorNT, this, err);
24-
}
25-
26-
return this;
27-
}
28-
2928
// We set destroyed to true before firing error callbacks in order
3029
// to make it re-entrance safe in case destroy() is called within callbacks
3130

@@ -53,13 +52,11 @@ function destroy(err, cb) {
5352
r.closed = true;
5453
}
5554

56-
if (cb) {
57-
// Invoke callback before scheduling emitClose so that callback
58-
// can schedule before.
55+
if (typeof cb === 'function') {
5956
cb(err);
60-
// Don't emit 'error' if passed a callback.
61-
process.nextTick(emitCloseNT, this);
62-
} else if (err) {
57+
}
58+
59+
if (err) {
6360
process.nextTick(emitErrorCloseNT, this, err);
6461
} else {
6562
process.nextTick(emitCloseNT, this);
@@ -138,6 +135,10 @@ function errorOrDestroy(stream, err, sync) {
138135
const r = stream._readableState;
139136
const w = stream._writableState;
140137

138+
if ((w && w.destroyed) || (r && r.destroyed)) {
139+
return this;
140+
}
141+
141142
if ((r && r.autoDestroy) || (w && w.autoDestroy))
142143
stream.destroy(err);
143144
else if (err) {

test/parallel/test-file-write-stream.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ file
6363

6464
callbacks.close++;
6565
console.error('write after end should not be allowed');
66-
file.write('should not work anymore');
67-
file.on('error', common.expectsError({
66+
file.write('should not work anymore', common.expectsError({
6867
code: 'ERR_STREAM_WRITE_AFTER_END',
6968
name: 'Error',
7069
message: 'write after end'

test/parallel/test-file-write-stream2.js

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const filepath = path.join(tmpdir.path, 'write.txt');
3333

3434
const EXPECTED = '012345678910';
3535

36-
const cb_expected = 'write open drain write drain close error ';
36+
const cb_expected = 'write open drain write drain close ';
3737
let cb_occurred = '';
3838

3939
let countDrains = 0;
@@ -92,16 +92,11 @@ file.on('drain', function() {
9292
file.on('close', function() {
9393
cb_occurred += 'close ';
9494
assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);
95-
file.write('should not work anymore');
95+
file.write('should not work anymore', (err) => {
96+
assert.ok(err.message.includes('write after end'));
97+
});
9698
});
9799

98-
99-
file.on('error', function(err) {
100-
cb_occurred += 'error ';
101-
assert.ok(err.message.includes('write after end'));
102-
});
103-
104-
105100
for (let i = 0; i < 11; i++) {
106101
const ret = file.write(String(i));
107102
console.error(`${i} ${ret}`);

test/parallel/test-http2-server-stream-session-destroy.js

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,9 @@ server.on('stream', common.mustCall((stream) => {
3434
name: 'Error'
3535
}
3636
);
37-
stream.on('error', common.expectsError({
38-
name: 'Error',
39-
code: 'ERR_STREAM_WRITE_AFTER_END',
40-
message: 'write after end'
41-
}));
37+
// When session is detroyed all streams are destroyed and no further
38+
// error should be emitted.
39+
stream.on('error', common.mustNotCall());
4240
assert.strictEqual(stream.write('data', common.expectsError({
4341
name: 'Error',
4442
code: 'ERR_STREAM_WRITE_AFTER_END',

test/parallel/test-net-socket-destroy-send.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,7 @@ server.listen(0, common.mustCall(function() {
1212
conn.on('connect', common.mustCall(function() {
1313
// Test destroy returns this, even on multiple calls when it short-circuits.
1414
assert.strictEqual(conn, conn.destroy().destroy());
15-
conn.on('error', common.expectsError({
16-
code: 'ERR_STREAM_DESTROYED',
17-
message: 'Cannot call write after a stream was destroyed',
18-
name: 'Error'
19-
}));
15+
conn.on('error', common.mustNotCall());
2016

2117
conn.write(Buffer.from('kaboom'), common.expectsError({
2218
code: 'ERR_STREAM_DESTROYED',

test/parallel/test-stream-catch-rejections.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ const assert = require('assert');
88
const r = new stream.Readable({
99
captureRejections: true,
1010
read() {
11-
this.push('hello');
12-
this.push('world');
13-
this.push(null);
1411
}
1512
});
13+
r.push('hello');
14+
r.push('world');
1615

1716
const err = new Error('kaboom');
1817

test/parallel/test-stream-pipeline.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -505,9 +505,7 @@ const { promisify } = require('util');
505505
res,
506506
stream,
507507
common.mustCall((err) => {
508-
assert.ok(err);
509-
// TODO(ronag):
510-
// assert.strictEqual(err.message, 'oh no');
508+
assert.strictEqual(err.message, 'oh no');
511509
server.close();
512510
})
513511
);

test/parallel/test-stream-readable-destroy.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,12 @@ const assert = require('assert');
183183

184184
let ticked = false;
185185
read.on('close', common.mustCall(() => {
186-
assert.strictEqual(read._readableState.errorEmitted, false);
186+
assert.strictEqual(read._readableState.errorEmitted, true);
187187
assert.strictEqual(ticked, true);
188188
}));
189-
// 'error' should not be emitted since a callback is passed to
190-
// destroy(err, callback);
191-
read.on('error', common.mustNotCall());
189+
read.on('error', common.mustCall((err) => {
190+
assert.strictEqual(err, expected);
191+
}));
192192

193193
assert.strictEqual(read._readableState.errored, false);
194194
assert.strictEqual(read._readableState.errorEmitted, false);
@@ -217,7 +217,7 @@ const assert = require('assert');
217217
}));
218218
readable.on('error', common.mustCall((err) => {
219219
assert.strictEqual(ticked, true);
220-
assert.strictEqual(err.message, 'kaboom 2');
220+
assert.strictEqual(err.message, 'kaboom 1');
221221
assert.strictEqual(readable._readableState.errorEmitted, true);
222222
}));
223223

@@ -230,7 +230,7 @@ const assert = require('assert');
230230
// the `_destroy()` callback is called.
231231
readable.destroy(new Error('kaboom 2'));
232232
assert.strictEqual(readable._readableState.errorEmitted, false);
233-
assert.strictEqual(readable._readableState.errored, true);
233+
assert.strictEqual(readable._readableState.errored, false);
234234

235235
ticked = true;
236236
}

test/parallel/test-stream-writable-destroy.js

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,14 @@ const assert = require('assert');
187187

188188
let ticked = false;
189189
writable.on('close', common.mustCall(() => {
190+
writable.on('error', common.mustNotCall());
191+
writable.destroy(new Error('hello'));
190192
assert.strictEqual(ticked, true);
191193
assert.strictEqual(writable._writableState.errorEmitted, true);
192194
}));
193195
writable.on('error', common.mustCall((err) => {
194196
assert.strictEqual(ticked, true);
195-
assert.strictEqual(err.message, 'kaboom 2');
197+
assert.strictEqual(err.message, 'kaboom 1');
196198
assert.strictEqual(writable._writableState.errorEmitted, true);
197199
}));
198200

@@ -205,7 +207,7 @@ const assert = require('assert');
205207
// the `_destroy()` callback is called.
206208
writable.destroy(new Error('kaboom 2'));
207209
assert.strictEqual(writable._writableState.errorEmitted, false);
208-
assert.strictEqual(writable._writableState.errored, true);
210+
assert.strictEqual(writable._writableState.errored, false);
209211

210212
ticked = true;
211213
}
@@ -246,8 +248,8 @@ const assert = require('assert');
246248

247249
const expected = new Error('kaboom');
248250

249-
write.destroy(expected, common.mustCall(function(err) {
250-
assert.strictEqual(err, expected);
251+
write.destroy(expected, common.mustCall((err) => {
252+
assert.strictEqual(err, undefined);
251253
}));
252254
}
253255

@@ -271,11 +273,7 @@ const assert = require('assert');
271273
const write = new Writable();
272274

273275
write.destroy();
274-
write.on('error', common.expectsError({
275-
name: 'Error',
276-
code: 'ERR_STREAM_DESTROYED',
277-
message: 'Cannot call write after a stream was destroyed'
278-
}));
276+
write.on('error', common.mustNotCall());
279277
write.write('asd', common.expectsError({
280278
name: 'Error',
281279
code: 'ERR_STREAM_DESTROYED',
@@ -288,11 +286,7 @@ const assert = require('assert');
288286
write(chunk, enc, cb) { cb(); }
289287
});
290288

291-
write.on('error', common.expectsError({
292-
name: 'Error',
293-
code: 'ERR_STREAM_DESTROYED',
294-
message: 'Cannot call write after a stream was destroyed'
295-
}));
289+
write.on('error', common.mustNotCall());
296290

297291
write.cork();
298292
write.write('asd', common.mustCall());

test/parallel/test-stream-writable-writable.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ const { Writable } = require('stream');
2323
w.write('asd');
2424
assert.strictEqual(w.writable, false);
2525
w.on('error', common.mustCall());
26-
w.destroy();
2726
}
2827

2928
{

test/parallel/test-stream-writable-write-error.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ function test(autoDestroy) {
4343
_write() {}
4444
});
4545
w.destroy();
46-
expectError(w, 'asd', 'ERR_STREAM_DESTROYED');
4746
}
4847

4948
{

test/parallel/test-tls-wrap-econnreset-localaddress.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const server = net.createServer((c) => {
1313
}).listen(common.mustCall(() => {
1414
const port = server.address().port;
1515

16+
let errored = false;
1617
tls.connect({
1718
port: port,
1819
localAddress: common.localhostIPv4
@@ -24,5 +25,9 @@ const server = net.createServer((c) => {
2425
assert.strictEqual(e.port, port);
2526
assert.strictEqual(e.localAddress, common.localhostIPv4);
2627
server.close();
28+
errored = true;
29+
}))
30+
.on('close', common.mustCall(() => {
31+
assert.strictEqual(errored, true);
2732
}));
2833
}));

test/parallel/test-tls-wrap-econnreset-pipe.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ if (process.argv[2] !== 'child') {
3131
const server = net.createServer((c) => {
3232
c.end();
3333
}).listen(common.PIPE, common.mustCall(() => {
34+
let errored = false;
3435
tls.connect({ path: common.PIPE })
3536
.once('error', common.mustCall((e) => {
3637
assert.strictEqual(e.code, 'ECONNRESET');
@@ -39,5 +40,9 @@ const server = net.createServer((c) => {
3940
assert.strictEqual(e.host, undefined);
4041
assert.strictEqual(e.localAddress, undefined);
4142
server.close();
43+
errored = true;
44+
}))
45+
.on('close', common.mustCall(() => {
46+
assert.strictEqual(errored, true);
4247
}));
4348
}));

test/parallel/test-tls-wrap-econnreset-socket.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@ const server = net.createServer((c) => {
1515

1616
const socket = new net.Socket();
1717

18+
let errored = false;
1819
tls.connect({ socket })
1920
.once('error', common.mustCall((e) => {
2021
assert.strictEqual(e.code, 'ECONNRESET');
2122
assert.strictEqual(e.path, undefined);
2223
assert.strictEqual(e.host, undefined);
2324
assert.strictEqual(e.port, undefined);
2425
assert.strictEqual(e.localAddress, undefined);
26+
errored = true;
2527
server.close();
28+
}))
29+
.on('close', common.mustCall(() => {
30+
assert.strictEqual(errored, true);
2631
}));
2732

2833
socket.connect(port);

test/parallel/test-tls-wrap-econnreset.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const server = net.createServer((c) => {
1313
}).listen(common.mustCall(() => {
1414
const port = server.address().port;
1515

16+
let errored = false;
1617
tls.connect(port, common.localhostIPv4)
1718
.once('error', common.mustCall((e) => {
1819
assert.strictEqual(e.code, 'ECONNRESET');
@@ -21,5 +22,9 @@ const server = net.createServer((c) => {
2122
assert.strictEqual(e.port, port);
2223
assert.strictEqual(e.localAddress, undefined);
2324
server.close();
25+
errored = true;
26+
}))
27+
.on('close', common.mustCall(() => {
28+
assert.strictEqual(errored, true);
2429
}));
2530
}));

0 commit comments

Comments
 (0)