From 0dc4f9422cff897a62e2eceda2bf192e4d7babd6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 19 Aug 2019 17:46:10 +0200 Subject: [PATCH 1/3] stream: don't emit finish after destroy() When destroying the stream will be potentially incomplete and therefore it doesn't make sense to emit finish. --- lib/_stream_writable.js | 3 ++- test/parallel/test-stream-writable-destroy.js | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index e212881c4ac555..defcd0823b82e8 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -620,7 +620,8 @@ function needFinish(state) { state.length === 0 && state.bufferedRequest === null && !state.finished && - !state.writing); + !state.writing && + !state.destroyed); } function callFinal(stream, state) { stream._final((err) => { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index a431d6d48d1c8e..191fa5dd8a38cd 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -111,7 +111,7 @@ const assert = require('assert'); write.destroy(); write.removeListener('finish', fail); - write.on('finish', common.mustCall()); + write.on('finish', common.mustNotCall()); assert.strictEqual(write.destroyed, true); } From 4df3c3ee6149d0bd05340cae749b0d2fe82ea604 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 19 Aug 2019 17:48:45 +0200 Subject: [PATCH 2/3] stream: consistent clearBuffer --- lib/_stream_writable.js | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index defcd0823b82e8..41f40c0e2efa84 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -316,7 +316,8 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; - if (!state.writing && + if (!state.destroyed && + !state.writing && !state.corked && !state.bufferProcessing && state.bufferedRequest) @@ -468,16 +469,16 @@ function onwrite(stream, er) { if (er) onwriteError(stream, state, sync, er, cb); else { - // Check if we're actually ready to finish, but don't emit yet - var finished = needFinish(state) || stream.destroyed; - if (!finished && + if (!state.destroyed && + !state.writing && !state.corked && !state.bufferProcessing && - state.bufferedRequest) { + state.bufferedRequest) clearBuffer(stream, state); - } + // Check if we're actually ready to finish, but don't emit yet + var finished = needFinish(state); if (sync) { process.nextTick(afterWrite, stream, state, cb); } else { From 8add9705878ccf7134b4a07194a76ccd5bfcde96 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 19 Aug 2019 17:49:23 +0200 Subject: [PATCH 3/3] stream: move condition into clearBuffer --- lib/_stream_writable.js | 25 +++++++----------- lib/internal/fs/streams.js | 26 +++++-------------- test/parallel/test-file-write-stream.js | 21 +++++++-------- .../test-fs-write-stream-autoclose-option.js | 5 ++-- test/parallel/test-http2-compat-socket-set.js | 3 ++- test/parallel/test-stream-duplex-destroy.js | 2 +- .../parallel/test-stream-transform-destroy.js | 2 +- test/parallel/test-stream-write-destroy.js | 6 ++--- 8 files changed, 34 insertions(+), 56 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 41f40c0e2efa84..34a21ed914911d 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -315,13 +315,7 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; - - if (!state.destroyed && - !state.writing && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) - clearBuffer(this, state); + clearBuffer(this, state); } }; @@ -469,16 +463,8 @@ function onwrite(stream, er) { if (er) onwriteError(stream, state, sync, er, cb); else { + clearBuffer(stream, state); - if (!state.destroyed && - !state.writing && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) - clearBuffer(stream, state); - - // Check if we're actually ready to finish, but don't emit yet - var finished = needFinish(state); if (sync) { process.nextTick(afterWrite, stream, state, cb); } else { @@ -501,6 +487,13 @@ function afterWrite(stream, state, cb) { // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { + if (state.destroyed || + state.writing || + state.corked || + state.bufferProcessing || + !state.bufferedRequest) + return; + state.bufferProcessing = true; var entry = state.bufferedRequest; diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 87d58a8be55f80..525eb5d2f2c37e 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -62,6 +62,8 @@ function ReadStream(path, options) { // For backwards compat do not emit close on destroy. options.emitClose = false; + options.autoDestroy = options.autoClose === undefined ? + true : options.autoClose; Readable.call(this, options); @@ -73,7 +75,7 @@ function ReadStream(path, options) { this.start = options.start; this.end = options.end; - this.autoClose = options.autoClose === undefined ? true : options.autoClose; + this.autoClose = options.autoDestroy; this.pos = undefined; this.bytesRead = 0; this.closed = false; @@ -100,12 +102,6 @@ function ReadStream(path, options) { if (typeof this.fd !== 'number') this.open(); - - this.on('end', function() { - if (this.autoClose) { - this.destroy(); - } - }); } Object.setPrototypeOf(ReadStream.prototype, Readable.prototype); Object.setPrototypeOf(ReadStream, Readable); @@ -238,6 +234,8 @@ function WriteStream(path, options) { // For backwards compat do not emit close on destroy. options.emitClose = false; + options.autoDestroy = options.autoClose === undefined ? + true : options.autoClose; Writable.call(this, options); @@ -248,7 +246,7 @@ function WriteStream(path, options) { this.mode = options.mode === undefined ? 0o666 : options.mode; this.start = options.start; - this.autoClose = options.autoClose === undefined ? true : !!options.autoClose; + this.autoClose = options.autoDestroy; this.pos = undefined; this.bytesWritten = 0; this.closed = false; @@ -268,14 +266,6 @@ function WriteStream(path, options) { Object.setPrototypeOf(WriteStream.prototype, Writable.prototype); Object.setPrototypeOf(WriteStream, Writable); -WriteStream.prototype._final = function(callback) { - if (this.autoClose) { - this.destroy(); - } - - callback(); -}; - WriteStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (er, fd) => { if (er) { @@ -307,9 +297,6 @@ WriteStream.prototype._write = function(data, encoding, cb) { fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { if (er) { - if (this.autoClose) { - this.destroy(); - } return cb(er); } this.bytesWritten += bytes; @@ -342,7 +329,6 @@ WriteStream.prototype._writev = function(data, cb) { fs.writev(this.fd, chunks, this.pos, function(er, bytes) { if (er) { - self.destroy(); return cb(er); } self.bytesWritten += bytes; diff --git a/test/parallel/test-file-write-stream.js b/test/parallel/test-file-write-stream.js index d66657d690f1b2..3fb78a73a3af4b 100644 --- a/test/parallel/test-file-write-stream.js +++ b/test/parallel/test-file-write-stream.js @@ -65,17 +65,16 @@ file assert.strictEqual(file.bytesWritten, EXPECTED.length * 2); callbacks.close++; - common.expectsError( - () => { - console.error('write after end should not be allowed'); - file.write('should not work anymore'); - }, - { - code: 'ERR_STREAM_WRITE_AFTER_END', - type: Error, - message: 'write after end' - } - ); + file.on('error', common.expectsError({ + code: 'ERR_STREAM_WRITE_AFTER_END', + type: Error, + message: 'write after end' + })); + file.write('should not work anymore', common.expectsError({ + code: 'ERR_STREAM_WRITE_AFTER_END', + type: Error, + message: 'write after end' + })); fs.unlinkSync(fn); }); diff --git a/test/parallel/test-fs-write-stream-autoclose-option.js b/test/parallel/test-fs-write-stream-autoclose-option.js index e39f4d615ab7e0..81b1c344b56a7b 100644 --- a/test/parallel/test-fs-write-stream-autoclose-option.js +++ b/test/parallel/test-fs-write-stream-autoclose-option.js @@ -25,10 +25,11 @@ function next() { stream = fs.createWriteStream(null, { fd: stream.fd, start: 0 }); stream.write('Test2'); stream.end(); + stream.on('finish', common.mustCall(function() { assert.strictEqual(stream.closed, false); - assert.strictEqual(stream.fd, null); stream.on('close', common.mustCall(function() { + assert.strictEqual(stream.fd, null); assert.strictEqual(stream.closed, true); process.nextTick(next2); })); @@ -51,8 +52,8 @@ function next3() { stream.end(); stream.on('finish', common.mustCall(function() { assert.strictEqual(stream.closed, false); - assert.strictEqual(stream.fd, null); stream.on('close', common.mustCall(function() { + assert.strictEqual(stream.fd, null); assert.strictEqual(stream.closed, true); })); })); diff --git a/test/parallel/test-http2-compat-socket-set.js b/test/parallel/test-http2-compat-socket-set.js index 05beb09d548e91..65bbfb0e60c9c5 100644 --- a/test/parallel/test-http2-compat-socket-set.js +++ b/test/parallel/test-http2-compat-socket-set.js @@ -74,6 +74,7 @@ server.on('request', common.mustCall(function(request, response) { common.expectsError(() => request.socket.resume = noop, errMsg); request.stream.on('finish', common.mustCall(() => { + response.stream.destroy(); setImmediate(() => { request.socket.setTimeout = noop; assert.strictEqual(request.stream.setTimeout, noop); @@ -83,7 +84,7 @@ server.on('request', common.mustCall(function(request, response) { assert.strictEqual(request.stream._isProcessing, true); }); })); - response.stream.destroy(); + response.stream.end(); })); server.listen(0, common.mustCall(function() { diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js index 3c38d2c364051c..906d9c63068460 100644 --- a/test/parallel/test-stream-duplex-destroy.js +++ b/test/parallel/test-stream-duplex-destroy.js @@ -125,7 +125,7 @@ const assert = require('assert'); duplex.removeListener('end', fail); duplex.removeListener('finish', fail); duplex.on('end', common.mustCall()); - duplex.on('finish', common.mustCall()); + duplex.on('finish', common.mustNotCall()); assert.strictEqual(duplex.destroyed, true); } diff --git a/test/parallel/test-stream-transform-destroy.js b/test/parallel/test-stream-transform-destroy.js index c594d9989ae4de..8750c65164e5e0 100644 --- a/test/parallel/test-stream-transform-destroy.js +++ b/test/parallel/test-stream-transform-destroy.js @@ -117,7 +117,7 @@ const assert = require('assert'); transform.removeListener('end', fail); transform.removeListener('finish', fail); transform.on('end', common.mustCall()); - transform.on('finish', common.mustCall()); + transform.on('finish', common.mustNotCall()); } { diff --git a/test/parallel/test-stream-write-destroy.js b/test/parallel/test-stream-write-destroy.js index 83b329a6a8a7b3..58096c5635c9cf 100644 --- a/test/parallel/test-stream-write-destroy.js +++ b/test/parallel/test-stream-write-destroy.js @@ -48,12 +48,10 @@ for (const withPendingData of [ false, true ]) { w.destroy(); assert.strictEqual(chunksWritten, 1); callbacks.shift()(); - assert.strictEqual(chunksWritten, 2); + assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2); assert.strictEqual(callbacks.length, 0); assert.strictEqual(drains, 1); - // When we used `.end()`, we see the 'finished' event if and only if - // we actually finished processing the write queue. - assert.strictEqual(finished, !withPendingData && useEnd); + assert.strictEqual(finished, false); } }