From b75a54fa6836ee0f527464c83bcc27c6ef7db7b7 Mon Sep 17 00:00:00 2001 From: th0r Date: Mon, 8 Jul 2019 14:22:22 +0300 Subject: [PATCH 1/4] Don't rely on presence of global `process` object --- build/files.js | 31 +++++++++++++++++++ lib/_stream_duplex.js | 5 ++- lib/_stream_readable.js | 24 ++++++++------ lib/_stream_writable.js | 16 +++++----- lib/internal/streams/async_iterator.js | 6 ++-- lib/internal/streams/buffer_list.js | 2 +- lib/internal/streams/destroy.js | 13 +++++--- next-tick-browser.js | 8 +++++ next-tick.js | 1 + package.json | 1 + test/common/dns.js | 4 +-- test/common/index.js | 4 +-- ...t-stream-pipe-await-drain-manual-resume.js | 4 +-- .../test-stream-pipe-multiple-pipes.js | 8 ++--- .../test-stream-readable-async-iterators.js | 2 +- test/parallel/test-stream-write-destroy.js | 7 ++--- test/parallel/test-streams-highwatermark.js | 6 ++-- 17 files changed, 97 insertions(+), 45 deletions(-) create mode 100644 next-tick-browser.js create mode 100644 next-tick.js diff --git a/build/files.js b/build/files.js index 7276dfc25a..d4dc87064e 100644 --- a/build/files.js +++ b/build/files.js @@ -212,6 +212,25 @@ function CorkedRequest(state) { /const \{ once \} = require\('internal\/util'\);/ , 'function once(callback) { let called = false; return function(...args) { if (called) return; called = true; callback(...args); }; }' ] + , nextTickUsage = [ + /process\.nextTick\((.+?)\)/g + , 'nextTick($1)' + ] + , nextTickDeclaration = depth => [ + /^('use strict';)$/m + , `$1\n\nvar nextTick = require('${'../'.repeat(depth)}next-tick.js')` + ] + , isStdStreamCheck = [ + /dest !== process\.(stdout|stderr)/g + , `!isStdStream(dest, '$1')` + ] + , isStdStreamDeclaration = [ + /$/ + , ` +function isStdStream(stream, type) { + return typeof process === 'undefined' ? false : stream === process[type]; +}` + ] module.exports['_stream_duplex.js'] = [ requireReplacement @@ -222,6 +241,8 @@ module.exports['_stream_duplex.js'] = [ , objectKeysReplacement , objectKeysDefine , errorsOneLevel + , nextTickDeclaration(1) + , nextTickUsage ] module.exports['_stream_passthrough.js'] = [ @@ -257,6 +278,10 @@ module.exports['_stream_readable.js'] = [ , numberIE11 , noAsyncIterators1 , noAsyncIterators2 + , nextTickDeclaration(1) + , nextTickUsage + , isStdStreamCheck + , isStdStreamDeclaration ] module.exports['_stream_transform.js'] = [ @@ -295,6 +320,8 @@ module.exports['_stream_writable.js'] = [ , useCorkedRequest , addConstructors , errorsOneLevel + , nextTickDeclaration(1) + , nextTickUsage ] module.exports['internal/streams/buffer_list.js'] = [ @@ -312,6 +339,8 @@ const custom = inspect && inspect.custom || 'inspect' ] module.exports['internal/streams/destroy.js'] = [ errorsTwoLevel + , nextTickDeclaration(3) + , nextTickUsage ] module.exports['internal/streams/state.js'] = [ @@ -333,6 +362,8 @@ module.exports['internal/streams/async_iterator.js'] = [ / return\(\)/, '[Symbol.asyncIterator]() { return this },\n return\(\)' ] + , nextTickDeclaration(3) + , nextTickUsage ] module.exports['internal/streams/end-of-stream.js'] = [ diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 6752519225..6fdef12bbb 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -23,8 +23,11 @@ // prototypally inherits from Readable, and then parasitically from // Writable. 'use strict'; + +var nextTick = require('../next-tick.js'); /**/ + var objectKeys = Object.keys || function (obj) { var keys = []; @@ -105,7 +108,7 @@ function onend() { if (this._writableState.ended) return; // no more data can be written. // But allow more writes to happen in this tick. - process.nextTick(onEndNT, this); + nextTick(onEndNT, this); } function onEndNT(self) { diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 33f478d7e8..c074626ff5 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -20,6 +20,8 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. 'use strict'; +var nextTick = require('../next-tick.js'); + module.exports = Readable; /**/ @@ -508,7 +510,7 @@ function emitReadable(stream) { if (!state.emittedReadable) { debug('emitReadable', state.flowing); state.emittedReadable = true; - process.nextTick(emitReadable_, stream); + nextTick(emitReadable_, stream); } } @@ -539,7 +541,7 @@ function emitReadable_(stream) { function maybeReadMore(stream, state) { if (!state.readingMore) { state.readingMore = true; - process.nextTick(maybeReadMore_, stream, state); + nextTick(maybeReadMore_, stream, state); } } @@ -606,9 +608,9 @@ Readable.prototype.pipe = function (dest, pipeOpts) { state.pipesCount += 1; debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); - var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr; + var doEnd = (!pipeOpts || pipeOpts.end !== false) && !isStdStream(dest, 'stdout') && !isStdStream(dest, 'stderr'); var endFn = doEnd ? onend : unpipe; - if (state.endEmitted) process.nextTick(endFn);else src.once('end', endFn); + if (state.endEmitted) nextTick(endFn);else src.once('end', endFn); dest.on('unpipe', onunpipe); function onunpipe(readable, unpipeInfo) { @@ -802,7 +804,7 @@ Readable.prototype.on = function (ev, fn) { if (state.length) { emitReadable(this); } else if (!state.reading) { - process.nextTick(nReadingNextTick, this); + nextTick(nReadingNextTick, this); } } } @@ -822,7 +824,7 @@ Readable.prototype.removeListener = function (ev, fn) { // support once('readable', fn) cycles. This means that calling // resume within the same tick will have no // effect. - process.nextTick(updateReadableListening, this); + nextTick(updateReadableListening, this); } return res; @@ -838,7 +840,7 @@ Readable.prototype.removeAllListeners = function (ev) { // support once('readable', fn) cycles. This means that calling // resume within the same tick will have no // effect. - process.nextTick(updateReadableListening, this); + nextTick(updateReadableListening, this); } return res; @@ -883,7 +885,7 @@ Readable.prototype.resume = function () { function resume(stream, state) { if (!state.resumeScheduled) { state.resumeScheduled = true; - process.nextTick(resume_, stream, state); + nextTick(resume_, stream, state); } } @@ -1064,7 +1066,7 @@ function endReadable(stream) { if (!state.endEmitted) { state.ended = true; - process.nextTick(endReadableNT, state, stream); + nextTick(endReadableNT, state, stream); } } @@ -1084,4 +1086,8 @@ function indexOf(xs, x) { } return -1; +} + +function isStdStream(stream, type) { + return typeof process === 'undefined' ? false : stream === process[type]; } \ No newline at end of file diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index b35447aedc..92247fe6c8 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -23,6 +23,8 @@ // the drain event emission and buffering. 'use strict'; +var nextTick = require('../next-tick.js'); + module.exports = Writable; /* */ @@ -257,7 +259,7 @@ function writeAfterEnd(stream, cb) { var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb stream.emit('error', er); - process.nextTick(cb, er); + nextTick(cb, er); } // Checks that a user-supplied chunk is valid, especially for the particular // mode the stream is in. Currently this means that `null` is never accepted // and undefined/non-string values are only allowed in object mode. @@ -274,7 +276,7 @@ function validChunk(stream, state, chunk, cb) { if (er) { stream.emit('error', er); - process.nextTick(cb, er); + nextTick(cb, er); return false; } @@ -412,10 +414,10 @@ function onwriteError(stream, state, sync, er, cb) { if (sync) { // defer the callback if we are being called synchronously // to avoid piling up things on the stack - process.nextTick(cb, er); // this can emit finish, and it will always happen + nextTick(cb, er); // this can emit finish, and it will always happen // after error - process.nextTick(finishMaybe, stream, state); + nextTick(finishMaybe, stream, state); stream._writableState.errorEmitted = true; stream.emit('error', er); } else { @@ -452,7 +454,7 @@ function onwrite(stream, er) { } if (sync) { - process.nextTick(afterWrite, stream, state, finished, cb); + nextTick(afterWrite, stream, state, finished, cb); } else { afterWrite(stream, state, finished, cb); } @@ -601,7 +603,7 @@ function prefinish(stream, state) { if (typeof stream._final === 'function' && !state.destroyed) { state.pendingcb++; state.finalCalled = true; - process.nextTick(callFinal, stream, state); + nextTick(callFinal, stream, state); } else { state.prefinished = true; stream.emit('prefinish'); @@ -629,7 +631,7 @@ function endWritable(stream, state, cb) { finishMaybe(stream, state); if (cb) { - if (state.finished) process.nextTick(cb);else stream.once('finish', cb); + if (state.finished) nextTick(cb);else stream.once('finish', cb); } state.ended = true; diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 9fb615a2f3..1ec7612b3a 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -4,6 +4,8 @@ var _Object$setPrototypeO; function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } +var nextTick = require('../../../next-tick.js'); + var finished = require('./end-of-stream'); var kLastResolve = Symbol('lastResolve'); @@ -41,7 +43,7 @@ function readAndResolve(iter) { function onReadable(iter) { // we wait for the next tick, because it might // emit an error with process.nextTick - process.nextTick(readAndResolve, iter); + nextTick(readAndResolve, iter); } function wrapForNext(lastPromise, iter) { @@ -84,7 +86,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro // we cannot guarantee that there is no error lingering around // waiting to be emitted. return new Promise(function (resolve, reject) { - process.nextTick(function () { + nextTick(function () { if (_this[kError]) { reject(_this[kError]); } else { diff --git a/lib/internal/streams/buffer_list.js b/lib/internal/streams/buffer_list.js index ebaf5c72aa..142cf26f31 100644 --- a/lib/internal/streams/buffer_list.js +++ b/lib/internal/streams/buffer_list.js @@ -1,6 +1,6 @@ 'use strict'; -function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } return target; } +function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { if (i % 2) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } else { Object.defineProperties(target, Object.getOwnPropertyDescriptors(arguments[i])); } } return target; } function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 63ae49928d..67bcbf8678 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -1,4 +1,7 @@ -'use strict'; // undocumented cb() API, needed for core, not for public API +'use strict'; + +var nextTick = require('../../../next-tick.js'); // undocumented cb() API, needed for core, not for public API + function destroy(err, cb) { var _this = this; @@ -10,7 +13,7 @@ function destroy(err, cb) { if (cb) { cb(err); } else if (err && (!this._writableState || !this._writableState.errorEmitted)) { - process.nextTick(emitErrorNT, this, err); + nextTick(emitErrorNT, this, err); } return this; @@ -29,16 +32,16 @@ function destroy(err, cb) { this._destroy(err || null, function (err) { if (!cb && err) { - process.nextTick(emitErrorAndCloseNT, _this, err); + nextTick(emitErrorAndCloseNT, _this, err); if (_this._writableState) { _this._writableState.errorEmitted = true; } } else if (cb) { - process.nextTick(emitCloseNT, _this); + nextTick(emitCloseNT, _this); cb(err); } else { - process.nextTick(emitCloseNT, _this); + nextTick(emitCloseNT, _this); } }); diff --git a/next-tick-browser.js b/next-tick-browser.js new file mode 100644 index 0000000000..7a0fbf72a7 --- /dev/null +++ b/next-tick-browser.js @@ -0,0 +1,8 @@ +module.exports = nextTick; + +function nextTick(fn) { + var args = Array.prototype.slice.call(arguments, 1); + setTimeout(function () { + fn.apply(null, args); + }, 0); +} diff --git a/next-tick.js b/next-tick.js new file mode 100644 index 0000000000..51f167c982 --- /dev/null +++ b/next-tick.js @@ -0,0 +1 @@ +module.exports = process.nextTick; diff --git a/package.json b/package.json index 6c6bd3b901..7042131f34 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "worker_threads": false, "./errors": "./errors-browser.js", "./readable.js": "./readable-browser.js", + "./next-tick.js": "./next-tick-browser.js", "./lib/internal/streams/stream.js": "./lib/internal/streams/stream-browser.js" }, "nyc": { diff --git a/test/common/dns.js b/test/common/dns.js index 983cd5bb86..d99704a813 100644 --- a/test/common/dns.js +++ b/test/common/dns.js @@ -98,8 +98,8 @@ function parseDNSPacket(buffer) { var counts = [['questions', buffer.readUInt16BE(4)], ['answers', buffer.readUInt16BE(6)], ['authorityAnswers', buffer.readUInt16BE(8)], ['additionalRecords', buffer.readUInt16BE(10)]]; var offset = 12; - for (var _i = 0; _i < counts.length; _i++) { - var _counts$_i = _slicedToArray(counts[_i], 2), + for (var _i = 0, _counts = counts; _i < _counts.length; _i++) { + var _counts$_i = _slicedToArray(_counts[_i], 2), sectionName = _counts$_i[0], count = _counts$_i[1]; diff --git a/test/common/index.js b/test/common/index.js index f47899958c..d75835358c 100644 --- a/test/common/index.js +++ b/test/common/index.js @@ -701,8 +701,8 @@ function getArrayBufferViews(buf) { var out = []; var arrayBufferViews = [Int8Array, Uint8Array, Uint8ClampedArray, Int16Array, Uint16Array, Int32Array, Uint32Array, Float32Array, Float64Array, DataView]; - for (var _i = 0; _i < arrayBufferViews.length; _i++) { - var type = arrayBufferViews[_i]; + for (var _i = 0, _arrayBufferViews = arrayBufferViews; _i < _arrayBufferViews.length; _i++) { + var type = _arrayBufferViews[_i]; var _type$BYTES_PER_ELEME = type.BYTES_PER_ELEMENT, BYTES_PER_ELEMENT = _type$BYTES_PER_ELEME === void 0 ? 1 : _type$BYTES_PER_ELEME; diff --git a/test/parallel/test-stream-pipe-await-drain-manual-resume.js b/test/parallel/test-stream-pipe-await-drain-manual-resume.js index d36d4f3651..b27101722c 100644 --- a/test/parallel/test-stream-pipe-await-drain-manual-resume.js +++ b/test/parallel/test-stream-pipe-await-drain-manual-resume.js @@ -47,8 +47,8 @@ readable.once('pause', common.mustCall(function () { isCurrentlyBufferingWrites = false; - for (var _i = 0; _i < queue.length; _i++) { - var queued = queue[_i]; + for (var _i = 0, _queue = queue; _i < _queue.length; _i++) { + var queued = _queue[_i]; queued.cb(); } })); diff --git a/test/parallel/test-stream-pipe-multiple-pipes.js b/test/parallel/test-stream-pipe-multiple-pipes.js index 458b4c8b23..76fa3335ab 100644 --- a/test/parallel/test-stream-pipe-multiple-pipes.js +++ b/test/parallel/test-stream-pipe-multiple-pipes.js @@ -38,8 +38,8 @@ readable.push(input); // The pipe() calls will postpone emission of the 'resume' // so no data will be available to the writable streams until then. process.nextTick(common.mustCall(function () { - for (var _i = 0; _i < writables.length; _i++) { - var target = writables[_i]; + for (var _i = 0, _writables = writables; _i < _writables.length; _i++) { + var target = _writables[_i]; assert.deepStrictEqual(target.output, [input]); target.on('unpipe', common.mustCall()); readable.unpipe(target); @@ -51,8 +51,8 @@ process.nextTick(common.mustCall(function () { readable.resume(); // Make sure the 'end' event gets emitted. })); readable.on('end', common.mustCall(function () { - for (var _i2 = 0; _i2 < writables.length; _i2++) { - var target = writables[_i2]; + for (var _i2 = 0, _writables2 = writables; _i2 < _writables2.length; _i2++) { + var target = _writables2[_i2]; assert.deepStrictEqual(target.output, [input]); } })); diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 78ab350010..f0fe61ef68 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -4,7 +4,7 @@ function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; } -function _asyncIterator(iterable) { var method; if (typeof Symbol === "function") { if (Symbol.asyncIterator) { method = iterable[Symbol.asyncIterator]; if (method != null) return method.call(iterable); } if (Symbol.iterator) { method = iterable[Symbol.iterator]; if (method != null) return method.call(iterable); } } throw new TypeError("Object is not async iterable"); } +function _asyncIterator(iterable) { var method; if (typeof Symbol !== "undefined") { if (Symbol.asyncIterator) { method = iterable[Symbol.asyncIterator]; if (method != null) return method.call(iterable); } if (Symbol.iterator) { method = iterable[Symbol.iterator]; if (method != null) return method.call(iterable); } } throw new TypeError("Object is not async iterable"); } /**/ var bufferShim = require('safe-buffer').Buffer; diff --git a/test/parallel/test-stream-write-destroy.js b/test/parallel/test-stream-write-destroy.js index 00688cd70c..6528b2bacd 100644 --- a/test/parallel/test-stream-write-destroy.js +++ b/test/parallel/test-stream-write-destroy.js @@ -14,11 +14,8 @@ var _require = require('../../'), // writes. -var _arr = [false, true]; - -for (var _i = 0; _i < _arr.length; _i++) { +for (var _i = 0, _arr = [false, true]; _i < _arr.length; _i++) { var withPendingData = _arr[_i]; - var _arr2 = [false, true]; var _loop = function _loop() { var useEnd = _arr2[_i2]; @@ -80,7 +77,7 @@ for (var _i = 0; _i < _arr.length; _i++) { assert.strictEqual(finished, !withPendingData && useEnd); }; - for (var _i2 = 0; _i2 < _arr2.length; _i2++) { + for (var _i2 = 0, _arr2 = [false, true]; _i2 < _arr2.length; _i2++) { _loop(); } } diff --git a/test/parallel/test-streams-highwatermark.js b/test/parallel/test-streams-highwatermark.js index 66f245de08..b6231c5ba7 100644 --- a/test/parallel/test-streams-highwatermark.js +++ b/test/parallel/test-streams-highwatermark.js @@ -26,11 +26,9 @@ var stream = require('../../'); highWaterMark: ovfl }); assert.strictEqual(writable._writableState.highWaterMark, ovfl); - var _arr = [true, false, '5', {}, -5, NaN]; var _loop = function _loop() { var invalidHwm = _arr[_i]; - var _arr2 = [stream.Readable, stream.Writable]; var _loop2 = function _loop2() { var type = _arr2[_i2]; @@ -45,12 +43,12 @@ var stream = require('../../'); }); }; - for (var _i2 = 0; _i2 < _arr2.length; _i2++) { + for (var _i2 = 0, _arr2 = [stream.Readable, stream.Writable]; _i2 < _arr2.length; _i2++) { _loop2(); } }; - for (var _i = 0; _i < _arr.length; _i++) { + for (var _i = 0, _arr = [true, false, '5', {}, -5, NaN]; _i < _arr.length; _i++) { _loop(); } } From 9e9c9e90b29c13a8630f1fa2d6565bad4ce029cb Mon Sep 17 00:00:00 2001 From: th0r Date: Mon, 8 Jul 2019 14:51:46 +0300 Subject: [PATCH 2/4] Fix usage with fake timers --- next-tick.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/next-tick.js b/next-tick.js index 51f167c982..0065f8db96 100644 --- a/next-tick.js +++ b/next-tick.js @@ -1 +1,5 @@ -module.exports = process.nextTick; +module.exports = nextTick; + +function nextTick(...args) { + process.nextTick(...args); +} From 6df6d84b6d81b0849c2329ff2861bbf5c955e91b Mon Sep 17 00:00:00 2001 From: th0r Date: Mon, 8 Jul 2019 20:01:11 +0300 Subject: [PATCH 3/4] Make `fake-process` module instead of `next-tick` --- build/files.js | 16 +-- fake-process-browser.js | 155 +++++++++++++++++++++++++ fake-process.js | 1 + lib/_stream_duplex.js | 4 +- lib/_stream_readable.js | 18 +-- lib/_stream_writable.js | 16 +-- lib/internal/streams/async_iterator.js | 6 +- lib/internal/streams/destroy.js | 10 +- next-tick-browser.js | 8 -- next-tick.js | 5 - package.json | 2 +- 11 files changed, 192 insertions(+), 49 deletions(-) create mode 100644 fake-process-browser.js create mode 100644 fake-process.js delete mode 100644 next-tick-browser.js delete mode 100644 next-tick.js diff --git a/build/files.js b/build/files.js index d4dc87064e..fa837ba803 100644 --- a/build/files.js +++ b/build/files.js @@ -214,11 +214,11 @@ function CorkedRequest(state) { ] , nextTickUsage = [ /process\.nextTick\((.+?)\)/g - , 'nextTick($1)' + , 'fakeProcess.nextTick($1)' ] - , nextTickDeclaration = depth => [ + , fakeProcessDeclaration = depth => [ /^('use strict';)$/m - , `$1\n\nvar nextTick = require('${'../'.repeat(depth)}next-tick.js')` + , `$1\n\nvar fakeProcess = require('${'../'.repeat(depth)}fake-process.js');` ] , isStdStreamCheck = [ /dest !== process\.(stdout|stderr)/g @@ -241,7 +241,7 @@ module.exports['_stream_duplex.js'] = [ , objectKeysReplacement , objectKeysDefine , errorsOneLevel - , nextTickDeclaration(1) + , fakeProcessDeclaration(1) , nextTickUsage ] @@ -278,7 +278,7 @@ module.exports['_stream_readable.js'] = [ , numberIE11 , noAsyncIterators1 , noAsyncIterators2 - , nextTickDeclaration(1) + , fakeProcessDeclaration(1) , nextTickUsage , isStdStreamCheck , isStdStreamDeclaration @@ -320,7 +320,7 @@ module.exports['_stream_writable.js'] = [ , useCorkedRequest , addConstructors , errorsOneLevel - , nextTickDeclaration(1) + , fakeProcessDeclaration(1) , nextTickUsage ] @@ -339,7 +339,7 @@ const custom = inspect && inspect.custom || 'inspect' ] module.exports['internal/streams/destroy.js'] = [ errorsTwoLevel - , nextTickDeclaration(3) + , fakeProcessDeclaration(3) , nextTickUsage ] @@ -362,7 +362,7 @@ module.exports['internal/streams/async_iterator.js'] = [ / return\(\)/, '[Symbol.asyncIterator]() { return this },\n return\(\)' ] - , nextTickDeclaration(3) + , fakeProcessDeclaration(3) , nextTickUsage ] diff --git a/fake-process-browser.js b/fake-process-browser.js new file mode 100644 index 0000000000..ad9d4d4d83 --- /dev/null +++ b/fake-process-browser.js @@ -0,0 +1,155 @@ +module.exports = { + nextTick: nextTick +}; + +// cached from whatever global is present so that test runners that stub it +// don't break things. But we need to wrap it in a try catch in case it is +// wrapped in strict mode code which doesn't define any globals. It's inside a +// function because try/catches deoptimize in certain engines. + +var cachedSetTimeout; +var cachedClearTimeout; + +function defaultSetTimeout() { + throw new Error('setTimeout has not been defined'); +} + +function defaultClearTimeout () { + throw new Error('clearTimeout has not been defined'); +} + +(function () { + try { + if (typeof setTimeout === 'function') { + cachedSetTimeout = setTimeout; + } else { + cachedSetTimeout = defaultSetTimeout; + } + } catch (e) { + cachedSetTimeout = defaultSetTimeout; + } + try { + if (typeof clearTimeout === 'function') { + cachedClearTimeout = clearTimeout; + } else { + cachedClearTimeout = defaultClearTimeout; + } + } catch (e) { + cachedClearTimeout = defaultClearTimeout; + } +}()); + +function runTimeout(fun) { + if (cachedSetTimeout === setTimeout) { + //normal enviroments in sane situations + return setTimeout(fun, 0); + } + // if setTimeout wasn't available but was latter defined + if ((cachedSetTimeout === defaultSetTimeout || !cachedSetTimeout) && setTimeout) { + cachedSetTimeout = setTimeout; + return setTimeout(fun, 0); + } + try { + // when when somebody has screwed with setTimeout but no I.E. maddness + return cachedSetTimeout(fun, 0); + } catch (e) { + try { + // When we are in I.E. but the script has been evaled so I.E. doesn't trust the global object when called normally + return cachedSetTimeout.call(null, fun, 0); + } catch(e){ + // same as above but when it's a version of I.E. that must have the global object for 'this', hopfully our context correct otherwise it will throw a global error + return cachedSetTimeout.call(this, fun, 0); + } + } +} + +function runClearTimeout(marker) { + if (cachedClearTimeout === clearTimeout) { + //normal enviroments in sane situations + return clearTimeout(marker); + } + // if clearTimeout wasn't available but was latter defined + if ((cachedClearTimeout === defaultClearTimeout || !cachedClearTimeout) && clearTimeout) { + cachedClearTimeout = clearTimeout; + return clearTimeout(marker); + } + try { + // when when somebody has screwed with setTimeout but no I.E. maddness + return cachedClearTimeout(marker); + } catch (e){ + try { + // When we are in I.E. but the script has been evaled so I.E. doesn't trust the global object when called normally + return cachedClearTimeout.call(null, marker); + } catch (e){ + // same as above but when it's a version of I.E. that must have the global object for 'this', hopfully our context correct otherwise it will throw a global error. + // Some versions of I.E. have different rules for clearTimeout vs setTimeout + return cachedClearTimeout.call(this, marker); + } + } +} + +var queue = []; +var draining = false; +var currentQueue; +var queueIndex = -1; + +function cleanUpNextTick() { + if (!draining || !currentQueue) { + return; + } + draining = false; + if (currentQueue.length) { + queue = currentQueue.concat(queue); + } else { + queueIndex = -1; + } + if (queue.length) { + drainQueue(); + } +} + +function drainQueue() { + if (draining) { + return; + } + var timeout = runTimeout(cleanUpNextTick); + draining = true; + + var len = queue.length; + while(len) { + currentQueue = queue; + queue = []; + while (++queueIndex < len) { + if (currentQueue) { + currentQueue[queueIndex].run(); + } + } + queueIndex = -1; + len = queue.length; + } + currentQueue = null; + draining = false; + runClearTimeout(timeout); +} + +function nextTick(fun) { + var args = new Array(arguments.length - 1); + if (arguments.length > 1) { + for (var i = 1; i < arguments.length; i++) { + args[i - 1] = arguments[i]; + } + } + queue.push(new Item(fun, args)); + if (queue.length === 1 && !draining) { + runTimeout(drainQueue); + } +} + +function Item(fun, array) { + this.fun = fun; + this.array = array; +} + +Item.prototype.run = function () { + this.fun.apply(null, this.array); +}; diff --git a/fake-process.js b/fake-process.js new file mode 100644 index 0000000000..e01f1ecd99 --- /dev/null +++ b/fake-process.js @@ -0,0 +1 @@ +module.exports = process; diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 6fdef12bbb..a94fa4c9c2 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -24,7 +24,7 @@ // Writable. 'use strict'; -var nextTick = require('../next-tick.js'); +var fakeProcess = require('../fake-process.js'); /**/ @@ -108,7 +108,7 @@ function onend() { if (this._writableState.ended) return; // no more data can be written. // But allow more writes to happen in this tick. - nextTick(onEndNT, this); + fakeProcess.nextTick(onEndNT, this); } function onEndNT(self) { diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index c074626ff5..672ed667ca 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -20,7 +20,7 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. 'use strict'; -var nextTick = require('../next-tick.js'); +var fakeProcess = require('../fake-process.js'); module.exports = Readable; /**/ @@ -510,7 +510,7 @@ function emitReadable(stream) { if (!state.emittedReadable) { debug('emitReadable', state.flowing); state.emittedReadable = true; - nextTick(emitReadable_, stream); + fakeProcess.nextTick(emitReadable_, stream); } } @@ -541,7 +541,7 @@ function emitReadable_(stream) { function maybeReadMore(stream, state) { if (!state.readingMore) { state.readingMore = true; - nextTick(maybeReadMore_, stream, state); + fakeProcess.nextTick(maybeReadMore_, stream, state); } } @@ -610,7 +610,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); var doEnd = (!pipeOpts || pipeOpts.end !== false) && !isStdStream(dest, 'stdout') && !isStdStream(dest, 'stderr'); var endFn = doEnd ? onend : unpipe; - if (state.endEmitted) nextTick(endFn);else src.once('end', endFn); + if (state.endEmitted) fakeProcess.nextTick(endFn);else src.once('end', endFn); dest.on('unpipe', onunpipe); function onunpipe(readable, unpipeInfo) { @@ -804,7 +804,7 @@ Readable.prototype.on = function (ev, fn) { if (state.length) { emitReadable(this); } else if (!state.reading) { - nextTick(nReadingNextTick, this); + fakeProcess.nextTick(nReadingNextTick, this); } } } @@ -824,7 +824,7 @@ Readable.prototype.removeListener = function (ev, fn) { // support once('readable', fn) cycles. This means that calling // resume within the same tick will have no // effect. - nextTick(updateReadableListening, this); + fakeProcess.nextTick(updateReadableListening, this); } return res; @@ -840,7 +840,7 @@ Readable.prototype.removeAllListeners = function (ev) { // support once('readable', fn) cycles. This means that calling // resume within the same tick will have no // effect. - nextTick(updateReadableListening, this); + fakeProcess.nextTick(updateReadableListening, this); } return res; @@ -885,7 +885,7 @@ Readable.prototype.resume = function () { function resume(stream, state) { if (!state.resumeScheduled) { state.resumeScheduled = true; - nextTick(resume_, stream, state); + fakeProcess.nextTick(resume_, stream, state); } } @@ -1066,7 +1066,7 @@ function endReadable(stream) { if (!state.endEmitted) { state.ended = true; - nextTick(endReadableNT, state, stream); + fakeProcess.nextTick(endReadableNT, state, stream); } } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 92247fe6c8..4987db00eb 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -23,7 +23,7 @@ // the drain event emission and buffering. 'use strict'; -var nextTick = require('../next-tick.js'); +var fakeProcess = require('../fake-process.js'); module.exports = Writable; /* */ @@ -259,7 +259,7 @@ function writeAfterEnd(stream, cb) { var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb stream.emit('error', er); - nextTick(cb, er); + fakeProcess.nextTick(cb, er); } // Checks that a user-supplied chunk is valid, especially for the particular // mode the stream is in. Currently this means that `null` is never accepted // and undefined/non-string values are only allowed in object mode. @@ -276,7 +276,7 @@ function validChunk(stream, state, chunk, cb) { if (er) { stream.emit('error', er); - nextTick(cb, er); + fakeProcess.nextTick(cb, er); return false; } @@ -414,10 +414,10 @@ function onwriteError(stream, state, sync, er, cb) { if (sync) { // defer the callback if we are being called synchronously // to avoid piling up things on the stack - nextTick(cb, er); // this can emit finish, and it will always happen + fakeProcess.nextTick(cb, er); // this can emit finish, and it will always happen // after error - nextTick(finishMaybe, stream, state); + fakeProcess.nextTick(finishMaybe, stream, state); stream._writableState.errorEmitted = true; stream.emit('error', er); } else { @@ -454,7 +454,7 @@ function onwrite(stream, er) { } if (sync) { - nextTick(afterWrite, stream, state, finished, cb); + fakeProcess.nextTick(afterWrite, stream, state, finished, cb); } else { afterWrite(stream, state, finished, cb); } @@ -603,7 +603,7 @@ function prefinish(stream, state) { if (typeof stream._final === 'function' && !state.destroyed) { state.pendingcb++; state.finalCalled = true; - nextTick(callFinal, stream, state); + fakeProcess.nextTick(callFinal, stream, state); } else { state.prefinished = true; stream.emit('prefinish'); @@ -631,7 +631,7 @@ function endWritable(stream, state, cb) { finishMaybe(stream, state); if (cb) { - if (state.finished) nextTick(cb);else stream.once('finish', cb); + if (state.finished) fakeProcess.nextTick(cb);else stream.once('finish', cb); } state.ended = true; diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 1ec7612b3a..532760cff7 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -4,7 +4,7 @@ var _Object$setPrototypeO; function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } -var nextTick = require('../../../next-tick.js'); +var fakeProcess = require('../../../fake-process.js'); var finished = require('./end-of-stream'); @@ -43,7 +43,7 @@ function readAndResolve(iter) { function onReadable(iter) { // we wait for the next tick, because it might // emit an error with process.nextTick - nextTick(readAndResolve, iter); + fakeProcess.nextTick(readAndResolve, iter); } function wrapForNext(lastPromise, iter) { @@ -86,7 +86,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro // we cannot guarantee that there is no error lingering around // waiting to be emitted. return new Promise(function (resolve, reject) { - nextTick(function () { + fakeProcess.nextTick(function () { if (_this[kError]) { reject(_this[kError]); } else { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 67bcbf8678..c9de905f19 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -1,6 +1,6 @@ 'use strict'; -var nextTick = require('../../../next-tick.js'); // undocumented cb() API, needed for core, not for public API +var fakeProcess = require('../../../fake-process.js'); // undocumented cb() API, needed for core, not for public API function destroy(err, cb) { @@ -13,7 +13,7 @@ function destroy(err, cb) { if (cb) { cb(err); } else if (err && (!this._writableState || !this._writableState.errorEmitted)) { - nextTick(emitErrorNT, this, err); + fakeProcess.nextTick(emitErrorNT, this, err); } return this; @@ -32,16 +32,16 @@ function destroy(err, cb) { this._destroy(err || null, function (err) { if (!cb && err) { - nextTick(emitErrorAndCloseNT, _this, err); + fakeProcess.nextTick(emitErrorAndCloseNT, _this, err); if (_this._writableState) { _this._writableState.errorEmitted = true; } } else if (cb) { - nextTick(emitCloseNT, _this); + fakeProcess.nextTick(emitCloseNT, _this); cb(err); } else { - nextTick(emitCloseNT, _this); + fakeProcess.nextTick(emitCloseNT, _this); } }); diff --git a/next-tick-browser.js b/next-tick-browser.js deleted file mode 100644 index 7a0fbf72a7..0000000000 --- a/next-tick-browser.js +++ /dev/null @@ -1,8 +0,0 @@ -module.exports = nextTick; - -function nextTick(fn) { - var args = Array.prototype.slice.call(arguments, 1); - setTimeout(function () { - fn.apply(null, args); - }, 0); -} diff --git a/next-tick.js b/next-tick.js deleted file mode 100644 index 0065f8db96..0000000000 --- a/next-tick.js +++ /dev/null @@ -1,5 +0,0 @@ -module.exports = nextTick; - -function nextTick(...args) { - process.nextTick(...args); -} diff --git a/package.json b/package.json index 7042131f34..4cc056f9c4 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "worker_threads": false, "./errors": "./errors-browser.js", "./readable.js": "./readable-browser.js", - "./next-tick.js": "./next-tick-browser.js", + "./fake-process.js": "./fake-process-browser.js", "./lib/internal/streams/stream.js": "./lib/internal/streams/stream-browser.js" }, "nyc": { From 5798745237e9627141c3540aff93e115e087749d Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 10 Jul 2019 14:56:24 +0200 Subject: [PATCH 4/4] Fixed tests, updated to 10.16.0. --- README.md | 2 +- build/test-replacements.js | 4 + lib/_stream_readable.js | 25 +++- lib/_stream_writable.js | 28 +++- lib/internal/streams/buffer_list.js | 4 +- lib/internal/streams/destroy.js | 32 ++++- test/common/README.md | 11 ++ test/common/index.js | 125 +++++++++++++----- test/common/tick.js | 48 +++++++ test/parallel/test-stream-auto-destroy.js | 99 ++++++++++++++ test/parallel/test-stream-buffer-list.js | 2 - test/parallel/test-stream-writable-destroy.js | 22 +++ .../test-stream2-readable-from-list.js | 1 - 13 files changed, 344 insertions(+), 59 deletions(-) create mode 100644 test/common/tick.js create mode 100644 test/parallel/test-stream-auto-destroy.js diff --git a/README.md b/README.md index 28ccae1616..22fcec2145 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ npm install --save readable-stream This package is a mirror of the streams implementations in Node.js. -Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.3/docs/api/stream.html). +Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.16.0/docs/api/stream.html). If you want to guarantee a stable streams base, regardless of what version of Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). diff --git a/build/test-replacements.js b/build/test-replacements.js index b9b1fc2396..3bcca752cb 100644 --- a/build/test-replacements.js +++ b/build/test-replacements.js @@ -59,6 +59,10 @@ module.exports.all = [ /require\(['"]assert['"]\)/g , 'require(\'assert/\')' ] + , [ + /.*--expose.internals.*/ + , '' + ] ] module.exports['test-stream2-basic.js'] = [ diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 672ed667ca..adacbfebda 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -93,6 +93,7 @@ var createReadableStreamAsyncIterator; require('inherits')(Readable, Stream); +var errorOrDestroy = destroyImpl.errorOrDestroy; var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { @@ -146,7 +147,9 @@ function ReadableState(options, stream, isDuplex) { this.resumeScheduled = false; this.paused = true; // Should close be emitted on destroy. Defaults to true. - this.emitClose = options.emitClose !== false; // has it been destroyed + this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'end' (and potentially 'finish') + + this.autoDestroy = !!options.autoDestroy; // has it been destroyed this.destroyed = false; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. @@ -259,16 +262,16 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (!skipChunkCheck) er = chunkInvalid(state, chunk); if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); } else if (state.objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) { chunk = _uint8ArrayToBuffer(chunk); } if (addToFront) { - if (state.endEmitted) stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true); + if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true); } else if (state.ended) { - stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF()); + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); } else if (state.destroyed) { return false; } else { @@ -585,7 +588,7 @@ function maybeReadMore_(stream, state) { Readable.prototype._read = function (n) { - this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()')); + errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); }; Readable.prototype.pipe = function (dest, pipeOpts) { @@ -684,7 +687,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { debug('onerror', er); unpipe(); dest.removeListener('error', onerror); - if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er); + if (EElistenerCount(dest, 'error') === 0) errorOrDestroy(dest, er); } // Make sure our error handler is attached before userland ones. @@ -1077,6 +1080,16 @@ function endReadableNT(state, stream) { state.endEmitted = true; stream.readable = false; stream.emit('end'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well + var wState = stream._writableState; + + if (!wState || wState.autoDestroy && wState.finished) { + stream.destroy(); + } + } } } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 4987db00eb..5416996313 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -96,6 +96,8 @@ var _require$codes = require('../errors').codes, ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END, ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING; +var errorOrDestroy = destroyImpl.errorOrDestroy; + require('inherits')(Writable, Stream); function nop() {} @@ -175,7 +177,9 @@ function WritableState(options, stream, isDuplex) { this.errorEmitted = false; // Should close be emitted on destroy. Defaults to true. - this.emitClose = options.emitClose !== false; // count buffered requests + this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'finish' (and potentially 'end') + + this.autoDestroy = !!options.autoDestroy; // count buffered requests this.bufferedRequestCount = 0; // allocate the first CorkedRequest, there is always // one allocated and free to use, and we maintain at most two @@ -252,13 +256,13 @@ function Writable(options) { Writable.prototype.pipe = function () { - this.emit('error', new ERR_STREAM_CANNOT_PIPE()); + errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); }; function writeAfterEnd(stream, cb) { var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb - stream.emit('error', er); + errorOrDestroy(stream, er); fakeProcess.nextTick(cb, er); } // Checks that a user-supplied chunk is valid, especially for the particular // mode the stream is in. Currently this means that `null` is never accepted @@ -275,7 +279,7 @@ function validChunk(stream, state, chunk, cb) { } if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); fakeProcess.nextTick(cb, er); return false; } @@ -419,13 +423,13 @@ function onwriteError(stream, state, sync, er, cb) { fakeProcess.nextTick(finishMaybe, stream, state); stream._writableState.errorEmitted = true; - stream.emit('error', er); + errorOrDestroy(stream, er); } else { // the caller expect this to happen before if // it is async cb(er); stream._writableState.errorEmitted = true; - stream.emit('error', er); // this can emit finish, but finish must + errorOrDestroy(stream, er); // this can emit finish, but finish must // always follow error finishMaybe(stream, state); @@ -589,7 +593,7 @@ function callFinal(stream, state) { state.pendingcb--; if (err) { - stream.emit('error', err); + errorOrDestroy(stream, err); } state.prefinished = true; @@ -620,6 +624,16 @@ function finishMaybe(stream, state) { if (state.pendingcb === 0) { state.finished = true; stream.emit('finish'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the readable side is ready for autoDestroy as well + var rState = stream._readableState; + + if (!rState || rState.autoDestroy && rState.endEmitted) { + stream.destroy(); + } + } } } diff --git a/lib/internal/streams/buffer_list.js b/lib/internal/streams/buffer_list.js index 142cf26f31..3d69d0609d 100644 --- a/lib/internal/streams/buffer_list.js +++ b/lib/internal/streams/buffer_list.js @@ -1,6 +1,8 @@ 'use strict'; -function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { if (i % 2) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } else { Object.defineProperties(target, Object.getOwnPropertyDescriptors(arguments[i])); } } return target; } +function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { keys.push.apply(keys, Object.getOwnPropertySymbols(object)); } if (enumerableOnly) keys = keys.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); return keys; } + +function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(source, true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(source).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; } function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index c9de905f19..4681a88d73 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -12,8 +12,13 @@ function destroy(err, cb) { if (readableDestroyed || writableDestroyed) { if (cb) { cb(err); - } else if (err && (!this._writableState || !this._writableState.errorEmitted)) { - fakeProcess.nextTick(emitErrorNT, this, err); + } else if (err) { + if (!this._writableState) { + fakeProcess.nextTick(emitErrorNT, this, err); + } else if (!this._writableState.errorEmitted) { + this._writableState.errorEmitted = true; + fakeProcess.nextTick(emitErrorNT, this, err); + } } return this; @@ -32,10 +37,13 @@ function destroy(err, cb) { this._destroy(err || null, function (err) { if (!cb && err) { - fakeProcess.nextTick(emitErrorAndCloseNT, _this, err); - - if (_this._writableState) { + if (!_this._writableState) { + fakeProcess.nextTick(emitErrorAndCloseNT, _this, err); + } else if (!_this._writableState.errorEmitted) { _this._writableState.errorEmitted = true; + fakeProcess.nextTick(emitErrorAndCloseNT, _this, err); + } else { + fakeProcess.nextTick(emitCloseNT, _this); } } else if (cb) { fakeProcess.nextTick(emitCloseNT, _this); @@ -82,7 +90,19 @@ function emitErrorNT(self, err) { self.emit('error', err); } +function errorOrDestroy(stream, err) { + // We have tests that rely on errors being emitted + // in the same tick, so changing this is semver major. + // For now when you opt-in to autoDestroy we allow + // the error to be emitted nextTick. In a future + // semver major update we should change the default to this. + var rState = stream._readableState; + var wState = stream._writableState; + if (rState && rState.autoDestroy || wState && wState.autoDestroy) stream.destroy(err);else stream.emit('error', err); +} + module.exports = { destroy: destroy, - undestroy: undestroy + undestroy: undestroy, + errorOrDestroy: errorOrDestroy }; \ No newline at end of file diff --git a/test/common/README.md b/test/common/README.md index 7bf22e3c8f..0e773debfc 100644 --- a/test/common/README.md +++ b/test/common/README.md @@ -17,6 +17,7 @@ This directory contains modules used to test the Node.js implementation. * [Heap dump checker module](#heap-dump-checker-module) * [HTTP2 module](#http2-module) * [Internet module](#internet-module) +* [tick module](#tick-module) * [tmpdir module](#tmpdir-module) * [WPT module](#wpt-module) @@ -748,6 +749,16 @@ a full `setImmediate()` invocation passes. should not be in scope when `listener.ongc()` is created. +## tick Module + +The `tick` module provides a helper function that can be used to call a callback +after a given number of event loop "ticks". + +### tick(x, cb) + +* `x` [<number>] Number of event loop "ticks". +* `cb` [<Function>] A callback function. + ## tmpdir Module The `tmpdir` module supports the use of a temporary directory for testing. diff --git a/test/common/index.js b/test/common/index.js index d75835358c..a1bf7015a0 100644 --- a/test/common/index.js +++ b/test/common/index.js @@ -93,14 +93,72 @@ var _process$binding = process.binding('config'), var noop = function noop() {}; +var hasCrypto = true; + var isMainThread = function () { - try { + if (require('module').builtinModules.includes('worker_threads')) { return require('worker_threads').isMainThread; - } catch (_e) { - // Worker module not enabled → only a single main thread exists. - return true; + } // Worker module not enabled → only a single main thread exists. + + + return true; +}(); // Check for flags. Skip this for workers (both, the `cluster` module and +// `worker_threads`) and child processes. + + +if (process.argv.length === 2 && isMainThread && module.parent && require('cluster').isMaster) { + // The copyright notice is relatively big and the flags could come afterwards. + var bytesToRead = 1500; + var buffer = Buffer.allocUnsafe(bytesToRead); + var fd = fs.openSync(module.parent.filename, 'r'); + var bytesRead = fs.readSync(fd, buffer, 0, bytesToRead); + fs.closeSync(fd); + var source = buffer.toString('utf8', 0, bytesRead); + var flagStart = source.indexOf('// Flags: --') + 10; + + if (flagStart !== 9) { + var flagEnd = source.indexOf('\n', flagStart); // Normalize different EOL. + + if (source[flagEnd - 1] === '\r') { + flagEnd--; + } + + var flags = source.substring(flagStart, flagEnd).replace(/_/g, '-').split(' '); + var args = process.execArgv.map(function (arg) { + return arg.replace(/_/g, '-'); + }); + var _iteratorNormalCompletion = true; + var _didIteratorError = false; + var _iteratorError = undefined; + + try { + for (var _iterator = flags[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) { + var flag = _step.value; + + if (!args.includes(flag) && // If the binary was built without-ssl then the crypto flags are + // invalid (bad option). The test itself should handle this case. + hasCrypto && ( // If the binary is build without `intl` the inspect option is + // invalid. The test itself should handle this case. + process.config.variables.v8_enable_inspector !== 0 || !flag.startsWith('--inspect'))) { + throw new Error("Test has to be started with the flag: '".concat(flag, "'")); + } + } + } catch (err) { + _didIteratorError = true; + _iteratorError = err; + } finally { + try { + if (!_iteratorNormalCompletion && _iterator.return != null) { + _iterator.return(); + } + } finally { + if (_didIteratorError) { + throw _iteratorError; + } + } + } } -}(); +} var isWindows = process.platform === 'win32'; var isAIX = process.platform === 'aix'; @@ -110,7 +168,6 @@ var isFreeBSD = process.platform === 'freebsd'; var isOpenBSD = process.platform === 'openbsd'; var isLinux = process.platform === 'linux'; var isOSX = process.platform === 'darwin'; -var isOSXMojave = isOSX && os.release().startsWith('18'); var enoughTestMem = os.totalmem() > 0x70000000; /* 1.75 Gb */ @@ -119,8 +176,7 @@ var cpus = os.cpus().length === 0 ? [{ }] : os.cpus(); var enoughTestCpu = Array.isArray(cpus) && (cpus.length > 1 || cpus[0].speed > 999); var rootDir = isWindows ? 'c:\\' : '/'; -var buildType = 'readable-stream'; -var hasCrypto = true; // If env var is set then enable async_hook hooks for all tests. +var buildType = 'readable-stream'; // If env var is set then enable async_hook hooks for all tests. if (process.env.NODE_TEST_WITH_ASYNC_HOOKS) { var destroydIdsList = {}; @@ -404,7 +460,7 @@ function canCreateSymLink() { timout: 1000 }); return output.includes('SeCreateSymbolicLinkPrivilege'); - } catch (_unused) { + } catch (_e) { return false; } } // On non-Windows platforms, this always returns `true` @@ -488,7 +544,7 @@ function isAlive(pid) { try { process.kill(pid, 'SIGCONT'); return true; - } catch (_unused2) { + } catch (_unused) { return false; } } @@ -550,26 +606,26 @@ function expectWarning(nameOrMap, expected, code) { } var Comparison = function Comparison(obj, keys) { - var _iteratorNormalCompletion = true; - var _didIteratorError = false; - var _iteratorError = undefined; + var _iteratorNormalCompletion2 = true; + var _didIteratorError2 = false; + var _iteratorError2 = undefined; try { - for (var _iterator = keys[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) { - var key = _step.value; + for (var _iterator2 = keys[Symbol.iterator](), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) { + var key = _step2.value; if (key in obj) this[key] = obj[key]; } } catch (err) { - _didIteratorError = true; - _iteratorError = err; + _didIteratorError2 = true; + _iteratorError2 = err; } finally { try { - if (!_iteratorNormalCompletion && _iterator.return != null) { - _iterator.return(); + if (!_iteratorNormalCompletion2 && _iterator2.return != null) { + _iterator2.return(); } } finally { - if (_didIteratorError) { - throw _iteratorError; + if (_didIteratorError2) { + throw _iteratorError2; } } } @@ -621,13 +677,13 @@ function expectsError(fn, settings, exact) { var keys = objectKeys(settings); - var _iteratorNormalCompletion2 = true; - var _didIteratorError2 = false; - var _iteratorError2 = undefined; + var _iteratorNormalCompletion3 = true; + var _didIteratorError3 = false; + var _iteratorError3 = undefined; try { - for (var _iterator2 = keys[Symbol.iterator](), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) { - var key = _step2.value; + for (var _iterator3 = keys[Symbol.iterator](), _step3; !(_iteratorNormalCompletion3 = (_step3 = _iterator3.next()).done); _iteratorNormalCompletion3 = true) { + var key = _step3.value; if (!require('deep-strict-equal')(error[key], innerSettings[key])) { // Create placeholder objects to create a nice output. @@ -651,16 +707,16 @@ function expectsError(fn, settings, exact) { } } } catch (err) { - _didIteratorError2 = true; - _iteratorError2 = err; + _didIteratorError3 = true; + _iteratorError3 = err; } finally { try { - if (!_iteratorNormalCompletion2 && _iterator2.return != null) { - _iterator2.return(); + if (!_iteratorNormalCompletion3 && _iterator3.return != null) { + _iterator3.return(); } } finally { - if (_didIteratorError2) { - throw _iteratorError2; + if (_didIteratorError3) { + throw _iteratorError3; } } } @@ -740,7 +796,7 @@ function getTTYfd() { if (ttyFd === undefined) { try { return fs.openSync('/dev/tty'); - } catch (_unused3) { + } catch (_unused2) { // There aren't any tty fd's available to use. return -1; } @@ -757,7 +813,7 @@ function runWithInvalidFD(func) { while (fs.fstatSync(fd--) && fd > 0) { ; } - } catch (_unused4) { + } catch (_unused3) { return func(fd); } @@ -792,7 +848,6 @@ module.exports = { isMainThread: isMainThread, isOpenBSD: isOpenBSD, isOSX: isOSX, - isOSXMojave: isOSXMojave, isSunOS: isSunOS, isWindows: isWindows, localIPv6Hosts: localIPv6Hosts, diff --git a/test/common/tick.js b/test/common/tick.js new file mode 100644 index 0000000000..f4f5fb772d --- /dev/null +++ b/test/common/tick.js @@ -0,0 +1,48 @@ +"use strict"; + +/**/ +require('@babel/polyfill'); + +var util = require('util'); + +for (var i in util) { + exports[i] = util[i]; +} +/**/ + + +'use strict'; +/**/ + + +var objectKeys = objectKeys || function (obj) { + var keys = []; + + for (var key in obj) { + keys.push(key); + } + + return keys; +}; +/**/ + + +require('../common'); + +module.exports = function tick(x, cb) { + function ontick() { + if (--x === 0) { + if (typeof cb === 'function') cb(); + } else { + setImmediate(ontick); + } + } + + setImmediate(ontick); +}; + +function forEach(xs, f) { + for (var i = 0, l = xs.length; i < l; i++) { + f(xs[i], i); + } +} \ No newline at end of file diff --git a/test/parallel/test-stream-auto-destroy.js b/test/parallel/test-stream-auto-destroy.js new file mode 100644 index 0000000000..93338b4c20 --- /dev/null +++ b/test/parallel/test-stream-auto-destroy.js @@ -0,0 +1,99 @@ +"use strict"; + +/**/ +var bufferShim = require('safe-buffer').Buffer; +/**/ + + +var common = require('../common'); + +var stream = require('../../'); + +var assert = require('assert/'); + +{ + var r = new stream.Readable({ + autoDestroy: true, + read: function read() { + this.push('hello'); + this.push('world'); + this.push(null); + }, + destroy: common.mustCall(function (err, cb) { + return cb(); + }) + }); + var ended = false; + r.resume(); + r.on('end', common.mustCall(function () { + ended = true; + })); + r.on('close', common.mustCall(function () { + assert(ended); + })); +} +{ + var w = new stream.Writable({ + autoDestroy: true, + write: function write(data, enc, cb) { + cb(null); + }, + destroy: common.mustCall(function (err, cb) { + return cb(); + }) + }); + var finished = false; + w.write('hello'); + w.write('world'); + w.end(); + w.on('finish', common.mustCall(function () { + finished = true; + })); + w.on('close', common.mustCall(function () { + assert(finished); + })); +} +{ + var t = new stream.Transform({ + autoDestroy: true, + transform: function transform(data, enc, cb) { + cb(null, data); + }, + destroy: common.mustCall(function (err, cb) { + return cb(); + }) + }); + var _ended = false; + var _finished = false; + t.write('hello'); + t.write('world'); + t.end(); + t.resume(); + t.on('end', common.mustCall(function () { + _ended = true; + })); + t.on('finish', common.mustCall(function () { + _finished = true; + })); + t.on('close', common.mustCall(function () { + assert(_ended); + assert(_finished); + })); +} +; + +(function () { + var t = require('tap'); + + t.pass('sync run'); +})(); + +var _list = process.listeners('uncaughtException'); + +process.removeAllListeners('uncaughtException'); + +_list.pop(); + +_list.forEach(function (e) { + return process.on('uncaughtException', e); +}); \ No newline at end of file diff --git a/test/parallel/test-stream-buffer-list.js b/test/parallel/test-stream-buffer-list.js index 99f3b5a8bc..85d1aea460 100644 --- a/test/parallel/test-stream-buffer-list.js +++ b/test/parallel/test-stream-buffer-list.js @@ -1,7 +1,5 @@ "use strict"; -// Flags: --expose_internals - /**/ var bufferShim = require('safe-buffer').Buffer; /**/ diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index cc00ce8c38..dae0316b01 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -189,6 +189,28 @@ var _require2 = require('util'), assert.strictEqual(_write7._writableState.errorEmitted, true); assert.strictEqual(_write7.destroyed, true); } +{ + var writable = new Writable({ + destroy: common.mustCall(function (err, cb) { + process.nextTick(cb, new Error('kaboom 1')); + }), + write: function write(chunk, enc, cb) { + cb(); + } + }); + writable.on('close', common.mustCall()); + writable.on('error', common.expectsError({ + type: Error, + message: 'kaboom 2' + })); + writable.destroy(); + assert.strictEqual(writable.destroyed, true); + assert.strictEqual(writable._writableState.errorEmitted, false); // Test case where `writable.destroy()` is called again with an error before + // the `_destroy()` callback is called. + + writable.destroy(new Error('kaboom 2')); + assert.strictEqual(writable._writableState.errorEmitted, true); +} { var _write8 = new Writable({ write: function write(chunk, enc, cb) { diff --git a/test/parallel/test-stream2-readable-from-list.js b/test/parallel/test-stream2-readable-from-list.js index 1d9e285749..90376973d9 100644 --- a/test/parallel/test-stream2-readable-from-list.js +++ b/test/parallel/test-stream2-readable-from-list.js @@ -20,7 +20,6 @@ // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. -// Flags: --expose_internals /**/ var bufferShim = require('safe-buffer').Buffer;