From d44775c68ff456d91ffb27903cffc456ced8e730 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 21:00:33 +0200 Subject: [PATCH 01/17] stream: implement fetch body mixin on Readable Make Readable exposew the fetch boxy mixin API. Bypasses webstream glue when possible. Refs: https://fetch.spec.whatwg.org/#body-mixin --- .../streams/readable-readstring-async.js | 37 ++ benchmark/streams/readable-readstring-data.js | 32 ++ .../streams/readable-readstring-native.js | 25 ++ benchmark/streams/readable-readstring-text.js | 28 ++ doc/api/stream.md | 61 +++ lib/internal/streams/readable.js | 340 ++++++++++++++++- .../test-stream-readable-body-mixin.js | 355 ++++++++++++++++++ 7 files changed, 859 insertions(+), 19 deletions(-) create mode 100644 benchmark/streams/readable-readstring-async.js create mode 100644 benchmark/streams/readable-readstring-data.js create mode 100644 benchmark/streams/readable-readstring-native.js create mode 100644 benchmark/streams/readable-readstring-text.js create mode 100644 test/parallel/test-stream-readable-body-mixin.js diff --git a/benchmark/streams/readable-readstring-async.js b/benchmark/streams/readable-readstring-async.js new file mode 100644 index 00000000000000..f2eff78ba575f2 --- /dev/null +++ b/benchmark/streams/readable-readstring-async.js @@ -0,0 +1,37 @@ +'use strict'; + +const common = require('../common'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [50e2] +}); + +function main({ n }) { + const b = 'some random string'; + const s = new Readable(); + function noop() {} + s._read = noop; + + bench.start(); + + text(s).then((ret) => { + console.log(ret.length); + bench.end(n, ret); + }); + + for (let k = 0; k < n; ++k) { + for (let i = 0; i < 1e3; ++i) + s.push(b); + } + s.push(null); +} + +async function text(s) { + s.setEncoding('utf8'); + let ret = ''; + for await (const chunk of s) { + ret += chunk; + } + return ret; +} diff --git a/benchmark/streams/readable-readstring-data.js b/benchmark/streams/readable-readstring-data.js new file mode 100644 index 00000000000000..03d9e36d38befc --- /dev/null +++ b/benchmark/streams/readable-readstring-data.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [50e2] +}); + +function main({ n }) { + const b = 'some random string'; + const s = new Readable(); + function noop() {} + s._read = noop; + + bench.start(); + + let ret = ''; + s.setEncoding('utf8'); + s.on('data', (chunk) => { + ret += chunk; + }).on('end', () => { + console.log(ret.length); + bench.end(n, ret); + }); + + for (let k = 0; k < n; ++k) { + for (let i = 0; i < 1e3; ++i) + s.push(b); + } + s.push(null); +} diff --git a/benchmark/streams/readable-readstring-native.js b/benchmark/streams/readable-readstring-native.js new file mode 100644 index 00000000000000..1374bcf53aff76 --- /dev/null +++ b/benchmark/streams/readable-readstring-native.js @@ -0,0 +1,25 @@ +'use strict'; + +const common = require('../common'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [50e2] +}); + +function main({ n }) { + const b = 'some random string'; + const s = new Readable(); + function noop() {} + s._read = noop; + + bench.start(); + + let ret = ''; + for (let k = 0; k < n; ++k) { + for (let i = 0; i < 1e3; ++i) + ret += b; + } + console.log(ret.length); + bench.end(n); +} diff --git a/benchmark/streams/readable-readstring-text.js b/benchmark/streams/readable-readstring-text.js new file mode 100644 index 00000000000000..eba63f4077646e --- /dev/null +++ b/benchmark/streams/readable-readstring-text.js @@ -0,0 +1,28 @@ +'use strict'; + +const common = require('../common'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [50e2] +}); + +function main({ n }) { + const b = 'some random string'; + const s = new Readable(); + function noop() {} + s._read = noop; + + bench.start(); + + s.text().then((ret) => { + console.log(ret.length); + bench.end(n); + }); + + for (let k = 0; k < n; ++k) { + for (let i = 0; i < 1e3; ++i) + s.push(b); + } + s.push(null); +} diff --git a/doc/api/stream.md b/doc/api/stream.md index d616bb78bccfd0..e4a3db97b1b0d8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -814,6 +814,8 @@ added: v0.9.4 +`Readable` implements the body mixin API from the [fetch specification][]. + ##### Event: `'close'` + +* Returns: {Promise} + +Returns a promise resolving to the full content as an array buffer. + +##### `readable.blob()` + + +* Returns: {Promise} + +Returns a promise resolving to the full content read as +Blob. + +##### `readable.body` + + +* Returns: {ReadableStream} + +Returns a `ReadableStream`. + +##### `readable.bodyUsed` + + +* Returns: {Boolean} + +Returns true if stream has emitted `'data'`, `'end'`, `'error'` or +`'close'`. + ##### `readable.destroy([error])` + +* Returns: {Promise} + +Returns a promise resolving to the full content read as +utf8 string and parsed to JSON object. + ##### `readable.pause()` + +* Returns: {Promise} + +Returns a promise resolving to the full content read as +utf8 string. + ##### `readable.unpipe([destination])` +> Stability: 1 - Experimental + * Returns: {Promise} Returns a promise resolving to the full content as an array buffer. @@ -1002,6 +1004,8 @@ Returns a promise resolving to the full content as an array buffer. added: REPLACEME ---> +> Stability: 1 - Experimental + * Returns: {Promise} Returns a promise resolving to the full content read as @@ -1012,6 +1016,8 @@ Blob. added: REPLACEME ---> +> Stability: 1 - Experimental + * Returns: {ReadableStream} Returns a `ReadableStream`. @@ -1021,6 +1027,8 @@ Returns a `ReadableStream`. added: REPLACEME ---> +> Stability: 1 - Experimental + * Returns: {Boolean} Returns true if stream has emitted `'data'`, `'end'`, `'error'` or @@ -1085,6 +1093,8 @@ readable.isPaused(); // === false added: REPLACEME ---> +> Stability: 1 - Experimental + * Returns: {Promise} Returns a promise resolving to the full content read as @@ -1396,6 +1406,8 @@ readable.on('data', (chunk) => { added: REPLACEME ---> +> Stability: 1 - Experimental + * Returns: {Promise} Returns a promise resolving to the full content read as From 13bbb010dafb23d91eba568b521f15cb9b5297d3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 28 Jul 2021 23:07:03 +0200 Subject: [PATCH 06/17] Apply suggestions from code review Co-authored-by: James M Snell --- doc/api/stream.md | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index ce1e7289495bc5..b83464c520981d 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -997,7 +997,7 @@ added: REPLACEME * Returns: {Promise} -Returns a promise resolving to the full content as an array buffer. +Returns a promise fulfilled with the full content as an {ArrayBuffer}. ##### `readable.blob()` -`Readable` implements the body mixin API from the [fetch specification][]. - ##### Event: `'close'` +`Readable` implements the body mixin API from the [fetch specification][]. + ##### Event: `'close'` @@ -1000,7 +1000,7 @@ added: REPLACEME Returns a promise fulfilled with the full content as an {ArrayBuffer}. ##### `readable.blob()` - @@ -1011,7 +1011,7 @@ added: REPLACEME Returns a promise fulfilled with the full content as a {Blob}. ##### `readable.body` - @@ -1022,7 +1022,7 @@ added: REPLACEME Returns a `ReadableStream`. ##### `readable.bodyUsed` - @@ -1088,7 +1088,7 @@ readable.isPaused(); // === false ``` ##### `readable.json()` - @@ -1401,7 +1401,7 @@ readable.on('data', (chunk) => { ``` ##### `readable.text()` - From 2e9fa0dd1ffb706bcddf44c51f1bd1a82ae7efb6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 29 Jul 2021 13:26:42 +0200 Subject: [PATCH 13/17] fixup --- doc/api/stream.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index f1eb774bab9c49..eb3eef791d2104 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -991,7 +991,7 @@ called and `readableFlowing` is not `true`. ##### `readable.arrayBuffer()` +--> > Stability: 1 - Experimental @@ -1002,7 +1002,7 @@ Returns a promise fulfilled with the full content as an {ArrayBuffer}. ##### `readable.blob()` +--> > Stability: 1 - Experimental @@ -1013,7 +1013,7 @@ Returns a promise fulfilled with the full content as a {Blob}. ##### `readable.body` +--> > Stability: 1 - Experimental @@ -1022,9 +1022,9 @@ added: REPLACEME Returns a `ReadableStream`. ##### `readable.bodyUsed` - +--> > Stability: 1 - Experimental @@ -1090,7 +1090,7 @@ readable.isPaused(); // === false ##### `readable.json()` +--> > Stability: 1 - Experimental @@ -1403,7 +1403,7 @@ readable.on('data', (chunk) => { ##### `readable.text()` +--> > Stability: 1 - Experimental From 72588db13b928df3d01a93d8fb0627d1bc2a6dc0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 29 Jul 2021 13:55:18 +0200 Subject: [PATCH 14/17] fixup --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index eb3eef791d2104..67a7618f7f2e60 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1028,7 +1028,7 @@ added: REPLACEME > Stability: 1 - Experimental -* Returns: {Boolean} +* Returns: {boolean} Returns true if stream has emitted `'data'`, `'end'`, `'error'` or `'close'`. From 95a30f08f393672adeeac993038c843d94934200 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 29 Jul 2021 14:20:45 +0200 Subject: [PATCH 15/17] fixup --- doc/api/stream.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 67a7618f7f2e60..c9bff3bab9e41a 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -989,7 +989,7 @@ The `'resume'` event is emitted when [`stream.resume()`][stream-resume] is called and `readableFlowing` is not `true`. ##### `readable.arrayBuffer()` - @@ -1000,7 +1000,7 @@ added: REPLACEME Returns a promise fulfilled with the full content as an {ArrayBuffer}. ##### `readable.blob()` - @@ -1011,7 +1011,7 @@ added: REPLACEME Returns a promise fulfilled with the full content as a {Blob}. ##### `readable.body` - @@ -1088,7 +1088,7 @@ readable.isPaused(); // === false ``` ##### `readable.json()` - @@ -1401,7 +1401,7 @@ readable.on('data', (chunk) => { ``` ##### `readable.text()` - From d3df1ece7b6ba2e32361e00da6ee4c662a1fa2a1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 29 Jul 2021 15:01:23 +0200 Subject: [PATCH 16/17] fixuP --- .../streams/readable-readstring-async.js | 37 ------------------- benchmark/streams/readable-readstring-data.js | 32 ---------------- .../streams/readable-readstring-native.js | 25 ------------- benchmark/streams/readable-readstring-text.js | 28 -------------- test/parallel/test-http-unix-socket.js | 6 +-- 5 files changed, 3 insertions(+), 125 deletions(-) delete mode 100644 benchmark/streams/readable-readstring-async.js delete mode 100644 benchmark/streams/readable-readstring-data.js delete mode 100644 benchmark/streams/readable-readstring-native.js delete mode 100644 benchmark/streams/readable-readstring-text.js diff --git a/benchmark/streams/readable-readstring-async.js b/benchmark/streams/readable-readstring-async.js deleted file mode 100644 index f2eff78ba575f2..00000000000000 --- a/benchmark/streams/readable-readstring-async.js +++ /dev/null @@ -1,37 +0,0 @@ -'use strict'; - -const common = require('../common'); -const Readable = require('stream').Readable; - -const bench = common.createBenchmark(main, { - n: [50e2] -}); - -function main({ n }) { - const b = 'some random string'; - const s = new Readable(); - function noop() {} - s._read = noop; - - bench.start(); - - text(s).then((ret) => { - console.log(ret.length); - bench.end(n, ret); - }); - - for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e3; ++i) - s.push(b); - } - s.push(null); -} - -async function text(s) { - s.setEncoding('utf8'); - let ret = ''; - for await (const chunk of s) { - ret += chunk; - } - return ret; -} diff --git a/benchmark/streams/readable-readstring-data.js b/benchmark/streams/readable-readstring-data.js deleted file mode 100644 index 03d9e36d38befc..00000000000000 --- a/benchmark/streams/readable-readstring-data.js +++ /dev/null @@ -1,32 +0,0 @@ -'use strict'; - -const common = require('../common'); -const Readable = require('stream').Readable; - -const bench = common.createBenchmark(main, { - n: [50e2] -}); - -function main({ n }) { - const b = 'some random string'; - const s = new Readable(); - function noop() {} - s._read = noop; - - bench.start(); - - let ret = ''; - s.setEncoding('utf8'); - s.on('data', (chunk) => { - ret += chunk; - }).on('end', () => { - console.log(ret.length); - bench.end(n, ret); - }); - - for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e3; ++i) - s.push(b); - } - s.push(null); -} diff --git a/benchmark/streams/readable-readstring-native.js b/benchmark/streams/readable-readstring-native.js deleted file mode 100644 index 1374bcf53aff76..00000000000000 --- a/benchmark/streams/readable-readstring-native.js +++ /dev/null @@ -1,25 +0,0 @@ -'use strict'; - -const common = require('../common'); -const Readable = require('stream').Readable; - -const bench = common.createBenchmark(main, { - n: [50e2] -}); - -function main({ n }) { - const b = 'some random string'; - const s = new Readable(); - function noop() {} - s._read = noop; - - bench.start(); - - let ret = ''; - for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e3; ++i) - ret += b; - } - console.log(ret.length); - bench.end(n); -} diff --git a/benchmark/streams/readable-readstring-text.js b/benchmark/streams/readable-readstring-text.js deleted file mode 100644 index eba63f4077646e..00000000000000 --- a/benchmark/streams/readable-readstring-text.js +++ /dev/null @@ -1,28 +0,0 @@ -'use strict'; - -const common = require('../common'); -const Readable = require('stream').Readable; - -const bench = common.createBenchmark(main, { - n: [50e2] -}); - -function main({ n }) { - const b = 'some random string'; - const s = new Readable(); - function noop() {} - s._read = noop; - - bench.start(); - - s.text().then((ret) => { - console.log(ret.length); - bench.end(n); - }); - - for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e3; ++i) - s.push(b); - } - s.push(null); -} diff --git a/test/parallel/test-http-unix-socket.js b/test/parallel/test-http-unix-socket.js index f8362d6183d8e1..e65c42ff243fc9 100644 --- a/test/parallel/test-http-unix-socket.js +++ b/test/parallel/test-http-unix-socket.js @@ -48,15 +48,15 @@ server.listen(common.PIPE, common.mustCall(function() { assert.strictEqual(res.statusCode, 200); assert.strictEqual(res.headers['content-type'], 'text/plain'); - res.body = ''; + res._body = ''; res.setEncoding('utf8'); res.on('data', function(chunk) { - res.body += chunk; + res._body += chunk; }); res.on('end', common.mustCall(function() { - assert.strictEqual(res.body, 'hello world\n'); + assert.strictEqual(res._body, 'hello world\n'); server.close(common.mustCall(function(error) { assert.strictEqual(error, undefined); server.close(common.expectsError({ From bb368bf7f4cfa1d219fbe8da820d4d089f691618 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 30 Jul 2021 09:44:42 +0200 Subject: [PATCH 17/17] fixup: body compat --- lib/internal/streams/readable.js | 12 ++++++++++++ test/parallel/test-stream-readable-body-mixin.js | 10 ++++++++++ 2 files changed, 22 insertions(+) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index afb0e15a1df8e2..b1a12708a1fed2 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -79,6 +79,8 @@ const { validateObject } = require('internal/validators'); const kPaused = Symbol('kPaused'); const kConsume = Symbol('kConsume'); const kReading = Symbol('kReading'); +const kBody = Symbol('kBody'); +const kHasBody = Symbol('kHasBody'); const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); @@ -1335,11 +1337,21 @@ ObjectDefineProperties(Readable.prototype, { // https://fetch.spec.whatwg.org/#dom-body-body body: { get() { + // Compat + if (this[kHasBody]) { + return this[kBody]; + } + if (this[kConsume] && this[kConsume].type === kWebStreamType) { return this[kConsume].stream; } return consume(this, kWebStreamType); + }, + // Compat + set(value) { + this[kBody] = value; + this[kHasBody] = true; } } }); diff --git a/test/parallel/test-stream-readable-body-mixin.js b/test/parallel/test-stream-readable-body-mixin.js index 9c4f8bc4f046d7..7f76921f4107bc 100644 --- a/test/parallel/test-stream-readable-body-mixin.js +++ b/test/parallel/test-stream-readable-body-mixin.js @@ -353,3 +353,13 @@ for (const key of ['text', 'json', 'arrayBuffer', 'blob']) { r.on('close', common.mustCall()); } + +{ + const r = new Readable({ + read() { + this.push(null); + } + }); + r.body = 'asd'; + assert.strictEqual(r.body, 'asd'); +}