@@ -1716,7 +1716,7 @@ for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
1716
1716
}
1717
1717
// With an asynchronous mapper, making at most 2 queries at a time.
1718
1718
const resolver = new Resolver ();
1719
- const dnsResults = await Readable .from ([
1719
+ const dnsResults = Readable .from ([
1720
1720
' nodejs.org' ,
1721
1721
' openjsf.org' ,
1722
1722
' www.linuxfoundation.org' ,
@@ -1761,7 +1761,7 @@ for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1761
1761
}
1762
1762
// With an asynchronous predicate, making at most 2 queries at a time.
1763
1763
const resolver = new Resolver ();
1764
- const dnsResults = await Readable .from ([
1764
+ const dnsResults = Readable .from ([
1765
1765
' nodejs.org' ,
1766
1766
' openjsf.org' ,
1767
1767
' www.linuxfoundation.org' ,
@@ -1775,6 +1775,65 @@ for await (const result of dnsResults) {
1775
1775
}
1776
1776
```
1777
1777
1778
+ ### ` readable.forEach(fn[, options]) `
1779
+
1780
+ <!-- YAML
1781
+ added: REPLACEME
1782
+ -->
1783
+
1784
+ > Stability: 1 - Experimental
1785
+
1786
+ * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1787
+ * ` data ` {any} a chunk of data from the stream.
1788
+ * ` options ` {Object}
1789
+ * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
1790
+ abort the ` fn ` call early.
1791
+ * ` options ` {Object}
1792
+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
1793
+ on the stream at once. ** Default:** ` 1 ` .
1794
+ * ` signal ` {AbortSignal} allows destroying the stream if the signal is
1795
+ aborted.
1796
+ * Returns: {Promise} a promise for when the stream has finished.
1797
+
1798
+ This method allows iterating a stream. For each item in the stream the
1799
+ ` fn ` function will be called. If the ` fn ` function returns a promise - that
1800
+ promise will be ` await ` ed.
1801
+
1802
+ This method is different from ` for await...of ` loops in that it can optionally
1803
+ process items concurrently. In addition, a ` forEach ` iteration can only be
1804
+ stopped by having passed a ` signal ` option and aborting the related
1805
+ ` AbortController ` while ` for await...of ` can be stopped with ` break ` or
1806
+ ` return ` . In either case the stream will be destroyed.
1807
+
1808
+ This method is different from listening to the [ ` 'data' ` ] [ ] event in that it
1809
+ uses the [ ` readable ` ] [ ] event in the underlying machinary and can limit the
1810
+ number of concurrent ` fn ` calls.
1811
+
1812
+ ``` mjs
1813
+ import { Readable } from ' stream' ;
1814
+ import { Resolver } from ' dns/promises' ;
1815
+
1816
+ // With a synchronous predicate.
1817
+ for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1818
+ console .log (item); // 3, 4
1819
+ }
1820
+ // With an asynchronous predicate, making at most 2 queries at a time.
1821
+ const resolver = new Resolver ();
1822
+ const dnsResults = Readable .from ([
1823
+ ' nodejs.org' ,
1824
+ ' openjsf.org' ,
1825
+ ' www.linuxfoundation.org' ,
1826
+ ]).map (async (domain ) => {
1827
+ const { address } = await resolver .resolve4 (domain, { ttl: true });
1828
+ return address;
1829
+ }, { concurrency: 2 });
1830
+ await dnsResults .forEach ((result ) => {
1831
+ // Logs result, similar to `for await (const result of dnsResults)`
1832
+ console .log (result);
1833
+ });
1834
+ console .log (' done' ); // Stream has finished
1835
+ ```
1836
+
1778
1837
### Duplex and transform streams
1779
1838
1780
1839
#### Class: ` stream.Duplex `
0 commit comments