Skip to content

Commit abc1d0b

Browse files
committed
stream: avoid drain for sync streams
Previously a sync writable receiving chunks larger than highwatermark would unecessarily ping pong needDrain.
1 parent 4a6a5c3 commit abc1d0b

7 files changed

+24
-16
lines changed

benchmark/streams/writable-manywrites.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ const bench = common.createBenchmark(main, {
77
n: [2e6],
88
sync: ['yes', 'no'],
99
writev: ['yes', 'no'],
10-
callback: ['yes', 'no']
10+
callback: ['yes', 'no'],
11+
len: [1024, 32 * 1024]
1112
});
1213

13-
function main({ n, sync, writev, callback }) {
14-
const b = Buffer.allocUnsafe(1024);
14+
function main({ n, sync, writev, callback, len }) {
15+
const b = Buffer.allocUnsafe(len);
1516
const s = new Writable();
1617
sync = sync === 'yes';
1718

lib/_stream_writable.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,6 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
344344

345345
state.length += len;
346346

347-
const ret = state.length < state.highWaterMark;
348-
// We must ensure that previous needDrain will not be reset to false.
349-
if (!ret)
350-
state.needDrain = true;
351-
352347
if (state.writing || state.corked || state.errored) {
353348
const last = state.lastBufferedRequest;
354349
state.lastBufferedRequest = {
@@ -367,6 +362,12 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
367362
doWrite(stream, state, false, len, chunk, encoding, cb);
368363
}
369364

365+
const ret = state.length < state.highWaterMark;
366+
367+
// We must ensure that previous needDrain will not be reset to false.
368+
if (!ret)
369+
state.needDrain = true;
370+
370371
// Return false if errored or destroyed in order to break
371372
// any synchronous while(stream.write(data)) loops.
372373
return ret && !state.errored && !state.destroyed;

test/parallel/test-stream-big-packet.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ class TestStream extends stream.Transform {
3636
}
3737
}
3838

39-
const s1 = new stream.PassThrough();
39+
const s1 = new stream.Transform({
40+
transform(chunk, encoding, cb) {
41+
process.nextTick(cb, null, chunk);
42+
}
43+
});
4044
const s2 = new stream.PassThrough();
4145
const s3 = new TestStream();
4246
s1.pipe(s3);

test/parallel/test-stream-catch-rejections.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const assert = require('assert');
3030
captureRejections: true,
3131
highWaterMark: 1,
3232
write(chunk, enc, cb) {
33-
cb();
33+
process.nextTick(cb);
3434
}
3535
});
3636

test/parallel/test-stream-pipe-await-drain-push-while-write.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const writable = new stream.Writable({
1919
});
2020
}
2121

22-
cb();
22+
process.nextTick(cb);
2323
}, 3)
2424
});
2525

test/parallel/test-stream-pipe-await-drain.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ reader._read = () => {};
1919

2020
writer1._write = common.mustCall(function(chunk, encoding, cb) {
2121
this.emit('chunk-received');
22-
cb();
22+
process.nextTick(cb);
2323
}, 1);
2424

2525
writer1.once('chunk-received', () => {
@@ -42,7 +42,7 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {
4242
reader._readableState.awaitDrainWriters.size,
4343
1,
4444
'awaitDrain should be 1 after first push, actual is ' +
45-
reader._readableState.awaitDrainWriters
45+
reader._readableState.awaitDrainWriters.size
4646
);
4747
// Not calling cb here to "simulate" slow stream.
4848
// This should be called exactly once, since the first .write() call
@@ -54,7 +54,7 @@ writer3._write = common.mustCall((chunk, encoding, cb) => {
5454
reader._readableState.awaitDrainWriters.size,
5555
2,
5656
'awaitDrain should be 2 after second push, actual is ' +
57-
reader._readableState.awaitDrainWriters
57+
reader._readableState.awaitDrainWriters.size
5858
);
5959
// Not calling cb here to "simulate" slow stream.
6060
// This should be called exactly once, since the first .write() call

test/parallel/test-stream-writable-needdrain-state.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ const transform = new stream.Transform({
1010
});
1111

1212
function _transform(chunk, encoding, cb) {
13-
assert.strictEqual(transform._writableState.needDrain, true);
14-
cb();
13+
process.nextTick(() => {
14+
assert.strictEqual(transform._writableState.needDrain, true);
15+
cb();
16+
});
1517
}
1618

1719
assert.strictEqual(transform._writableState.needDrain, false);

0 commit comments

Comments
 (0)