diff --git a/benchmark/webstreams/pipe-to.js b/benchmark/webstreams/pipe-to.js index a849cea0124749..530795cc374c0b 100644 --- a/benchmark/webstreams/pipe-to.js +++ b/benchmark/webstreams/pipe-to.js @@ -6,7 +6,7 @@ const { } = require('node:stream/web'); const bench = common.createBenchmark(main, { - n: [5e6], + n: [1e5], highWaterMarkR: [512, 1024, 2048, 4096], highWaterMarkW: [512, 1024, 2048, 4096], }); diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index c916ccd20d2063..452c6d06df4352 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -26,6 +26,7 @@ const { SymbolToStringTag, Uint8Array, } = primordials; +const BufferList = require('internal/streams/buffer_list'); const { AbortError, @@ -813,6 +814,7 @@ class ReadIntoRequest { get promise() { return this[kState].promise; } } + class ReadableStreamDefaultReader { [kType] = 'ReadableStreamDefaultReader'; @@ -823,7 +825,7 @@ class ReadableStreamDefaultReader { if (!isReadableStream(stream)) throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream); this[kState] = { - readRequests: [], + readRequests: new BufferList(), stream: undefined, close: { promise: undefined, @@ -1958,9 +1960,12 @@ function readableStreamClose(stream) { reader[kState].close.resolve(); if (readableStreamHasDefaultReader(stream)) { - for (let n = 0; n < reader[kState].readRequests.length; n++) - reader[kState].readRequests[n][kClose](); - reader[kState].readRequests = []; + let start = reader[kState].readRequests.head; + while (start !== null) { + start.data[kClose](); + start = start.next; + } + reader[kState].readRequests.clear(); } } @@ -1982,9 +1987,12 @@ function readableStreamError(stream, error) { setPromiseHandled(reader[kState].close.promise); if (readableStreamHasDefaultReader(stream)) { - for (let n = 0; n < reader[kState].readRequests.length; n++) - reader[kState].readRequests[n][kError](error); - reader[kState].readRequests = []; + let start = reader[kState].readRequests.head; + while (start !== null) { + start.data[kError](error); + start = start.next; + } + reader[kState].readRequests.clear(); } else { assert(readableStreamHasBYOBReader(stream)); for (let n = 0; n < reader[kState].readIntoRequests.length; n++) @@ -2033,7 +2041,7 @@ function readableStreamFulfillReadRequest(stream, chunk, done) { reader, } = stream[kState]; assert(reader[kState].readRequests.length); - const readRequest = ArrayPrototypeShift(reader[kState].readRequests); + const readRequest = reader[kState].readRequests.shift(); // TODO(@jasnell): It's not clear under what exact conditions done // will be true here. The spec requires this check but none of the @@ -2061,7 +2069,7 @@ function readableStreamFulfillReadIntoRequest(stream, chunk, done) { function readableStreamAddReadRequest(stream, readRequest) { assert(readableStreamHasDefaultReader(stream)); assert(stream[kState].state === 'readable'); - ArrayPrototypePush(stream[kState].reader[kState].readRequests, readRequest); + stream[kState].reader[kState].readRequests.push(readRequest); } function readableStreamAddReadIntoRequest(stream, readIntoRequest) { @@ -2114,10 +2122,12 @@ function readableStreamDefaultReaderRelease(reader) { } function readableStreamDefaultReaderErrorReadRequests(reader, e) { - for (let n = 0; n < reader[kState].readRequests.length; ++n) { - reader[kState].readRequests[n][kError](e); + let start = reader[kState].readRequests.head; + while (start !== null) { + start.data[kError](e); + start = start.next; } - reader[kState].readRequests = []; + reader[kState].readRequests.clear(); } function readableStreamBYOBReaderRelease(reader) { @@ -2210,7 +2220,7 @@ function setupReadableStreamDefaultReader(reader, stream) { if (isReadableStreamLocked(stream)) throw new ERR_INVALID_STATE.TypeError('ReadableStream is locked'); readableStreamReaderGenericInitialize(reader, stream); - reader[kState].readRequests = []; + reader[kState].readRequests.clear(); } function readableStreamDefaultControllerClose(controller) { @@ -2379,7 +2389,7 @@ function setupReadableStreamDefaultController( pullAgain: false, pullAlgorithm, pulling: false, - queue: [], + queue: new BufferList(), queueTotalSize: 0, started: false, sizeAlgorithm, @@ -2808,13 +2818,11 @@ function readableByteStreamControllerEnqueueChunkToQueue( buffer, byteOffset, byteLength) { - ArrayPrototypePush( - controller[kState].queue, - { - buffer, - byteOffset, - byteLength, - }); + controller[kState].queue.push({ + buffer, + byteOffset, + byteLength, + }); controller[kState].queueTotalSize += byteLength; } @@ -2868,7 +2876,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue( } = controller[kState]; while (totalBytesToCopyRemaining) { - const headOfQueue = queue[0]; + const headOfQueue = queue.head.data; const bytesToCopy = MathMin( totalBytesToCopyRemaining, headOfQueue.byteLength); @@ -2886,7 +2894,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue( headOfQueue.byteOffset, bytesToCopy); if (headOfQueue.byteLength === bytesToCopy) { - ArrayPrototypeShift(queue); + queue.shift(); } else { headOfQueue.byteOffset += bytesToCopy; headOfQueue.byteLength -= bytesToCopy; @@ -3087,7 +3095,7 @@ function readableByteStreamControllerFillReadRequestFromQueue(controller, readRe buffer, byteOffset, byteLength, - } = ArrayPrototypeShift(queue); + } = queue.shift(); controller[kState].queueTotalSize -= byteLength; readableByteStreamControllerHandleQueueDrain(controller); @@ -3103,13 +3111,15 @@ function readableByteStreamControllerProcessReadRequestsUsingQueue(controller) { const { reader } = stream[kState]; assert(isReadableStreamDefaultReader(reader)); + // TODO - may be able to change this to next next next while (reader[kState].readRequests.length > 0) { + // TODO - why this is in the while loop? if (queueTotalSize === 0) { return; } readableByteStreamControllerFillReadRequestFromQueue( controller, - ArrayPrototypeShift(reader[kState].readRequests), + reader[kState].readRequests.shift(), ); } } @@ -3177,7 +3187,7 @@ function setupReadableByteStreamController( pulling: false, started: false, stream, - queue: [], + queue: new BufferList(), queueTotalSize: 0, highWaterMark, pullAlgorithm, diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 1979c55667b167..0895132b773de1 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -2,8 +2,6 @@ const { ArrayBufferPrototypeSlice, - ArrayPrototypePush, - ArrayPrototypeShift, AsyncIteratorPrototype, FunctionPrototypeCall, MathMax, @@ -146,7 +144,7 @@ function dequeueValue(controller) { const { value, size, - } = ArrayPrototypeShift(controller[kState].queue); + } = controller[kState].queue.shift(); controller[kState].queueTotalSize = MathMax(0, controller[kState].queueTotalSize - size); return value; @@ -155,7 +153,7 @@ function dequeueValue(controller) { function resetQueue(controller) { assert(controller[kState].queue !== undefined); assert(controller[kState].queueTotalSize !== undefined); - controller[kState].queue = []; + controller[kState].queue.clear(); controller[kState].queueTotalSize = 0; } @@ -163,7 +161,8 @@ function peekQueueValue(controller) { assert(controller[kState].queue !== undefined); assert(controller[kState].queueTotalSize !== undefined); assert(controller[kState].queue.length); - return controller[kState].queue[0].value; + debugger; + return controller[kState].queue.head.data.value; } function enqueueValueWithSize(controller, value, size) { @@ -176,7 +175,7 @@ function enqueueValueWithSize(controller, value, size) { size === Infinity) { throw new ERR_INVALID_ARG_VALUE.RangeError('size', size); } - ArrayPrototypePush(controller[kState].queue, { value, size }); + controller[kState].queue.push({ value, size }); controller[kState].queueTotalSize += size; } diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 46d6ae28772c32..2e3ee5b76b9f96 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -78,6 +78,8 @@ const { const assert = require('internal/assert'); +const BufferList = require('internal/streams/buffer_list'); + const kAbort = Symbol('kAbort'); const kCloseSentinel = Symbol('kCloseSentinel'); const kError = Symbol('kError'); @@ -1269,7 +1271,7 @@ function setupWritableStreamDefaultController( abortAlgorithm, closeAlgorithm, highWaterMark, - queue: [], + queue: new BufferList(), queueTotalSize: 0, abortController: new AbortController(), sizeAlgorithm,