Skip to content

Commit e038fba

Browse files
committed
fixup
1 parent dccc0e7 commit e038fba

File tree

3 files changed

+222
-55
lines changed

3 files changed

+222
-55
lines changed

lib/internal/streams/destroy.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,8 @@ function destroyer(stream, err) {
356356
}
357357

358358
module.exports = {
359+
kConstruct,
360+
kDestroy,
359361
construct,
360362
destroyer,
361363
destroy,

lib/internal/streams/readable.js

Lines changed: 215 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const {
3535
SymbolAsyncIterator,
3636
Symbol,
3737
TypeError,
38+
Uint8Array,
3839
} = primordials;
3940

4041
module.exports = Readable;
@@ -45,6 +46,8 @@ const { Stream, prependListener } = require('internal/streams/legacy');
4546
const { Buffer } = require('buffer');
4647

4748
let Blob;
49+
let ReadableStream;
50+
let CountQueuingStrategy;
4851

4952
const {
5053
addAbortSignal,
@@ -75,9 +78,11 @@ const { validateObject } = require('internal/validators');
7578

7679
const kPaused = Symbol('kPaused');
7780
const kConsume = Symbol('kConsume');
81+
const kReading = Symbol('kReading');
7882

7983
const { StringDecoder } = require('string_decoder');
8084
const from = require('internal/streams/from');
85+
const assert = require('internal/assert');
8186

8287
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
8388
ObjectSetPrototypeOf(Readable, Stream);
@@ -213,6 +218,7 @@ function Readable(options) {
213218
}
214219

215220
this[kConsume] = null;
221+
this[kReading] = false; // Is stream being consumed through Readable API?
216222

217223
Stream.call(this, options);
218224

@@ -238,6 +244,11 @@ Readable.prototype[EE.captureRejectionSymbol] = function(err) {
238244
// similar to how Writable.write() returns true if you should
239245
// write() some more.
240246
Readable.prototype.push = function(chunk, encoding) {
247+
if (this[kConsume] && chunk !== null && !this[kReading]) {
248+
encoding = encoding || this._readableState.defaultEncoding;
249+
return this[kConsume].push(chunk, encoding);
250+
}
251+
241252
return readableAddChunk(this, chunk, encoding, false);
242253
};
243254

@@ -307,10 +318,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
307318
maybeReadMore(stream, state);
308319
}
309320

321+
const consumed = this[kConsume] ? this[kConsume].push(chunk, encoding) : true;
322+
310323
// We can push more data if we are below the highWaterMark.
311324
// Also, if we have no data yet, we can stand some more bytes.
312325
// This is to work around cases where hwm=0, such as the repl.
313-
return !state.ended &&
326+
return consumed && !state.ended &&
314327
(state.length < state.highWaterMark || state.length === 0);
315328
}
316329

@@ -402,6 +415,27 @@ function howMuchToRead(n, state) {
402415
return state.ended ? state.length : 0;
403416
}
404417

418+
419+
function _read(self, n) {
420+
// Call internal read method
421+
try {
422+
const result = self._read(n);
423+
if (result != null) {
424+
const then = result.then;
425+
if (typeof then === 'function') {
426+
then.call(
427+
result,
428+
nop,
429+
function(err) {
430+
errorOrDestroy(self, err);
431+
});
432+
}
433+
}
434+
} catch (err) {
435+
errorOrDestroy(self, err);
436+
}
437+
}
438+
405439
// You can override either this method, or the async _read(n) below.
406440
Readable.prototype.read = function(n) {
407441
debug('read', n);
@@ -496,22 +530,7 @@ Readable.prototype.read = function(n) {
496530
state.needReadable = true;
497531

498532
// Call internal read method
499-
try {
500-
const result = this._read(state.highWaterMark);
501-
if (result != null) {
502-
const then = result.then;
503-
if (typeof then === 'function') {
504-
then.call(
505-
result,
506-
nop,
507-
function(err) {
508-
errorOrDestroy(this, err);
509-
});
510-
}
511-
}
512-
} catch (err) {
513-
errorOrDestroy(this, err);
514-
}
533+
_read(this, state.highWaterMark);
515534

516535
state.sync = false;
517536
// If _read pushed data synchronously, then `reading` will be false,
@@ -906,6 +925,8 @@ Readable.prototype.on = function(ev, fn) {
906925
const state = this._readableState;
907926

908927
if (ev === 'data') {
928+
this[kReading] = true;
929+
909930
// Update readableListening so that resume() may be a no-op
910931
// a few lines down. This is needed to support once('readable').
911932
state.readableListening = this.listenerCount('readable') > 0;
@@ -914,6 +935,8 @@ Readable.prototype.on = function(ev, fn) {
914935
if (state.flowing !== false)
915936
this.resume();
916937
} else if (ev === 'readable') {
938+
this[kReading] = true;
939+
917940
if (!state.endEmitted && !state.readableListening) {
918941
state.readableListening = state.needReadable = true;
919942
state.flowing = false;
@@ -1310,7 +1333,7 @@ ObjectDefineProperties(ReadableState.prototype, {
13101333
body: {
13111334
get() {
13121335
if (this[kConsume]?.type === kWebStreamType) {
1313-
return this[kConsume].body;
1336+
return this[kConsume].stream;
13141337
}
13151338

13161339
return consume(this, kWebStreamType);
@@ -1343,8 +1366,7 @@ ObjectDefineProperties(ReadableState.prototype, {
13431366
});
13441367

13451368
function isLocked(self) {
1346-
return self[kConsume] &&
1347-
(self[kConsume].type !== kWebStreamType || self[kConsume].body.locked);
1369+
return self[kConsume]?.stream?.locked === true;
13481370
}
13491371

13501372
// https://streams.spec.whatwg.org/#readablestream-disturbed
@@ -1363,56 +1385,193 @@ function consume(self, type) {
13631385
}
13641386

13651387
if (type === kWebStreamType) {
1366-
self[kConsume] = {
1388+
if (!ReadableStream) {
1389+
ReadableStream = require('internal/webstreams/readablestream')
1390+
.ReadableStream;
1391+
}
1392+
1393+
const objectMode = self.readableObjectMode;
1394+
const highWaterMark = self.readableHighWaterMark;
1395+
// When not running in objectMode explicitly, we just fall
1396+
// back to a minimal strategy that just specifies the highWaterMark
1397+
// and no size algorithm. Using a ByteLengthQueuingStrategy here
1398+
// is unnecessary.
1399+
let strategy;
1400+
if (objectMode) {
1401+
if (!CountQueuingStrategy) {
1402+
CountQueuingStrategy = require('internal/webstreams/queuingstrategies');
1403+
}
1404+
strategy = new CountQueuingStrategy({ highWaterMark });
1405+
} else {
1406+
strategy = { highWaterMark };
1407+
}
1408+
1409+
self
1410+
.on('error', function(err) {
1411+
const { controller } = this[kConsume];
1412+
controller.error(err);
1413+
})
1414+
.on('close', function() {
1415+
const { controller } = this[kConsume];
1416+
if (controller) {
1417+
controller.error(new AbortError());
1418+
}
1419+
});
1420+
1421+
const consume = self[kConsume] = {
13671422
type,
1368-
body: Readable.toWeb(self)
1423+
objectMode,
1424+
controller: null,
1425+
push(chunk) {
1426+
const { objectMode, controller } = this;
1427+
1428+
assert(controller);
1429+
1430+
if (chunk === null) {
1431+
controller.close();
1432+
this.controller = null;
1433+
} else {
1434+
if (!objectMode) {
1435+
if (typeof chunk === 'string') {
1436+
chunk = new Uint8Array(Buffer.from(chunk));
1437+
} else if (Buffer.isBuffer(chunk)) {
1438+
// Copy the Buffer to detach it from the pool.
1439+
chunk = new Uint8Array(chunk);
1440+
} else if (Stream._isUint8Array(chunk)) {
1441+
// Do nothing...
1442+
} else if (chunk != null) {
1443+
throw new ERR_INVALID_ARG_TYPE(
1444+
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
1445+
}
1446+
}
1447+
1448+
// TODO: Does controller perform any type checks?
1449+
controller.enqueue(chunk);
1450+
}
1451+
1452+
return controller.desiredSize > 0;
1453+
},
1454+
stream: new ReadableStream({
1455+
async start(controller) {
1456+
consume.controller = controller;
1457+
1458+
const { _readableState: state } = self;
1459+
1460+
if (self[kReading]) {
1461+
while (controller.desiredSize > 0) {
1462+
const chunk = self.read();
1463+
if (chunk === null) {
1464+
break;
1465+
}
1466+
controller.enqueue(chunk);
1467+
}
1468+
} else {
1469+
const buffer = state.buffer;
1470+
while (buffer.length) {
1471+
controller.enqueue(buffer.shift());
1472+
}
1473+
state.lenth = 0;
1474+
}
1475+
1476+
if (state.ended) {
1477+
controller.close();
1478+
}
1479+
1480+
if (!state.constructed) {
1481+
await EE.once(destroyImpl.kConstruct, self);
1482+
}
1483+
},
1484+
pull() {
1485+
const { _readableState: state } = self;
1486+
1487+
const n = consume.controller.desiredSize;
1488+
1489+
if (self[kReading]) {
1490+
assert(state.length === 0);
1491+
self.read(n);
1492+
} else {
1493+
_read(self, n);
1494+
}
1495+
},
1496+
cancel(reason) {
1497+
self.destroy(reason);
1498+
},
1499+
}, strategy)
13691500
};
13701501

1371-
return self[kConsume].body;
1502+
return consume.stream;
13721503
}
13731504

13741505
return new Promise((resolve, reject) => {
13751506
self[kConsume] = {
13761507
type,
13771508
resolve,
13781509
reject,
1379-
body: type === kTextType || type === kJSONType ? '' : []
1380-
};
1381-
self
1382-
.on('error', reject)
1383-
.on('data', function(val) {
1384-
const { type } = this[kConsume];
1510+
decoder: null,
1511+
body: type === kTextType || type === kJSONType ? '' : [],
1512+
push(chunk, encoding) {
1513+
const { type, body, resolve, decoder } = this[kConsume];
1514+
1515+
if (chunk === null) {
1516+
try {
1517+
if (type === kTextType) {
1518+
resolve(body + (decoder ? decoder.end() : ''));
1519+
} else if (type === kJSONType) {
1520+
resolve(JSONParse(body + (decoder ? decoder.end() : '')));
1521+
} else if (type === kArrayBufferType) {
1522+
resolve(Buffer.concat(body).buffer);
1523+
} else if (type === kBlobType) {
1524+
if (!Blob) {
1525+
Blob = require('buffer').Blob;
1526+
}
1527+
resolve(new Blob(body));
1528+
}
13851529

1386-
// TODO (fix): Do we need type check and/or conversion?
1530+
this[kConsume].body = null;
1531+
} catch (err) {
1532+
self.destroy(err);
1533+
}
1534+
} else if (type === kTextType || type === kJSONType) {
1535+
if (typeof chunk === 'string') {
1536+
if (decoder) {
1537+
chunk = decoder.write(Buffer.from(chunk));
1538+
}
1539+
// TODO: Encoding check/transform?
1540+
} else if (chunk instanceof Buffer) {
1541+
if (!decoder) {
1542+
this[kConsume].decoder = new StringDecoder('utf8');
1543+
}
1544+
encoding = decoder.write(chunk);
1545+
} else if (Stream._isUint8Array(chunk)) {
1546+
encoding = decoder.write(Stream._uint8ArrayToBuffer(chunk));
1547+
} else {
1548+
throw new ERR_INVALID_ARG_TYPE(
1549+
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
1550+
}
13871551

1388-
if (type === kTextType || type === kJSONType) {
1389-
this[kConsume].body += val;
1552+
this[kConsume].body += chunk;
13901553
} else {
1391-
this[kConsume].body.push(val);
1392-
}
1393-
})
1394-
.on('end', function() {
1395-
const { type, resolve, body } = this[kConsume];
1396-
1397-
try {
1398-
if (type === kTextType) {
1399-
resolve(body);
1400-
} else if (type === kJSONType) {
1401-
resolve(JSONParse(body));
1402-
} else if (type === kArrayBufferType) {
1403-
resolve(Buffer.concat(body).buffer);
1404-
} else if (type === kBlobType) {
1405-
if (!Blob) {
1406-
Blob = require('buffer').Blob;
1407-
}
1408-
resolve(new Blob(body));
1554+
if (typeof chunk === 'string') {
1555+
chunk = Buffer.from(chunk);
1556+
// TODO: Encoding check/transform?
1557+
} else if (chunk instanceof Buffer) {
1558+
// Do nothing...
1559+
} else if (Stream._isUint8Array(chunk)) {
1560+
chunk = Stream._uint8ArrayToBuffer(chunk);
1561+
} else {
1562+
throw new ERR_INVALID_ARG_TYPE(
1563+
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
14091564
}
14101565

1411-
this[kConsume].body = null;
1412-
} catch (err) {
1413-
self.destroy(err);
1566+
this[kConsume].body.push(chunk);
14141567
}
1415-
})
1568+
1569+
return true;
1570+
}
1571+
};
1572+
1573+
self
1574+
.on('error', reject)
14161575
.on('close', function() {
14171576
const { body, reject } = this[kConsume];
14181577

@@ -1522,5 +1681,6 @@ Readable.fromWeb = function(readableStream, options) {
15221681
};
15231682

15241683
Readable.toWeb = function(streamReadable) {
1525-
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
1684+
return streamReadable[kConsume] !== undefined ? streamReadable.stream :
1685+
lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
15261686
};

0 commit comments

Comments
 (0)