Skip to content

Commit 43d9e01

Browse files
loopingzlance
authored andcommitted
feat: allow ensureDelivery to be able to ensure delivery on emit
Signed-off-by: Remi Cattiau <[email protected]>
1 parent d418a50 commit 43d9e01

File tree

5 files changed

+91
-12
lines changed

5 files changed

+91
-12
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ function sendWithAxios(message) {
107107

108108
const emit = emitterFor(sendWithAxios, { mode: Mode.BINARY });
109109
// Set the emit
110-
Emitter.getSingleton().on("event", emit);
110+
Emitter.on("cloudevent", emit);
111111

112112
...
113113
// In any part of the code will send the event

src/event/cloudevent.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,11 @@ export class CloudEvent implements CloudEventV1, CloudEventV03 {
171171
/**
172172
* Emit this CloudEvent through the application
173173
*
174-
* @return {CloudEvent} current CloudEvent object
174+
* @param {boolean} ensureDelivery fail the promise if one listener fail
175+
* @return {Promise<CloudEvent>} this
175176
*/
176-
public emit(): this {
177-
Emitter.emitEvent(this);
177+
public async emit(ensureDelivery = true): Promise<this> {
178+
await Emitter.emitEvent(this, ensureDelivery);
178179
return this;
179180
}
180181

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { CloudEvent, Version } from "./event/cloudevent";
22
import { ValidationError } from "./event/validation";
33
import { CloudEventV03, CloudEventV03Attributes, CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";
44

5-
import { Options, TransportFunction, EmitterFunction, emitterFor } from "./transport/emitter";
5+
import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
66
import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer } from "./message";
77

88
import CONSTANTS from "./constants";
@@ -28,6 +28,7 @@ export {
2828
TransportFunction,
2929
EmitterFunction,
3030
emitterFor,
31+
Emitter,
3132
Options,
3233
// From Constants
3334
CONSTANTS,

src/transport/emitter.ts

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export class Emitter extends EventEmitter {
6767
/**
6868
* Singleton store
6969
*/
70-
static singleton: Emitter | undefined = undefined;
70+
static instance: Emitter | undefined = undefined;
7171

7272
/**
7373
* Create an Emitter
@@ -83,20 +83,42 @@ export class Emitter extends EventEmitter {
8383
*
8484
* @return {Emitter} return Emitter singleton
8585
*/
86-
static getSingleton(): Emitter {
87-
if (!Emitter.singleton) {
88-
Emitter.singleton = new Emitter();
86+
static getInstance(): Emitter {
87+
if (!Emitter.instance) {
88+
Emitter.instance = new Emitter();
8989
}
90-
return Emitter.singleton;
90+
return Emitter.instance;
91+
}
92+
93+
/**
94+
* Add a listener for eventing
95+
*
96+
* @param {string} event type to listen to
97+
* @param {Function} listener to call on event
98+
* @return {void}
99+
*/
100+
static on(event: "cloudevent" | "newListener" | "removeListener", listener: (...args: any[]) => void): void {
101+
this.getInstance().on(event, listener);
91102
}
92103

93104
/**
94105
* Emit an event inside this application
95106
*
96107
* @param {CloudEvent} event to emit
108+
* @param {boolean} ensureDelivery fail the promise if one listener fail
97109
* @return {void}
98110
*/
99-
static emitEvent(event: CloudEvent): void {
100-
this.getSingleton().emit("event", event);
111+
static async emitEvent(event: CloudEvent, ensureDelivery = true): Promise<void> {
112+
if (!ensureDelivery) {
113+
// Ensure delivery is disabled so we don't wait for Promise
114+
Emitter.getInstance().emit("cloudevent", event);
115+
} else {
116+
// Execute all listeners and wrap them in a Promise
117+
await Promise.all(
118+
Emitter.getInstance()
119+
.listeners("cloudevent")
120+
.map(async (l) => l(event)),
121+
);
122+
}
101123
}
102124
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import "mocha";
2+
3+
import { emitterFor, HTTP, Mode, Message, Emitter } from "../../src";
4+
5+
import { fixture, assertStructured } from "./emitter_factory_test";
6+
7+
import { rejects, doesNotReject } from "assert";
8+
9+
describe("Emitter Singleton", () => {
10+
it("emit a Node.js 'cloudevent' event as an EventEmitter", async () => {
11+
const msg: Message | unknown = await new Promise((resolve) => {
12+
const fn = async (message: Message) => {
13+
resolve(message);
14+
};
15+
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
16+
Emitter.on("cloudevent", emitter);
17+
18+
fixture.emit(false);
19+
});
20+
let body: unknown = (<Message>(<unknown>msg)).body;
21+
if (typeof body === "string") {
22+
body = JSON.parse(body);
23+
}
24+
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
25+
});
26+
27+
it("emit a Node.js 'cloudevent' event as an EventEmitter with ensureDelivery", async () => {
28+
let msg: Message | unknown = undefined;
29+
const fn = async (message: Message) => {
30+
msg = message;
31+
};
32+
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
33+
Emitter.on("cloudevent", emitter);
34+
await fixture.emit(true);
35+
let body: any = (<Message>msg).body;
36+
if (typeof body === "string") {
37+
body = JSON.parse(body);
38+
}
39+
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
40+
});
41+
42+
it("emit a Node.js 'cloudevent' event as an EventEmitter with ensureDelivery Error", async () => {
43+
const emitter = async () => {
44+
throw new Error("Not sent");
45+
};
46+
Emitter.on("cloudevent", emitter);
47+
// Should fail with emitWithEnsureDelivery
48+
await rejects(() => fixture.emit(true));
49+
// Should not fail with emitWithEnsureDelivery
50+
// Work locally but not on Github Actions
51+
if (!process.env.GITHUB_WORKFLOW) {
52+
await doesNotReject(() => fixture.emit(false));
53+
}
54+
});
55+
});

0 commit comments

Comments
 (0)