diff --git a/doc/api/stream.md b/doc/api/stream.md index 1e87af4d8f6677..a4f8ed6e5aedab 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2579,6 +2579,9 @@ further errors except from `_destroy()` may be emitted as `'error'`. -* `stream` {Stream} A readable and/or writable stream. +* `stream` {Stream|ReadableStream|WritableStream} + +A readable and/or writable stream/webstream. * `options` {Object} * `error` {boolean} If set to `false`, then a call to `emit('error', err)` is @@ -3022,10 +3027,16 @@ added: v17.0.0 * `src` {Stream|Blob|ArrayBuffer|string|Iterable|AsyncIterable| - AsyncGeneratorFunction|AsyncFunction|Promise|Object} + AsyncGeneratorFunction|AsyncFunction|Promise|Object| + ReadableStream|WritableStream} A utility method for creating duplex streams. @@ -3045,6 +3056,8 @@ A utility method for creating duplex streams. `writable` into `Stream` and then combines them into `Duplex` where the `Duplex` will write to the `writable` and read from the `readable`. * `Promise` converts into readable `Duplex`. Value `null` is ignored. +* `ReadableStream` converts into readable `Duplex`. +* `WritableStream` converts into writable `Duplex`. * Returns: {stream.Duplex} If an `Iterable` object containing promises is passed as an argument, diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 13404e43bf1bae..788bcb63242c38 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -8,6 +8,8 @@ const { isReadableNodeStream, isWritableNodeStream, isDuplexNodeStream, + isReadableStream, + isWritableStream, } = require('internal/streams/utils'); const eos = require('internal/streams/end-of-stream'); const { @@ -20,6 +22,7 @@ const { const { destroyer } = require('internal/streams/destroy'); const Duplex = require('internal/streams/duplex'); const Readable = require('internal/streams/readable'); +const Writable = require('internal/streams/writable'); const { createDeferredPromise } = require('internal/util'); const from = require('internal/streams/from'); @@ -71,15 +74,13 @@ module.exports = function duplexify(body, name) { return _duplexify({ writable: false, readable: false }); } - // TODO: Webstreams - // if (isReadableStream(body)) { - // return _duplexify({ readable: Readable.fromWeb(body) }); - // } + if (isReadableStream(body)) { + return _duplexify({ readable: Readable.fromWeb(body) }); + } - // TODO: Webstreams - // if (isWritableStream(body)) { - // return _duplexify({ writable: Writable.fromWeb(body) }); - // } + if (isWritableStream(body)) { + return _duplexify({ writable: Writable.fromWeb(body) }); + } if (typeof body === 'function') { const { value, write, final, destroy } = fromAsyncGen(body); @@ -146,13 +147,12 @@ module.exports = function duplexify(body, name) { }); } - // TODO: Webstreams. - // if ( - // isReadableStream(body?.readable) && - // isWritableStream(body?.writable) - // ) { - // return Duplexify.fromWeb(body); - // } + if ( + isReadableStream(body?.readable) && + isWritableStream(body?.writable) + ) { + return Duplexify.fromWeb(body); + } if ( typeof body?.writable === 'object' || diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index c3f3dd756b2e66..e3c117ff8dedb0 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -3,6 +3,7 @@ const common = require('../common'); const assert = require('assert'); const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream'); +const { ReadableStream, WritableStream } = require('stream/web'); const { Blob } = require('buffer'); { @@ -299,3 +300,104 @@ const { Blob } = require('buffer'); assert.strictEqual(res, 'foobar'); })).on('close', common.mustCall()); } + +function makeATestReadableStream(value) { + return new ReadableStream({ + start(controller) { + controller.enqueue(value); + controller.close(); + } + }); +} + +function makeATestWritableStream(writeFunc) { + return new WritableStream({ + write(chunk) { + writeFunc(chunk); + } + }); +} + +{ + const d = Duplex.from({ + readable: makeATestReadableStream('foo'), + }); + assert.strictEqual(d.readable, true); + assert.strictEqual(d.writable, false); + + d.on('data', common.mustCall((data) => { + assert.strictEqual(data.toString(), 'foo'); + })); + + d.on('end', common.mustCall(() => { + assert.strictEqual(d.readable, false); + })); +} + +{ + const d = Duplex.from(makeATestReadableStream('foo')); + + assert.strictEqual(d.readable, true); + assert.strictEqual(d.writable, false); + + d.on('data', common.mustCall((data) => { + assert.strictEqual(data.toString(), 'foo'); + })); + + d.on('end', common.mustCall(() => { + assert.strictEqual(d.readable, false); + })); +} + +{ + let ret = ''; + const d = Duplex.from({ + writable: makeATestWritableStream((chunk) => ret += chunk), + }); + + assert.strictEqual(d.readable, false); + assert.strictEqual(d.writable, true); + + d.end('foo'); + d.on('finish', common.mustCall(() => { + assert.strictEqual(ret, 'foo'); + assert.strictEqual(d.writable, false); + })); +} + +{ + let ret = ''; + const d = Duplex.from(makeATestWritableStream((chunk) => ret += chunk)); + + assert.strictEqual(d.readable, false); + assert.strictEqual(d.writable, true); + + d.end('foo'); + d.on('finish', common.mustCall(() => { + assert.strictEqual(ret, 'foo'); + assert.strictEqual(d.writable, false); + })); +} + +{ + let ret = ''; + const d = Duplex.from({ + readable: makeATestReadableStream('foo'), + writable: makeATestWritableStream((chunk) => ret += chunk), + }); + + d.end('bar'); + + d.on('data', common.mustCall((data) => { + assert.strictEqual(data.toString(), 'foo'); + })); + + d.on('end', common.mustCall(() => { + assert.strictEqual(d.readable, false); + })); + + d.on('finish', common.mustCall(() => { + assert.strictEqual(ret, 'bar'); + assert.strictEqual(d.writable, false); + })); +}