Skip to content

Commit a205441

Browse files
committed
add EventEmitter.on to async iterate over events
Port of nodejs/node#27994
1 parent 48e3d18 commit a205441

File tree

3 files changed

+334
-0
lines changed

3 files changed

+334
-0
lines changed

events.js

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ function EventEmitter() {
5555
}
5656
module.exports = EventEmitter;
5757
module.exports.once = once;
58+
module.exports.on = on;
5859

5960
// Backwards-compat with node 0.10.x
6061
EventEmitter.EventEmitter = EventEmitter;
@@ -467,6 +468,109 @@ function once(emitter, name) {
467468
});
468469
}
469470

471+
function createIterResult(value, done) {
472+
return { value: value, done: done };
473+
}
474+
475+
var AsyncIteratorPrototype = undefined;
476+
477+
function on(emitter, event) {
478+
// Initialize it on first run
479+
if (AsyncIteratorPrototype === undefined) {
480+
var asyncGenerator;
481+
try {
482+
asyncGenerator = Function('return async function*() {};')();
483+
} catch (err) {}
484+
if (asyncGenerator) {
485+
AsyncIteratorPrototype = Object.getPrototypeOf(
486+
Object.getPrototypeOf(asyncGenerator).prototype);
487+
} else {
488+
AsyncIteratorPrototype = null;
489+
}
490+
}
491+
492+
var unconsumedEvents = [];
493+
var unconsumedPromises = [];
494+
var error = null;
495+
var finished = false;
496+
var iterator = {
497+
next: function next() {
498+
// First, we consume all unread events
499+
var value = unconsumedEvents.shift();
500+
if (value) {
501+
return Promise.resolve(createIterResult(value, false));
502+
}
503+
504+
// Then we error, if an error happened
505+
// This happens one time if at all, because after 'error'
506+
// we stop listening
507+
if (error) {
508+
var p = Promise.reject(error);
509+
error = null;
510+
return p;
511+
}
512+
513+
// If the iterator is finished, resolve to done
514+
if (finished) {
515+
return Promise.resolve(createIterResult(undefined, true));
516+
}
517+
return new Promise(function (resolve, reject) {
518+
unconsumedPromises.push({ resolve: resolve, reject: reject });
519+
});
520+
},
521+
'return': function _return() {
522+
emitter.removeListener(event, eventHandler);
523+
emitter.removeListener('error', errorHandler);
524+
finished = true;
525+
526+
for (var i = 0, l = unconsumedPromises.length; i < l; i++) {
527+
unconsumedPromises[i].resolve(createIterResult(undefined, true));
528+
}
529+
return Promise.resolve(createIterResult(undefined, true));
530+
},
531+
'throw': function _throw(err) {
532+
if (!err || !(err instanceof Error)) {
533+
throw new TypeError('The "EventEmitter.AsyncIterator" property must be an instance of Error. Received ' + typeof err);
534+
}
535+
error = err;
536+
emitter.removeListener(event, eventHandler);
537+
emitter.removeListener('error', errorHandler);
538+
}
539+
};
540+
541+
iterator[Symbol.asyncIterator] = function () { return this; };
542+
543+
Object.setPrototypeOf(iterator, AsyncIteratorPrototype);
544+
545+
emitter.on(event, eventHandler);
546+
emitter.on('error', errorHandler);
547+
548+
return iterator;
549+
550+
function eventHandler() {
551+
var args = [].slice.call(arguments);
552+
var promise = unconsumedPromises.shift();
553+
if (promise) {
554+
promise.resolve(createIterResult(args, false));
555+
} else {
556+
unconsumedEvents.push(args);
557+
}
558+
}
559+
560+
function errorHandler(err) {
561+
finished = true;
562+
563+
var toError = unconsumedPromises.shift();
564+
if (toError) {
565+
toError.reject(err);
566+
} else {
567+
// The next time we call next()
568+
error = err;
569+
}
570+
iterator.return();
571+
}
572+
}
573+
470574
function addErrorHandlerIfEventEmitter(emitter, handler, flags) {
471575
if (typeof emitter.on === 'function') {
472576
eventTargetAgnosticAddListener(emitter, 'error', handler, flags);

tests/index.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ if (functionsHaveNames()) {
5050
require('./modify-in-emit.js');
5151
require('./num-args.js');
5252
require('./once.js');
53+
if (typeof Promise === 'function' && hasSymbols() && Symbol.asyncIterator) {
54+
require('./on-async-iterator.js');
55+
} else {
56+
// Async iterator support is not available.
57+
test('./on-async-iterator.js', { skip: true }, function () {});
58+
}
5359
require('./prepend.js');
5460
require('./set-max-listeners-side-effects.js');
5561
require('./special-event-names.js');

tests/on-async-iterator.js

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
'use strict';
2+
3+
var common = require('./common');
4+
var assert = require('assert');
5+
var EventEmitter = require('../').EventEmitter;
6+
var on = require('../').on;
7+
8+
async function basic() {
9+
var ee = new EventEmitter();
10+
process.nextTick(function () {
11+
ee.emit('foo', 'bar');
12+
// 'bar' is a spurious event, we are testing
13+
// that it does not show up in the iterable
14+
ee.emit('bar', 24);
15+
ee.emit('foo', 42);
16+
});
17+
18+
var iterable = on(ee, 'foo');
19+
20+
var expected = [['bar'], [42]];
21+
22+
for await (var event of iterable) {
23+
var current = expected.shift();
24+
25+
assert.deepStrictEqual(current, event);
26+
27+
if (expected.length === 0) {
28+
break;
29+
}
30+
}
31+
assert.strictEqual(ee.listenerCount('foo'), 0);
32+
assert.strictEqual(ee.listenerCount('error'), 0);
33+
}
34+
35+
async function error() {
36+
var ee = new EventEmitter();
37+
var _err = new Error('kaboom');
38+
process.nextTick(function () {
39+
ee.emit('error', _err);
40+
});
41+
42+
var iterable = on(ee, 'foo');
43+
let looped = false;
44+
let thrown = false;
45+
46+
try {
47+
// eslint-disable-next-line no-unused-vars
48+
for await (var event of iterable) {
49+
looped = true;
50+
}
51+
} catch (err) {
52+
thrown = true;
53+
assert.strictEqual(err, _err);
54+
}
55+
assert.strictEqual(thrown, true);
56+
assert.strictEqual(looped, false);
57+
}
58+
59+
async function errorDelayed() {
60+
var ee = new EventEmitter();
61+
var _err = new Error('kaboom');
62+
process.nextTick(function () {
63+
ee.emit('foo', 42);
64+
ee.emit('error', _err);
65+
});
66+
67+
var iterable = on(ee, 'foo');
68+
var expected = [[42]];
69+
let thrown = false;
70+
71+
try {
72+
for await (var event of iterable) {
73+
var current = expected.shift();
74+
assert.deepStrictEqual(current, event);
75+
}
76+
} catch (err) {
77+
thrown = true;
78+
assert.strictEqual(err, _err);
79+
}
80+
assert.strictEqual(thrown, true);
81+
assert.strictEqual(ee.listenerCount('foo'), 0);
82+
assert.strictEqual(ee.listenerCount('error'), 0);
83+
}
84+
85+
async function throwInLoop() {
86+
var ee = new EventEmitter();
87+
var _err = new Error('kaboom');
88+
89+
process.nextTick(function () {
90+
ee.emit('foo', 42);
91+
});
92+
93+
try {
94+
for await (var event of on(ee, 'foo')) {
95+
assert.deepStrictEqual(event, [42]);
96+
throw _err;
97+
}
98+
} catch (err) {
99+
assert.strictEqual(err, _err);
100+
}
101+
102+
assert.strictEqual(ee.listenerCount('foo'), 0);
103+
assert.strictEqual(ee.listenerCount('error'), 0);
104+
}
105+
106+
async function next() {
107+
var ee = new EventEmitter();
108+
var iterable = on(ee, 'foo');
109+
110+
process.nextTick(function() {
111+
ee.emit('foo', 'bar');
112+
ee.emit('foo', 42);
113+
iterable.return();
114+
});
115+
116+
var results = await Promise.all([
117+
iterable.next(),
118+
iterable.next(),
119+
iterable.next()
120+
]);
121+
122+
assert.deepStrictEqual(results, [{
123+
value: ['bar'],
124+
done: false
125+
}, {
126+
value: [42],
127+
done: false
128+
}, {
129+
value: undefined,
130+
done: true
131+
}]);
132+
133+
assert.deepStrictEqual(await iterable.next(), {
134+
value: undefined,
135+
done: true
136+
});
137+
}
138+
139+
async function nextError() {
140+
var ee = new EventEmitter();
141+
var iterable = on(ee, 'foo');
142+
var _err = new Error('kaboom');
143+
process.nextTick(function() {
144+
ee.emit('error', _err);
145+
});
146+
var results = await Promise.allSettled([
147+
iterable.next(),
148+
iterable.next(),
149+
iterable.next()
150+
]);
151+
assert.deepStrictEqual(results, [{
152+
status: 'rejected',
153+
reason: _err
154+
}, {
155+
status: 'fulfilled',
156+
value: {
157+
value: undefined,
158+
done: true
159+
}
160+
}, {
161+
status: 'fulfilled',
162+
value: {
163+
value: undefined,
164+
done: true
165+
}
166+
}]);
167+
assert.strictEqual(ee.listeners('error').length, 0);
168+
}
169+
170+
async function iterableThrow() {
171+
var ee = new EventEmitter();
172+
var iterable = on(ee, 'foo');
173+
174+
process.nextTick(function () {
175+
ee.emit('foo', 'bar');
176+
ee.emit('foo', 42); // lost in the queue
177+
iterable.throw(_err);
178+
});
179+
180+
var _err = new Error('kaboom');
181+
let thrown = false;
182+
183+
assert.throws(function () {
184+
// No argument
185+
iterable.throw();
186+
}, {
187+
message: 'The "EventEmitter.AsyncIterator" property must be' +
188+
' an instance of Error. Received undefined',
189+
name: 'TypeError'
190+
});
191+
192+
var expected = [['bar'], [42]];
193+
194+
try {
195+
for await (var event of iterable) {
196+
assert.deepStrictEqual(event, expected.shift());
197+
}
198+
} catch (err) {
199+
thrown = true;
200+
assert.strictEqual(err, _err);
201+
}
202+
assert.strictEqual(thrown, true);
203+
assert.strictEqual(expected.length, 0);
204+
assert.strictEqual(ee.listenerCount('foo'), 0);
205+
assert.strictEqual(ee.listenerCount('error'), 0);
206+
}
207+
208+
async function run() {
209+
var funcs = [
210+
basic,
211+
error,
212+
errorDelayed,
213+
throwInLoop,
214+
next,
215+
nextError,
216+
iterableThrow,
217+
];
218+
219+
for (var fn of funcs) {
220+
await fn();
221+
}
222+
}
223+
224+
module.exports = run();

0 commit comments

Comments
 (0)