Skip to content

Commit f764be1

Browse files
committed
stream: refactor streams
- Remove unnecessary scope. - Refactor to use more validators. - Avoid using deprecated APIs.
1 parent 365e5f5 commit f764be1

File tree

3 files changed

+27
-42
lines changed

3 files changed

+27
-42
lines changed

lib/internal/streams/duplex.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,10 @@ const Writable = require('internal/streams/writable');
4141
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
4242
ObjectSetPrototypeOf(Duplex, Readable);
4343

44-
{
45-
// Allow the keys array to be GC'ed.
46-
for (const method of ObjectKeys(Writable.prototype)) {
47-
if (!Duplex.prototype[method])
48-
Duplex.prototype[method] = Writable.prototype[method];
49-
}
44+
// Allow the keys array to be GC'ed.
45+
for (const method of ObjectKeys(Writable.prototype)) {
46+
if (!Duplex.prototype[method])
47+
Duplex.prototype[method] = Writable.prototype[method];
5048
}
5149

5250
function Duplex(options) {

lib/internal/streams/operators.js

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ const {
1212
} = require('internal/errors');
1313
const {
1414
validateAbortSignal,
15+
validateFunction,
1516
validateInteger,
17+
validateObject,
1618
} = require('internal/validators');
1719
const { kWeakHandler } = require('internal/event_target');
1820
const { finished } = require('internal/streams/end-of-stream');
@@ -32,12 +34,9 @@ const kEmpty = Symbol('kEmpty');
3234
const kEof = Symbol('kEof');
3335

3436
function map(fn, options) {
35-
if (typeof fn !== 'function') {
36-
throw new ERR_INVALID_ARG_TYPE(
37-
'fn', ['Function', 'AsyncFunction'], fn);
38-
}
39-
if (options != null && typeof options !== 'object') {
40-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
37+
validateFunction(fn, 'fn');
38+
if (options != null) {
39+
validateObject(options, 'options');
4140
}
4241
if (options?.signal != null) {
4342
validateAbortSignal(options.signal, 'options.signal');
@@ -167,8 +166,8 @@ function map(fn, options) {
167166
}
168167

169168
function asIndexedPairs(options = undefined) {
170-
if (options != null && typeof options !== 'object') {
171-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
169+
if (options != null) {
170+
validateObject(options, 'options');
172171
}
173172
if (options?.signal != null) {
174173
validateAbortSignal(options.signal, 'options.signal');
@@ -186,8 +185,8 @@ function asIndexedPairs(options = undefined) {
186185
}
187186

188187
async function some(fn, options) {
189-
if (options != null && typeof options !== 'object') {
190-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
188+
if (options != null) {
189+
validateObject(options, 'options');
191190
}
192191
if (options?.signal != null) {
193192
validateAbortSignal(options.signal, 'options.signal');
@@ -216,21 +215,15 @@ async function some(fn, options) {
216215
}
217216

218217
async function every(fn, options) {
219-
if (typeof fn !== 'function') {
220-
throw new ERR_INVALID_ARG_TYPE(
221-
'fn', ['Function', 'AsyncFunction'], fn);
222-
}
218+
validateFunction(fn, 'fn');
223219
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
224220
return !(await some.call(this, async (...args) => {
225221
return !(await fn(...args));
226222
}, options));
227223
}
228224

229225
async function forEach(fn, options) {
230-
if (typeof fn !== 'function') {
231-
throw new ERR_INVALID_ARG_TYPE(
232-
'fn', ['Function', 'AsyncFunction'], fn);
233-
}
226+
validateFunction(fn, 'fn');
234227
async function forEachFn(value, options) {
235228
await fn(value, options);
236229
return kEmpty;
@@ -240,10 +233,7 @@ async function forEach(fn, options) {
240233
}
241234

242235
function filter(fn, options) {
243-
if (typeof fn !== 'function') {
244-
throw new ERR_INVALID_ARG_TYPE(
245-
'fn', ['Function', 'AsyncFunction'], fn);
246-
}
236+
validateFunction(fn, 'fn');
247237
async function filterFn(value, options) {
248238
if (await fn(value, options)) {
249239
return value;
@@ -263,12 +253,9 @@ class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
263253
}
264254

265255
async function reduce(reducer, initialValue, options) {
266-
if (typeof reducer !== 'function') {
267-
throw new ERR_INVALID_ARG_TYPE(
268-
'reducer', ['Function', 'AsyncFunction'], reducer);
269-
}
270-
if (options != null && typeof options !== 'object') {
271-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
256+
validateFunction(fn, 'fn');
257+
if (options != null) {
258+
validateObject(options, 'options');
272259
}
273260
if (options?.signal != null) {
274261
validateAbortSignal(options.signal, 'options.signal');
@@ -311,8 +298,8 @@ async function reduce(reducer, initialValue, options) {
311298
}
312299

313300
async function toArray(options) {
314-
if (options != null && typeof options !== 'object') {
315-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
301+
if (options != null) {
302+
validateObject(options, 'options');
316303
}
317304
if (options?.signal != null) {
318305
validateAbortSignal(options.signal, 'options.signal');
@@ -351,8 +338,8 @@ function toIntegerOrInfinity(number) {
351338
}
352339

353340
function drop(number, options = undefined) {
354-
if (options != null && typeof options !== 'object') {
355-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
341+
if (options != null) {
342+
validateObject(options, 'options');
356343
}
357344
if (options?.signal != null) {
358345
validateAbortSignal(options.signal, 'options.signal');
@@ -375,8 +362,8 @@ function drop(number, options = undefined) {
375362
}
376363

377364
function take(number, options = undefined) {
378-
if (options != null && typeof options !== 'object') {
379-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
365+
if (options != null) {
366+
validateObject(options, 'options');
380367
}
381368
if (options?.signal != null) {
382369
validateAbortSignal(options.signal, 'options.signal');

lib/internal/streams/readable.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
787787
debug('onerror', er);
788788
unpipe();
789789
dest.removeListener('error', onerror);
790-
if (EE.listenerCount(dest, 'error') === 0) {
790+
if (dest.listenerCount('error') === 0) {
791791
const s = dest._writableState || dest._readableState;
792792
if (s && !s.errorEmitted) {
793793
// User incorrectly emitted 'error' directly on the stream.
@@ -852,7 +852,7 @@ function pipeOnDrain(src, dest) {
852852
}
853853

854854
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
855-
EE.listenerCount(src, 'data')) {
855+
src.listenerCount('data')) {
856856
state.flowing = true;
857857
flow(src);
858858
}

0 commit comments

Comments
 (0)