Skip to content

Commit 6548fcb

Browse files
committed
lib: make diagnostics_channel async iterable
1 parent 7ebae13 commit 6548fcb

File tree

2 files changed

+104
-1
lines changed

2 files changed

+104
-1
lines changed

lib/diagnostics_channel.js

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
const {
44
ArrayPrototypeIndexOf,
55
ArrayPrototypePush,
6+
ArrayPrototypeShift,
67
ArrayPrototypeSplice,
78
ObjectCreate,
89
ObjectGetPrototypeOf,
910
ObjectSetPrototypeOf,
11+
Promise,
12+
PromiseResolve,
13+
SymbolAsyncIterator,
1014
SymbolHasInstance,
1115
WeakRefPrototypeGet
1216
} = primordials;
@@ -21,8 +25,72 @@ const { triggerUncaughtException } = internalBinding('errors');
2125

2226
const { WeakReference } = internalBinding('util');
2327

28+
class AsyncIterableChannel {
29+
constructor(channel) {
30+
this.channel = channel;
31+
this.events = [];
32+
this.waiting = [];
33+
34+
this.subscriber = (message) => {
35+
const resolve = ArrayPrototypeShift(this.waiting);
36+
if (resolve) {
37+
return resolve({
38+
value: message,
39+
done: false
40+
});
41+
}
42+
43+
ArrayPrototypePush(this.events, message);
44+
};
45+
46+
channel.subscribe(this.subscriber);
47+
}
48+
49+
[SymbolAsyncIterator]() {
50+
return this;
51+
}
52+
53+
return() {
54+
const data = { done: true };
55+
this.done = true;
56+
57+
this.channel.unsubscribe(this.subscriber);
58+
59+
for (let i = 0; i < this.waiting.length; i++) {
60+
const resolve = this.waiting[i];
61+
resolve(data);
62+
}
63+
64+
return PromiseResolve(data);
65+
}
66+
67+
next() {
68+
const event = ArrayPrototypeShift(this.events);
69+
if (event) {
70+
return PromiseResolve({
71+
value: event,
72+
done: false
73+
});
74+
}
75+
76+
if (this.done) {
77+
return PromiseResolve({
78+
done: true
79+
});
80+
}
81+
82+
return new Promise((resolve) => {
83+
ArrayPrototypePush(this.waiting, resolve);
84+
});
85+
}
86+
}
87+
2488
// TODO(qard): should there be a C++ channel interface?
2589
class ActiveChannel {
90+
[SymbolAsyncIterator]() {
91+
return new AsyncIterableChannel(this);
92+
}
93+
2694
subscribe(subscription) {
2795
if (typeof subscription !== 'function') {
2896
throw new ERR_INVALID_ARG_TYPE('subscription', ['function'],
@@ -71,7 +139,11 @@ class Channel {
71139
static [SymbolHasInstance](instance) {
72140
const prototype = ObjectGetPrototypeOf(instance);
73141
return prototype === Channel.prototype ||
74-
prototype === ActiveChannel.prototype;
142+
prototype === ActiveChannel.prototype;
143+
}
144+
145+
[SymbolAsyncIterator]() {
146+
return new AsyncIterableChannel(this);
75147
}
76148

77149
subscribe(subscription) {
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const dc = require('diagnostics_channel');
5+
const assert = require('assert');
6+
7+
const input = {
8+
foo: 'bar'
9+
};
10+
11+
const channel = dc.channel('test');
12+
13+
const done = common.mustCall();
14+
15+
async function main() {
16+
for await (const message of channel) {
17+
assert.strictEqual(channel.hasSubscribers, true);
18+
assert.strictEqual(message, input);
19+
break;
20+
}
21+
22+
// Make sure the subscription is cleaned up when breaking the loop!
23+
assert.strictEqual(channel.hasSubscribers, false);
24+
done();
25+
}
26+
27+
main();
28+
29+
setTimeout(common.mustCall(() => {
30+
channel.publish(input);
31+
}), 1);

0 commit comments

Comments
 (0)