Skip to content

Commit 0798f07

Browse files
committed
feat: allow ensureDelivery to be able to ensure delivery on emit
Signed-off-by: Remi Cattiau <[email protected]>
1 parent 1671f66 commit 0798f07

File tree

4 files changed

+90
-12
lines changed

4 files changed

+90
-12
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ function sendWithAxios(message) {
107107

108108
const emit = emitterFor(sendWithAxios, { mode: Mode.BINARY });
109109
// Set the emit
110-
Emitter.getSingleton().on("event", emit);
110+
Emitter.on("cloudevent", emit);
111111

112112
...
113113
// In any part of the code will send the event

src/event/cloudevent.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,11 @@ export class CloudEvent implements CloudEventV1, CloudEventV03 {
171171
/**
172172
* Emit this CloudEvent through the application
173173
*
174-
* @return {CloudEvent} current CloudEvent object
174+
* @param {boolean} ensureDelivery fail the promise if one listener fail
175+
* @return {Promise<CloudEvent>} this
175176
*/
176-
public emit(): this {
177-
Emitter.emitEvent(this);
177+
public async emit(ensureDelivery = false): Promise<this> {
178+
await Emitter.emitEvent(this, ensureDelivery);
178179
return this;
179180
}
180181

src/transport/emitter.ts

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ export class Emitter extends EventEmitter {
9898
/**
9999
* Singleton store
100100
*/
101-
static singleton: Emitter | undefined = undefined;
101+
static instance: Emitter | undefined = undefined;
102102
url?: string;
103103
protocol: Protocol;
104104
binaryEmitter: EmitterFunction;
@@ -113,7 +113,9 @@ export class Emitter extends EventEmitter {
113113
* @param {Object} [options] The configuration options for this event. Options
114114
* provided will be passed along to Node.js `http.request()`.
115115
* https://nodejs.org/api/http.html#http_http_request_options_callback
116-
* @deprecated Will be removed in 4.0.0. Consider using the emitterFactory
116+
* @deprecated in 4.0.0 this class functionality will be limited to events propagated
117+
* throughout the Node.js process. To emit events over a transport protocol, consider using the
118+
* emitterFactory
117119
*/
118120
constructor(options: TransportOptions = { protocol: Protocol.HTTPBinary }) {
119121
super();
@@ -153,20 +155,42 @@ export class Emitter extends EventEmitter {
153155
*
154156
* @return {Emitter} return Emitter singleton
155157
*/
156-
static getSingleton(): Emitter {
157-
if (!Emitter.singleton) {
158-
Emitter.singleton = new Emitter();
158+
static getInstance(): Emitter {
159+
if (!Emitter.instance) {
160+
Emitter.instance = new Emitter();
159161
}
160-
return Emitter.singleton;
162+
return Emitter.instance;
163+
}
164+
165+
/**
166+
* Add a listener for eventing
167+
*
168+
* @param {string} event type to listen to
169+
* @param {Function} listener to call on event
170+
* @return {void}
171+
*/
172+
static on(event: "cloudevent" | "newListener" | "removeListener", listener: (...args: any[]) => void): void {
173+
this.getInstance().on(event, listener);
161174
}
162175

163176
/**
164177
* Emit an event inside this application
165178
*
166179
* @param {CloudEvent} event to emit
180+
* @param {boolean} ensureDelivery fail the promise if one listener fail
167181
* @return {void}
168182
*/
169-
static emitEvent(event: CloudEvent): void {
170-
this.getSingleton().emit("event", event);
183+
static async emitEvent(event: CloudEvent, ensureDelivery = false): Promise<void> {
184+
if (!ensureDelivery) {
185+
// Ensure delivery is disabled so we don't wait for Promise
186+
Emitter.getInstance().emit("cloudevent", event);
187+
} else {
188+
// Execute all listeners and wrap them in a Promise
189+
await Promise.all(
190+
Emitter.getInstance()
191+
.listeners("cloudevent")
192+
.map(async (l) => l(event)),
193+
);
194+
}
171195
}
172196
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import "mocha";
2+
3+
import { emitterFor, HTTP, Mode, Message, Emitter } from "../../src";
4+
5+
import { fixture, assertStructured } from "./emitter_factory_test";
6+
7+
import { rejects, doesNotReject } from "assert";
8+
9+
describe("Emitter Singleton", () => {
10+
it("emit", async () => {
11+
let msg: Message | unknown = undefined;
12+
const fn = async (message: Message) => {
13+
msg = message;
14+
};
15+
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
16+
Emitter.on("cloudevent", emitter);
17+
fixture.emit();
18+
while (msg === undefined) {
19+
await new Promise((resolve) => setTimeout(resolve, 10));
20+
}
21+
let body: unknown = (<Message>(<unknown>msg)).body;
22+
if (typeof body === "string") {
23+
body = JSON.parse(body);
24+
}
25+
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
26+
});
27+
28+
it("emitWithEnsureDelivery", async () => {
29+
let msg: Message | unknown = undefined;
30+
const fn = async (message: Message) => {
31+
msg = message;
32+
};
33+
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
34+
Emitter.on("cloudevent", emitter);
35+
await fixture.emit(true);
36+
let body: any = (<Message>msg).body;
37+
if (typeof body === "string") {
38+
body = JSON.parse(body);
39+
}
40+
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
41+
});
42+
43+
it("emitWithEnsureDeliveryError", async () => {
44+
const emitter = async () => {
45+
throw new Error("Not sent");
46+
};
47+
Emitter.on("cloudevent", emitter);
48+
// Should fail with emitWithEnsureDelivery
49+
await rejects(() => fixture.emit(true));
50+
// Should not fail with emitWithEnsureDelivery
51+
await doesNotReject(() => fixture.emit(false));
52+
});
53+
});

0 commit comments

Comments
 (0)