Skip to content

Commit 9525b0d

Browse files
RafaelGSSdanielleadams
authored andcommitted
stream: fix enqueue race condition on esm modules
stream: use nextTick on close PR-URL: #40901 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 8d8225b commit 9525b0d

File tree

2 files changed

+48
-7
lines changed

2 files changed

+48
-7
lines changed

lib/internal/webstreams/readablestream.js

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,13 +1431,18 @@ function readableStreamTee(stream, cloneForBranch2) {
14311431
});
14321432
},
14331433
[kClose]() {
1434-
reading = false;
1435-
if (!canceled1)
1436-
readableStreamDefaultControllerClose(branch1[kState].controller);
1437-
if (!canceled2)
1438-
readableStreamDefaultControllerClose(branch2[kState].controller);
1439-
if (!canceled1 || !canceled2)
1440-
cancelPromise.resolve();
1434+
// The `process.nextTick()` is not part of the spec.
1435+
// This approach was needed to avoid a race condition working with esm
1436+
// Further information, see: https://github.com/nodejs/node/issues/39758
1437+
process.nextTick(() => {
1438+
reading = false;
1439+
if (!canceled1)
1440+
readableStreamDefaultControllerClose(branch1[kState].controller);
1441+
if (!canceled2)
1442+
readableStreamDefaultControllerClose(branch2[kState].controller);
1443+
if (!canceled1 || !canceled2)
1444+
cancelPromise.resolve();
1445+
});
14411446
},
14421447
[kError]() {
14431448
reading = false;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { mustCall } from '../common/index.mjs';
2+
import { ReadableStream } from 'stream/web';
3+
import assert from 'assert';
4+
5+
{
6+
// Test tee() with close in the nextTick after enqueue
7+
async function read(stream) {
8+
const chunks = [];
9+
for await (const chunk of stream)
10+
chunks.push(chunk);
11+
return Buffer.concat(chunks).toString();
12+
}
13+
14+
const [r1, r2] = new ReadableStream({
15+
start(controller) {
16+
process.nextTick(() => {
17+
controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114]));
18+
19+
process.nextTick(() => {
20+
controller.close();
21+
});
22+
});
23+
}
24+
}).tee();
25+
26+
(async () => {
27+
const [dataReader1, dataReader2] = await Promise.all([
28+
read(r1),
29+
read(r2),
30+
]);
31+
32+
assert.strictEqual(dataReader1, dataReader2);
33+
assert.strictEqual(dataReader1, 'foobar');
34+
assert.strictEqual(dataReader2, 'foobar');
35+
})().then(mustCall());
36+
}

0 commit comments

Comments
 (0)