Skip to content

Commit 42ad413

Browse files
authored
stream: add iterator helper find
Continue iterator-helpers work by adding `find` to readable streams. PR-URL: #41849 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 359c93d commit 42ad413

File tree

7 files changed

+282
-153
lines changed

7 files changed

+282
-153
lines changed

doc/api/stream.md

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1745,7 +1745,8 @@ added: v17.4.0
17451745

17461746
> Stability: 1 - Experimental
17471747
1748-
* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
1748+
* `fn` {Function|AsyncFunction} a function to map over every chunk in the
1749+
stream.
17491750
* `data` {any} a chunk of data from the stream.
17501751
* `options` {Object}
17511752
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1758,16 +1759,16 @@ added: v17.4.0
17581759
* Returns: {Readable} a stream mapped with the function `fn`.
17591760

17601761
This method allows mapping over the stream. The `fn` function will be called
1761-
for every item in the stream. If the `fn` function returns a promise - that
1762+
for every chunk in the stream. If the `fn` function returns a promise - that
17621763
promise will be `await`ed before being passed to the result stream.
17631764

17641765
```mjs
17651766
import { Readable } from 'stream';
17661767
import { Resolver } from 'dns/promises';
17671768

17681769
// With a synchronous mapper.
1769-
for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
1770-
console.log(item); // 2, 4, 6, 8
1770+
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
1771+
console.log(chunk); // 2, 4, 6, 8
17711772
}
17721773
// With an asynchronous mapper, making at most 2 queries at a time.
17731774
const resolver = new Resolver();
@@ -1789,7 +1790,7 @@ added: v17.4.0
17891790

17901791
> Stability: 1 - Experimental
17911792
1792-
* `fn` {Function|AsyncFunction} a function to filter items from stream.
1793+
* `fn` {Function|AsyncFunction} a function to filter chunks from the stream.
17931794
* `data` {any} a chunk of data from the stream.
17941795
* `options` {Object}
17951796
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1801,8 +1802,8 @@ added: v17.4.0
18011802
aborted.
18021803
* Returns: {Readable} a stream filtered with the predicate `fn`.
18031804

1804-
This method allows filtering the stream. For each item in the stream the `fn`
1805-
function will be called and if it returns a truthy value, the item will be
1805+
This method allows filtering the stream. For each chunk in the stream the `fn`
1806+
function will be called and if it returns a truthy value, the chunk will be
18061807
passed to the result stream. If the `fn` function returns a promise - that
18071808
promise will be `await`ed.
18081809

@@ -1811,8 +1812,8 @@ import { Readable } from 'stream';
18111812
import { Resolver } from 'dns/promises';
18121813

18131814
// With a synchronous predicate.
1814-
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1815-
console.log(item); // 3, 4
1815+
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1816+
console.log(chunk); // 3, 4
18161817
}
18171818
// With an asynchronous predicate, making at most 2 queries at a time.
18181819
const resolver = new Resolver();
@@ -1838,7 +1839,7 @@ added: REPLACEME
18381839

18391840
> Stability: 1 - Experimental
18401841
1841-
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1842+
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
18421843
* `data` {any} a chunk of data from the stream.
18431844
* `options` {Object}
18441845
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1850,12 +1851,12 @@ added: REPLACEME
18501851
aborted.
18511852
* Returns: {Promise} a promise for when the stream has finished.
18521853

1853-
This method allows iterating a stream. For each item in the stream the
1854+
This method allows iterating a stream. For each chunk in the stream the
18541855
`fn` function will be called. If the `fn` function returns a promise - that
18551856
promise will be `await`ed.
18561857

18571858
This method is different from `for await...of` loops in that it can optionally
1858-
process items concurrently. In addition, a `forEach` iteration can only be
1859+
process chunks concurrently. In addition, a `forEach` iteration can only be
18591860
stopped by having passed a `signal` option and aborting the related
18601861
`AbortController` while `for await...of` can be stopped with `break` or
18611862
`return`. In either case the stream will be destroyed.
@@ -1869,8 +1870,8 @@ import { Readable } from 'stream';
18691870
import { Resolver } from 'dns/promises';
18701871

18711872
// With a synchronous predicate.
1872-
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1873-
console.log(item); // 3, 4
1873+
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1874+
console.log(chunk); // 3, 4
18741875
}
18751876
// With an asynchronous predicate, making at most 2 queries at a time.
18761877
const resolver = new Resolver();
@@ -1935,7 +1936,7 @@ added: REPLACEME
19351936

19361937
> Stability: 1 - Experimental
19371938
1938-
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1939+
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
19391940
* `data` {any} a chunk of data from the stream.
19401941
* `options` {Object}
19411942
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1976,6 +1977,56 @@ console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
19761977
console.log('done'); // Stream has finished
19771978
```
19781979

1980+
### `readable.find(fn[, options])`
1981+
1982+
<!-- YAML
1983+
added: REPLACEME
1984+
-->
1985+
1986+
> Stability: 1 - Experimental
1987+
1988+
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
1989+
* `data` {any} a chunk of data from the stream.
1990+
* `options` {Object}
1991+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1992+
abort the `fn` call early.
1993+
* `options` {Object}
1994+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1995+
on the stream at once. **Default:** `1`.
1996+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1997+
aborted.
1998+
* Returns: {Promise} a promise evaluating to the first chunk for which `fn`
1999+
evaluated with a truthy value, or `undefined` if no element was found.
2000+
2001+
This method is similar to `Array.prototype.find` and calls `fn` on each chunk
2002+
in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's
2003+
awaited return value is truthy, the stream is destroyed and the promise is
2004+
fulfilled with value for which `fn` returned a truthy value. If all of the
2005+
`fn` calls on the chunks return a falsy value, the promise is fulfilled with
2006+
`undefined`.
2007+
2008+
```mjs
2009+
import { Readable } from 'stream';
2010+
import { stat } from 'fs/promises';
2011+
2012+
// With a synchronous predicate.
2013+
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
2014+
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
2015+
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
2016+
2017+
// With an asynchronous predicate, making at most 2 file checks at a time.
2018+
const foundBigFile = await Readable.from([
2019+
'file1',
2020+
'file2',
2021+
'file3',
2022+
]).find(async (fileName) => {
2023+
const stats = await stat(fileName);
2024+
return stat.size > 1024 * 1024;
2025+
}, { concurrency: 2 });
2026+
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
2027+
console.log('done'); // Stream has finished
2028+
```
2029+
19792030
### `readable.every(fn[, options])`
19802031

19812032
<!-- YAML
@@ -1984,7 +2035,7 @@ added: REPLACEME
19842035

19852036
> Stability: 1 - Experimental
19862037
1987-
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
2038+
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
19882039
* `data` {any} a chunk of data from the stream.
19892040
* `options` {Object}
19902041
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -2034,7 +2085,7 @@ added: REPLACEME
20342085
> Stability: 1 - Experimental
20352086
20362087
* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
2037-
every item in the stream.
2088+
every chunk in the stream.
20382089
* `data` {any} a chunk of data from the stream.
20392090
* `options` {Object}
20402091
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -2058,8 +2109,8 @@ import { Readable } from 'stream';
20582109
import { createReadStream } from 'fs';
20592110

20602111
// With a synchronous mapper.
2061-
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
2062-
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
2112+
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
2113+
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
20632114
}
20642115
// With an asynchronous mapper, combine the contents of 4 files
20652116
const concatResult = Readable.from([

lib/internal/streams/operators.js

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -186,31 +186,9 @@ function asIndexedPairs(options = undefined) {
186186
}
187187

188188
async function some(fn, options) {
189-
if (options != null && typeof options !== 'object') {
190-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
191-
}
192-
if (options?.signal != null) {
193-
validateAbortSignal(options.signal, 'options.signal');
194-
}
195-
196-
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
197-
// Note that some does short circuit but also closes the iterator if it does
198-
const ac = new AbortController();
199-
if (options?.signal) {
200-
if (options.signal.aborted) {
201-
ac.abort();
202-
}
203-
options.signal.addEventListener('abort', () => ac.abort(), {
204-
[kWeakHandler]: this,
205-
once: true,
206-
});
207-
}
208-
const mapped = this.map(fn, { ...options, signal: ac.signal });
209-
for await (const result of mapped) {
210-
if (result) {
211-
ac.abort();
212-
return true;
213-
}
189+
// eslint-disable-next-line no-unused-vars
190+
for await (const unused of filter.call(this, fn, options)) {
191+
return true;
214192
}
215193
return false;
216194
}
@@ -226,6 +204,13 @@ async function every(fn, options) {
226204
}, options));
227205
}
228206

207+
async function find(fn, options) {
208+
for await (const result of filter.call(this, fn, options)) {
209+
return result;
210+
}
211+
return undefined;
212+
}
213+
229214
async function forEach(fn, options) {
230215
if (typeof fn !== 'function') {
231216
throw new ERR_INVALID_ARG_TYPE(
@@ -236,7 +221,7 @@ async function forEach(fn, options) {
236221
return kEmpty;
237222
}
238223
// eslint-disable-next-line no-unused-vars
239-
for await (const unused of this.map(forEachFn, options));
224+
for await (const unused of map.call(this, forEachFn, options));
240225
}
241226

242227
function filter(fn, options) {
@@ -250,7 +235,7 @@ function filter(fn, options) {
250235
}
251236
return kEmpty;
252237
}
253-
return this.map(filterFn, options);
238+
return map.call(this, filterFn, options);
254239
}
255240

256241
// Specific to provide better error to reduce since the argument is only
@@ -329,7 +314,7 @@ async function toArray(options) {
329314
}
330315

331316
function flatMap(fn, options) {
332-
const values = this.map(fn, options);
317+
const values = map.call(this, fn, options);
333318
return async function* flatMap() {
334319
for await (const val of values) {
335320
yield* val;
@@ -415,4 +400,5 @@ module.exports.promiseReturningOperators = {
415400
reduce,
416401
toArray,
417402
some,
403+
find,
418404
};

test/parallel/test-stream-filter.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,11 @@ const { setTimeout } = require('timers/promises');
9898
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
9999
assert.strictEqual(stream.readable, true);
100100
}
101+
{
102+
const stream = Readable.from([1, 2, 3, 4, 5]);
103+
Object.defineProperty(stream, 'map', {
104+
value: common.mustNotCall(() => {}),
105+
});
106+
// Check that map isn't getting called.
107+
stream.filter(() => true);
108+
}

test/parallel/test-stream-flatMap.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,11 @@ function oneTo5() {
121121
const stream = oneTo5().flatMap((x) => x);
122122
assert.strictEqual(stream.readable, true);
123123
}
124+
{
125+
const stream = oneTo5();
126+
Object.defineProperty(stream, 'map', {
127+
value: common.mustNotCall(() => {}),
128+
});
129+
// Check that map isn't getting called.
130+
stream.flatMap(() => true);
131+
}

test/parallel/test-stream-forEach.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,11 @@ const { setTimeout } = require('timers/promises');
8484
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
8585
assert.strictEqual(typeof stream.then, 'function');
8686
}
87+
{
88+
const stream = Readable.from([1, 2, 3, 4, 5]);
89+
Object.defineProperty(stream, 'map', {
90+
value: common.mustNotCall(() => {}),
91+
});
92+
// Check that map isn't getting called.
93+
stream.forEach(() => true);
94+
}

0 commit comments

Comments
 (0)