Skip to content

Commit e334b6e

Browse files
authored
feat: add emitterFactory and friends (#342)
* feat: add emitterFactory and friends This commit adds an emitterFactory function that returns an EmitterFunction object. The EmitterFunction may be used to emit events over a supported network transport layer. Currently, only HTTP is supported. Parameters provided to the emitterFactory are the transport Binding (only HTTP supported), the encoding mode (Mode.BINARY or Mode.STRUCTURED), and a TransportFunction. The implementation for emitBinary and emitStructured has been replaced with this simple pattern and those two functions have been removed. Example: ```js // The endpoint URL that will receive the event const sink = 'https://my-event-sink'; // A function that uses Axios to send a message over HTTP function axiosEmitter(message: Message, options?: Options): Promise<unknown> { return axios.post(sink, message.body, { headers: message.headers, ...options }); } // Create an event emitter const emit = emitterFactory(HTTP, Mode.BINARY, axiosEmitter); // Emit an event, sending it to the endpoint URL emit(new CloudEvent{ source: '/example', type: 'example' }); ``` Signed-off-by: Lance Ball <[email protected]>
1 parent a9114b7 commit e334b6e

13 files changed

+799
-688
lines changed

package-lock.json

Lines changed: 463 additions & 103 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,10 @@
107107
"@types/axios": "^0.14.0",
108108
"@types/chai": "^4.2.11",
109109
"@types/cucumber": "^6.0.1",
110+
"@types/got": "^9.6.11",
110111
"@types/mocha": "^7.0.2",
111112
"@types/node": "^13.13.9",
113+
"@types/superagent": "^4.1.10",
112114
"@types/uuid": "^8.0.0",
113115
"@typescript-eslint/eslint-plugin": "^3.4.0",
114116
"@typescript-eslint/parser": "^3.4.0",
@@ -123,12 +125,14 @@
123125
"eslint-plugin-import": "^2.20.2",
124126
"eslint-plugin-node": "^11.1.0",
125127
"eslint-plugin-prettier": "^3.1.4",
128+
"got": "^11.7.0",
126129
"http-parser-js": "^0.5.2",
127130
"mocha": "~7.1.1",
128131
"nock": "~12.0.3",
129132
"nyc": "~15.0.0",
130133
"prettier": "^2.0.5",
131134
"standard-version": "^9.0.0",
135+
"superagent": "^6.1.0",
132136
"ts-node": "^8.10.2",
133137
"typedoc": "^0.18.0",
134138
"typedoc-clarity-theme": "~1.1.0",

src/index.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@ import { CloudEvent, Version } from "./event/cloudevent";
22
import { ValidationError } from "./event/validation";
33
import { CloudEventV03, CloudEventV03Attributes, CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";
44

5-
import { Emitter, TransportOptions } from "./transport/emitter";
5+
import {
6+
Emitter,
7+
TransportOptions,
8+
Options,
9+
TransportFunction,
10+
EmitterFunction,
11+
emitterFor,
12+
} from "./transport/emitter";
613
import { Receiver } from "./transport/receiver";
714
import { Protocol } from "./transport/protocols";
815
import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer, headersFor } from "./message";
@@ -32,6 +39,10 @@ export {
3239
Receiver, // TODO: Deprecated. Remove for 4.0
3340
Protocol, // TODO: Deprecated. Remove for 4.0
3441
TransportOptions, // TODO: Deprecated. Remove for 4.0
42+
TransportFunction,
43+
EmitterFunction,
44+
emitterFor,
45+
Options,
3546
// From Constants
3647
CONSTANTS,
3748
};

src/message/http/index.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ import { Base64Parser, JSONParser, MappedParser, Parser, parserByContentType } f
88
// implements Serializer
99
export function binary(event: CloudEvent): Message {
1010
const contentType: Headers = { [CONSTANTS.HEADER_CONTENT_TYPE]: CONSTANTS.DEFAULT_CONTENT_TYPE };
11-
const headers: Headers = headersFor(event);
11+
const headers: Headers = { ...contentType, ...headersFor(event) };
12+
let body = asData(event.data, event.datacontenttype as string);
13+
if (typeof body === "object") {
14+
body = JSON.stringify(body);
15+
}
1216
return {
13-
headers: { ...contentType, ...headers },
14-
body: asData(event.data, event.datacontenttype as string),
17+
headers,
18+
body,
1519
};
1620
}
1721

src/transport/emitter.ts

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import { CloudEvent } from "../event/cloudevent";
2-
import { emitBinary, emitStructured } from "./http";
2+
import { axiosEmitter } from "./http";
33
import { Protocol } from "./protocols";
4-
import { AxiosResponse } from "axios";
54
import { Agent } from "http";
5+
import { HTTP, Message, Mode } from "../message";
66

77
/**
88
* Options supplied to the Emitter when sending an event.
99
* In addition to url and protocol, TransportOptions may
1010
* also accept custom options that will be passed to the
1111
* Node.js http functions.
12+
* @deprecated will be removed in 4.0.0
1213
*/
1314
export interface TransportOptions {
1415
/**
@@ -26,8 +27,62 @@ export interface TransportOptions {
2627
[key: string]: string | Record<string, unknown> | Protocol | Agent | undefined;
2728
}
2829

29-
interface EmitterFunction {
30-
(event: CloudEvent, options: TransportOptions): Promise<AxiosResponse>;
30+
/**
31+
* Options is an additional, optional dictionary of options that may
32+
* be passed to an EmitterFunction and TransportFunction
33+
*/
34+
export interface Options {
35+
[key: string]: string | Record<string, unknown> | unknown;
36+
}
37+
38+
/**
39+
* EmitterFunction is an invokable interface returned by the emitterFactory
40+
* function. Invoke an EmitterFunction with a CloudEvent and optional transport
41+
* options to send the event as a Message across supported transports.
42+
*/
43+
export interface EmitterFunction {
44+
(event: CloudEvent, options?: Options): Promise<unknown>;
45+
}
46+
47+
/**
48+
* TransportFunction is an invokable interface provided to the emitterFactory.
49+
* A TransportFunction's responsiblity is to send a JSON encoded event Message
50+
* across the wire.
51+
*/
52+
export interface TransportFunction {
53+
(message: Message, options?: Options): Promise<unknown>;
54+
}
55+
56+
/**
57+
* emitterFactory creates and returns an EmitterFunction using the supplied
58+
* TransportFunction. The returned EmitterFunction will invoke the Binding's
59+
* `binary` or `structured` function to convert a CloudEvent into a JSON
60+
* Message based on the Mode provided, and invoke the TransportFunction with
61+
* the Message and any supplied options.
62+
*
63+
* @param {TransportFunction} fn a TransportFunction that can accept an event Message
64+
* @param { {Binding, Mode} } options network binding and message serialization options
65+
* @param {Binding} options.binding a transport binding, e.g. HTTP
66+
* @param {Mode} options.mode the encoding mode (Mode.BINARY or Mode.STRUCTURED)
67+
* @returns {EmitterFunction} an EmitterFunction to send events with
68+
*/
69+
export function emitterFor(fn: TransportFunction, options = { binding: HTTP, mode: Mode.BINARY }): EmitterFunction {
70+
if (!fn) {
71+
throw new TypeError("A TransportFunction is required");
72+
}
73+
const { binding, mode } = options;
74+
return function emit(event: CloudEvent, options?: Options): Promise<unknown> {
75+
options = options || {};
76+
77+
switch (mode) {
78+
case Mode.BINARY:
79+
return fn(binding.binary(event), options);
80+
case Mode.STRUCTURED:
81+
return fn(binding.structured(event), options);
82+
default:
83+
throw new TypeError(`Unexpected transport mode: ${mode}`);
84+
}
85+
};
3186
}
3287

3388
/**
@@ -36,19 +91,21 @@ interface EmitterFunction {
3691
*
3792
* @see https://github.com/cloudevents/spec/blob/v1.0/http-protocol-binding.md
3893
* @see https://github.com/cloudevents/spec/blob/v1.0/http-protocol-binding.md#13-content-modes
94+
* @deprecated Will be removed in 4.0.0. Consider using the emitterFactory
95+
*
3996
*/
4097
export class Emitter {
4198
url?: string;
4299
protocol: Protocol;
43-
emitter: EmitterFunction;
100+
binaryEmitter: EmitterFunction;
101+
structuredEmitter: EmitterFunction;
44102

45103
constructor(options: TransportOptions = { protocol: Protocol.HTTPBinary }) {
46104
this.protocol = options.protocol as Protocol;
47105
this.url = options.url;
48-
this.emitter = emitBinary;
49-
if (this.protocol === Protocol.HTTPStructured) {
50-
this.emitter = emitStructured;
51-
}
106+
107+
this.binaryEmitter = emitterFor(axiosEmitter(this.url as string));
108+
this.structuredEmitter = emitterFor(axiosEmitter(this.url as string), { binding: HTTP, mode: Mode.STRUCTURED });
52109
}
53110

54111
/**
@@ -63,15 +120,15 @@ export class Emitter {
63120
* In that case, it will be used as the recipient endpoint. The endpoint can
64121
* be overridden by providing a URL here.
65122
* @returns {Promise} Promise with an eventual response from the receiver
66-
* @deprecated Will be removed in 4.0.0. Consider using the Message interface with HTTP.[binary|structured](event)
123+
* @deprecated Will be removed in 4.0.0. Consider using the emitterFactory
67124
*/
68-
send(event: CloudEvent, options?: TransportOptions): Promise<AxiosResponse> {
125+
send(event: CloudEvent, options?: TransportOptions): Promise<unknown> {
69126
options = options || {};
70127
options.url = options.url || this.url;
71128
if (options.protocol != this.protocol) {
72-
if (this.protocol === Protocol.HTTPBinary) return emitBinary(event, options);
73-
return emitStructured(event, options);
129+
if (this.protocol === Protocol.HTTPBinary) return this.binaryEmitter(event, options);
130+
return this.structuredEmitter(event, options);
74131
}
75-
return this.emitter(event, options);
132+
return this.binaryEmitter(event, options);
76133
}
77134
}

src/transport/http/binary_emitter.ts

Lines changed: 0 additions & 32 deletions
This file was deleted.

src/transport/http/index.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,17 @@
1-
export * from "./binary_emitter";
2-
export * from "./structured_emitter";
1+
import { Message, Options } from "../..";
2+
import axios from "axios";
3+
4+
export function axiosEmitter(sink: string) {
5+
return function (message: Message, options?: Options): Promise<unknown> {
6+
options = { ...options };
7+
const headers = {
8+
...message.headers,
9+
...(options.headers as Record<string, string>),
10+
};
11+
delete options.headers;
12+
return axios.post(sink, message.body, {
13+
headers: headers,
14+
...options,
15+
});
16+
};
17+
}

src/transport/http/structured_emitter.ts

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)