Skip to content

Commit aafca2a

Browse files
committed
stream: add reduce
1 parent a8afe26 commit aafca2a

File tree

3 files changed

+207
-3
lines changed

3 files changed

+207
-3
lines changed

doc/api/stream.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2118,6 +2118,49 @@ import { Readable } from 'stream';
21182118
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
21192119
```
21202120

2121+
### `readable.reduce(fn[, initial[, options])`
2122+
2123+
<!-- YAML
2124+
added: v17.4.0
2125+
-->
2126+
2127+
> Stability: 1 - Experimental
2128+
2129+
* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
2130+
in the stream.
2131+
* `previous` {any} the value obtained from the last call to `fn` or the
2132+
`initial` value if specified or the first chunk of the stream otherwise.
2133+
* `data` {any} a chunk of data from the stream.
2134+
* `options` {Object}
2135+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2136+
abort the `fn` call early.
2137+
* `initial` {any} the initial value to use in the reduction.
2138+
* `options` {Object}
2139+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2140+
aborted.
2141+
* Returns: {Promise} a promise for the final value of the reduction.
2142+
2143+
This method calls `fn` on each chunk of the stream in order, passing it the
2144+
result from the calculation on the previous element. It returns a promise for
2145+
the final value of the reduction.
2146+
2147+
The reducer function iterates the stream element-by-element which means that
2148+
there is no `concurrency` parameter or parallism. To perform a `reduce`
2149+
concurrently, it can be chained to the [`readable.map`][] method.
2150+
2151+
If no `initial` value is supplied the first chunk of the stream is used as the
2152+
initial value. If the stream is empty, the promise is rejected with a
2153+
`TypeError` with the `ERR_INVALID_ARGS` code property.
2154+
2155+
```mjs
2156+
import { Readable } from 'stream';
2157+
2158+
const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
2159+
return previous + data;
2160+
});
2161+
console.log(ten); // 10
2162+
```
2163+
21212164
### Duplex and transform streams
21222165

21232166
#### Class: `stream.Duplex`
@@ -4193,6 +4236,7 @@ contain multi-byte characters.
41934236
[`process.stdin`]: process.md#processstdin
41944237
[`process.stdout`]: process.md#processstdout
41954238
[`readable._read()`]: #readable_readsize
4239+
[`readable.map`]: #readablemapfn-options
41964240
[`readable.push('')`]: #readablepush
41974241
[`readable.setEncoding()`]: #readablesetencodingencoding
41984242
[`stream.Readable.from()`]: #streamreadablefromiterable-options

lib/internal/streams/operators.js

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const {
66
codes: {
77
ERR_INVALID_ARG_TYPE,
88
ERR_OUT_OF_RANGE,
9+
ERR_MISSING_ARGS,
910
},
1011
AbortError,
1112
} = require('internal/errors');
@@ -186,8 +187,8 @@ async function every(fn, options) {
186187
'fn', ['Function', 'AsyncFunction'], fn);
187188
}
188189
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
189-
return !(await some.call(this, async (x) => {
190-
return !(await fn(x));
190+
return !(await some.call(this, async (...args) => {
191+
return !(await fn(...args));
191192
}, options));
192193
}
193194

@@ -218,11 +219,57 @@ async function * filter(fn, options) {
218219
yield* this.map(filterFn, options);
219220
}
220221

222+
// Specific to provide better error to reduce since the argument is only
223+
// missing if the stream has no items in it - but the code is still appropriate
224+
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
225+
constructor() {
226+
super('reduce');
227+
this.message = 'Reduce of an empty stream requires an initial value';
228+
}
229+
}
230+
231+
async function reduce(reducer, initialValue, options) {
232+
if (typeof reducer !== 'function') {
233+
throw new ERR_INVALID_ARG_TYPE(
234+
'reducer', ['Function', 'AsyncFunction'], reducer);
235+
}
236+
let hasInitialValue = arguments.length > 1;
237+
if (options?.signal?.aborted) {
238+
const err = new AbortError(undefined, { cause: options.signal.reason });
239+
this.once('error', () => {}); // The error is already propagated
240+
this.destroy(err);
241+
throw err;
242+
}
243+
const ac = new AbortController();
244+
const signal = ac.signal;
245+
let gotAnyItemFromStream = false;
246+
try {
247+
for await (const value of this) {
248+
gotAnyItemFromStream = true;
249+
if (options?.signal?.aborted) {
250+
throw new AbortError();
251+
}
252+
if (!hasInitialValue) {
253+
initialValue = value;
254+
hasInitialValue = true;
255+
} else {
256+
initialValue = await reducer(initialValue, value, { signal });
257+
}
258+
}
259+
if (!gotAnyItemFromStream && !hasInitialValue) {
260+
throw new ReduceAwareErrMissingArgs();
261+
}
262+
} finally {
263+
ac.abort();
264+
}
265+
return initialValue;
266+
}
267+
221268
async function toArray(options) {
222269
const result = [];
223270
for await (const val of this) {
224271
if (options?.signal?.aborted) {
225-
throw new AbortError({ cause: options.signal.reason });
272+
throw new AbortError(undefined, { cause: options.signal.reason });
226273
}
227274
ArrayPrototypePush(result, val);
228275
}
@@ -296,6 +343,7 @@ module.exports.streamReturningOperators = {
296343
module.exports.promiseReturningOperators = {
297344
every,
298345
forEach,
346+
reduce,
299347
toArray,
300348
some,
301349
};

test/parallel/test-stream-reduce.js

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
function sum(p, c) {
10+
return p + c;
11+
}
12+
13+
{
14+
// Does the same thing as `(await stream.toArray()).reduce(...)`
15+
(async () => {
16+
const tests = [
17+
[[], sum, 0],
18+
[[1], sum, 0],
19+
[[1, 2, 3, 4, 5], sum, 0],
20+
[Array(100).fill().map((_, i) => i), sum, 0],
21+
[['a', 'b', 'c'], sum, ''],
22+
[[1, 2], sum],
23+
[[1, 2, 3], (x, y) => y],
24+
];
25+
for (const [values, fn, initial] of tests) {
26+
const streamReduce = await Readable.from(values)
27+
.reduce(fn, initial);
28+
const arrayReduce = values.reduce(fn, initial);
29+
assert.deepStrictEqual(streamReduce, arrayReduce);
30+
}
31+
// Does the same thing as `(await stream.toArray()).reduce(...)` with an
32+
// asynchronous reducer
33+
for (const [values, fn, initial] of tests) {
34+
const streamReduce = await Readable.from(values)
35+
.map(async (x) => x)
36+
.reduce(fn, initial);
37+
const arrayReduce = values.reduce(fn, initial);
38+
assert.deepStrictEqual(streamReduce, arrayReduce);
39+
}
40+
})().then(common.mustCall());
41+
}
42+
{
43+
// Works with an async reducer, with or without initial value
44+
(async () => {
45+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
46+
assert.strictEqual(six, 6);
47+
})().then(common.mustCall());
48+
(async () => {
49+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
50+
assert.strictEqual(six, 6);
51+
})().then(common.mustCall());
52+
}
53+
{
54+
// Works lazily
55+
assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
56+
.map(common.mustCall((x) => {
57+
return x;
58+
}, 3)) // Two consumed and one buffered by `map` due to default concurrency
59+
.reduce(async (p, c) => {
60+
if (p === 1) {
61+
throw new Error('boom');
62+
}
63+
return c;
64+
}, 0)
65+
, /boom/).then(common.mustCall());
66+
}
67+
68+
{
69+
// Support for AbortSignal
70+
const ac = new AbortController();
71+
assert.rejects(async () => {
72+
await Readable.from([1, 2, 3]).reduce(async (p, c) => {
73+
if (c === 3) {
74+
await new Promise(() => {}); // Explicitly do not pass signal here
75+
}
76+
return Promise.resolve();
77+
}, 0, { signal: ac.signal });
78+
}, {
79+
name: 'AbortError',
80+
}).then(common.mustCall());
81+
ac.abort();
82+
}
83+
84+
85+
{
86+
// Support for AbortSignal - pre aborted
87+
const stream = Readable.from([1, 2, 3]);
88+
assert.rejects(async () => {
89+
await stream.reduce(async (p, c) => {
90+
if (c === 3) {
91+
await new Promise(() => {}); // Explicitly do not pass signal here
92+
}
93+
return Promise.resolve();
94+
}, 0, { signal: AbortSignal.abort() });
95+
}, {
96+
name: 'AbortError',
97+
}).then(common.mustCall(() => {
98+
assert.strictEqual(stream.destroyed, true);
99+
}));
100+
}
101+
102+
{
103+
// Error cases
104+
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
105+
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
106+
}
107+
108+
{
109+
// Test result is a Promise
110+
const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
111+
assert.ok(result instanceof Promise);
112+
}

0 commit comments

Comments
 (0)