Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -1270,5 +1270,78 @@ added: REPLACEME

* Type: {WritableStream}

### Utility Consumers
<!-- YAML
added: REPLACEME
-->

The utility consumer functions provide common options for consuming
streams.

They are accessed using:

```mjs
import {
arrayBuffer,
blob,
json,
text,
} from 'node:stream/consumers';
```

```cjs
const {
arrayBuffer,
blob,
json,
text,
} = require('stream/consumers');
```

#### `streamConsumers.arrayBuffer(stream)`
<!-- YAML
added: REPLACEME
-->

* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with an `ArrayBuffer` containing the full
contents of the stream.

#### `streamConsumers.blob(stream)`
<!-- YAML
added: REPLACEME
-->

* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with a {Blob} containing the full contents
of the stream.

#### `streamConsumers.buffer(stream)`
<!-- YAML
added: REPLACEME
-->

* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with a {Buffer} containing the full
contents of the stream.

#### `streamConsumers.json(stream)`
<!-- YAML
added: REPLACEME
-->

* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
UTF-8 encoded string that is then passed through `JSON.parse()`.

#### `streamConsumers.text(stream)`
<!-- YAML
added: REPLACEME
-->

* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
UTF-8 encoded string.

[Streams]: stream.md
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
84 changes: 84 additions & 0 deletions lib/stream/consumers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'use strict';

const {
JSONParse,
} = primordials;

const {
TextDecoder,
} = require('internal/encoding');

const {
Blob,
} = require('internal/blob');

const {
Buffer,
} = require('buffer');

/**
* @typedef {import('../internal/webstreams/readablestream').ReadableStream
* } ReadableStream
* @typedef {import('../internal/streams/readable')} Readable
*/

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<Blob>}
*/
async function blob(stream) {
const chunks = [];
for await (const chunk of stream)
chunks.push(chunk);
return new Blob(chunks);
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<ArrayBuffer>}
*/
async function arrayBuffer(stream) {
const ret = await blob(stream);
return ret.arrayBuffer();
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<Buffer>}
*/
async function buffer(stream) {
return Buffer.from(await arrayBuffer(stream));
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<string>}
*/
async function text(stream) {
const dec = new TextDecoder();
let str = '';
for await (const chunk of stream) {
if (typeof chunk === 'string')
str += chunk;
else
str += dec.decode(chunk, { stream: true });
}
return str;
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<any>}
*/
async function json(stream) {
const str = await text(stream);
return JSONParse(str);
}

module.exports = {
arrayBuffer,
blob,
buffer,
text,
json,
};
234 changes: 234 additions & 0 deletions test/parallel/test-stream-consumers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Flags: --no-warnings
'use strict';

const common = require('../common');
const assert = require('assert');

const {
arrayBuffer,
blob,
buffer,
text,
json,
} = require('stream/consumers');

const {
PassThrough
} = require('stream');

const {
TransformStream,
} = require('stream/web');

const buf = Buffer.from('hellothere');
const kArrayBuffer =
buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);

{
const passthrough = new PassThrough();

blob(passthrough).then(common.mustCall(async (blob) => {
assert.strictEqual(blob.size, 10);
assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
}));

passthrough.write('hello');
setTimeout(() => passthrough.end('there'), 10);
}

{
const passthrough = new PassThrough();

arrayBuffer(passthrough).then(common.mustCall(async (ab) => {
assert.strictEqual(ab.byteLength, 10);
assert.deepStrictEqual(ab, kArrayBuffer);
}));

passthrough.write('hello');
setTimeout(() => passthrough.end('there'), 10);
}

{
const passthrough = new PassThrough();

buffer(passthrough).then(common.mustCall(async (buf) => {
assert.strictEqual(buf.byteLength, 10);
assert.deepStrictEqual(buf.buffer, kArrayBuffer);
}));

passthrough.write('hello');
setTimeout(() => passthrough.end('there'), 10);
}


{
const passthrough = new PassThrough();

text(passthrough).then(common.mustCall(async (str) => {
assert.strictEqual(str.length, 10);
assert.deepStrictEqual(str, 'hellothere');
}));

passthrough.write('hello');
setTimeout(() => passthrough.end('there'), 10);
}

{
const passthrough = new PassThrough();

json(passthrough).then(common.mustCall(async (str) => {
assert.strictEqual(str.length, 10);
assert.deepStrictEqual(str, 'hellothere');
}));

passthrough.write('"hello');
setTimeout(() => passthrough.end('there"'), 10);
}

{
const { writable, readable } = new TransformStream();

blob(readable).then(common.mustCall(async (blob) => {
assert.strictEqual(blob.size, 10);
assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
}));

const writer = writable.getWriter();
writer.write('hello');
setTimeout(() => {
writer.write('there');
writer.close();
}, 10);

assert.rejects(blob(readable), { code: 'ERR_INVALID_STATE' });
}

{
const { writable, readable } = new TransformStream();

arrayBuffer(readable).then(common.mustCall(async (ab) => {
assert.strictEqual(ab.byteLength, 10);
assert.deepStrictEqual(ab, kArrayBuffer);
}));

const writer = writable.getWriter();
writer.write('hello');
setTimeout(() => {
writer.write('there');
writer.close();
}, 10);

assert.rejects(arrayBuffer(readable), { code: 'ERR_INVALID_STATE' });
}

{
const { writable, readable } = new TransformStream();

text(readable).then(common.mustCall(async (str) => {
assert.strictEqual(str.length, 10);
assert.deepStrictEqual(str, 'hellothere');
}));

const writer = writable.getWriter();
writer.write('hello');
setTimeout(() => {
writer.write('there');
writer.close();
}, 10);

assert.rejects(text(readable), { code: 'ERR_INVALID_STATE' });
}

{
const { writable, readable } = new TransformStream();

json(readable).then(common.mustCall(async (str) => {
assert.strictEqual(str.length, 10);
assert.deepStrictEqual(str, 'hellothere');
}));

const writer = writable.getWriter();
writer.write('"hello');
setTimeout(() => {
writer.write('there"');
writer.close();
}, 10);

assert.rejects(json(readable), { code: 'ERR_INVALID_STATE' });
}

{
const stream = new PassThrough({
readableObjectMode: true,
writableObjectMode: true,
});

blob(stream).then(common.mustCall((blob) => {
assert.strictEqual(blob.size, 30);
}));

stream.write({});
stream.end({});
}

{
const stream = new PassThrough({
readableObjectMode: true,
writableObjectMode: true,
});

arrayBuffer(stream).then(common.mustCall((ab) => {
assert.strictEqual(ab.byteLength, 30);
assert.strictEqual(
Buffer.from(ab).toString(),
'[object Object][object Object]');
}));

stream.write({});
stream.end({});
}

{
const stream = new PassThrough({
readableObjectMode: true,
writableObjectMode: true,
});

buffer(stream).then(common.mustCall((buf) => {
assert.strictEqual(buf.byteLength, 30);
assert.strictEqual(
buf.toString(),
'[object Object][object Object]');
}));

stream.write({});
stream.end({});
}

{
const stream = new PassThrough({
readableObjectMode: true,
writableObjectMode: true,
});

assert.rejects(text(stream), {
code: 'ERR_INVALID_ARG_TYPE',
});

stream.write({});
stream.end({});
}

{
const stream = new PassThrough({
readableObjectMode: true,
writableObjectMode: true,
});

assert.rejects(json(stream), {
code: 'ERR_INVALID_ARG_TYPE',
});

stream.write({});
stream.end({});
}