Skip to content

Commit b0e20dc

Browse files
committed
lib: first pass at a 4.0 Messages implementation
1 parent 73f0bec commit b0e20dc

17 files changed

+500
-1693
lines changed

src/index.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ import { ValidationError } from "./event/validation";
33
import { CloudEventV03, CloudEventV03Attributes, CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";
44

55
import { Emitter, TransportOptions } from "./transport/emitter";
6-
import { Receiver, Mode } from "./transport/receiver";
6+
import { Receiver } from "./transport/receiver";
77
import { Protocol } from "./transport/protocols";
8-
import { Headers, headersFor } from "./transport/http/headers";
8+
import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer } from "./messages";
99

1010
import CONSTANTS from "./constants";
1111

@@ -18,14 +18,19 @@ export {
1818
CloudEventV1Attributes,
1919
Version,
2020
ValidationError,
21+
// From messages
22+
Headers,
23+
Mode,
24+
Binding,
25+
Message,
26+
Deserializer,
27+
Serializer,
28+
HTTP,
2129
// From transport
2230
Emitter,
2331
Receiver,
24-
Mode,
2532
Protocol,
2633
TransportOptions,
27-
Headers,
28-
headersFor,
2934
// From Constants
3035
CONSTANTS,
3136
};

src/transport/http/versions.ts renamed to src/messages/http/headers.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,97 @@
11
import { PassThroughParser, DateParser, MappedParser } from "../../parsers";
2+
import { ValidationError, CloudEvent } from "../..";
3+
import { Headers } from "../";
4+
import { Version } from "../../event/cloudevent";
25
import CONSTANTS from "../../constants";
36

7+
export const allowedContentTypes = [CONSTANTS.DEFAULT_CONTENT_TYPE, CONSTANTS.MIME_JSON, CONSTANTS.MIME_OCTET_STREAM];
8+
export const requiredHeaders = [
9+
CONSTANTS.CE_HEADERS.ID,
10+
CONSTANTS.CE_HEADERS.SOURCE,
11+
CONSTANTS.CE_HEADERS.TYPE,
12+
CONSTANTS.CE_HEADERS.SPEC_VERSION,
13+
];
14+
15+
/**
16+
* Validates cloud event headers and their values
17+
* @param {Headers} headers event transport headers for validation
18+
* @throws {ValidationError} if the headers are invalid
19+
* @return {boolean} true if headers are valid
20+
*/
21+
export function validate(headers: Headers): Headers {
22+
const sanitizedHeaders = sanitize(headers);
23+
24+
// if content-type exists, be sure it's an allowed type
25+
const contentTypeHeader = sanitizedHeaders[CONSTANTS.HEADER_CONTENT_TYPE];
26+
const noContentType = !allowedContentTypes.includes(contentTypeHeader);
27+
if (contentTypeHeader && noContentType) {
28+
throw new ValidationError("invalid content type", [sanitizedHeaders[CONSTANTS.HEADER_CONTENT_TYPE]]);
29+
}
30+
31+
requiredHeaders
32+
.filter((required: string) => !sanitizedHeaders[required])
33+
.forEach((required: string) => {
34+
throw new ValidationError(`header '${required}' not found`);
35+
});
36+
37+
if (!sanitizedHeaders[CONSTANTS.HEADER_CONTENT_TYPE]) {
38+
sanitizedHeaders[CONSTANTS.HEADER_CONTENT_TYPE] = CONSTANTS.MIME_JSON;
39+
}
40+
41+
return sanitizedHeaders;
42+
}
43+
44+
/**
45+
* Returns the HTTP headers that will be sent for this event when the HTTP transmission
46+
* mode is "binary". Events sent over HTTP in structured mode only have a single CE header
47+
* and that is "ce-id", corresponding to the event ID.
48+
* @param {CloudEvent} event a CloudEvent
49+
* @returns {Object} the headers that will be sent for the event
50+
*/
51+
export function headersFor(event: CloudEvent): Headers {
52+
const headers: Headers = {};
53+
let headerMap: Readonly<{ [key: string]: MappedParser }>;
54+
if (event.specversion === Version.V1) {
55+
headerMap = v1headerMap;
56+
} else {
57+
headerMap = v03headerMap;
58+
}
59+
60+
// iterate over the event properties - generate a header for each
61+
Object.getOwnPropertyNames(event).forEach((property) => {
62+
const value = event[property];
63+
if (value) {
64+
const map: MappedParser | undefined = headerMap[property] as MappedParser;
65+
if (map) {
66+
headers[map.name] = map.parser.parse(value as string) as string;
67+
} else if (property !== CONSTANTS.DATA_ATTRIBUTE && property !== `${CONSTANTS.DATA_ATTRIBUTE}_base64`) {
68+
headers[`${CONSTANTS.EXTENSIONS_PREFIX}${property}`] = value as string;
69+
}
70+
}
71+
});
72+
// Treat time specially, since it's handled with getters and setters in CloudEvent
73+
if (event.time) {
74+
headers[CONSTANTS.CE_HEADERS.TIME] = event.time as string;
75+
}
76+
return headers;
77+
}
78+
79+
/**
80+
* Sanitizes incoming headers by lowercasing them and potentially removing
81+
* encoding from the content-type header.
82+
* @param {Headers} headers HTTP headers as key/value pairs
83+
* @returns {Headers} the sanitized headers
84+
*/
85+
export function sanitize(headers: Headers): Headers {
86+
const sanitized: Headers = {};
87+
88+
Array.from(Object.keys(headers))
89+
.filter((header) => Object.hasOwnProperty.call(headers, header))
90+
.forEach((header) => (sanitized[header.toLowerCase()] = headers[header]));
91+
92+
return sanitized;
93+
}
94+
495
function parser(name: string, parser = new PassThroughParser()): MappedParser {
596
return { name: name, parser: parser };
697
}

src/messages/http/index.ts

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
import { CloudEvent, CloudEventV03, CloudEventV1, CONSTANTS, Mode, Version } from "../..";
2+
import { Message, Headers } from "..";
3+
4+
import { headersFor, sanitize, v03structuredParsers, v1binaryParsers, v1structuredParsers, validate } from "./headers";
5+
import { asData, isBase64, isString, isStringOrObjectOrThrow, ValidationError } from "../../event/validation";
6+
import { validateCloudEvent } from "../../event/spec";
7+
import { Base64Parser, JSONParser, MappedParser, Parser, parserByContentType } from "../../parsers";
8+
9+
// implements Serializer
10+
export function binary(event: CloudEvent): Message {
11+
const contentType: Headers = { [CONSTANTS.HEADER_CONTENT_TYPE]: CONSTANTS.DEFAULT_CONTENT_TYPE };
12+
const headers: Headers = headersFor(event);
13+
return {
14+
headers: { ...contentType, ...headers },
15+
body: asData(event.data, event.datacontenttype as string),
16+
};
17+
}
18+
19+
// implements Serializer
20+
export function structured(event: CloudEvent): Message {
21+
return {
22+
headers: {
23+
[CONSTANTS.HEADER_CONTENT_TYPE]: CONSTANTS.DEFAULT_CE_CONTENT_TYPE,
24+
},
25+
body: event.toString(),
26+
};
27+
}
28+
29+
/**
30+
* Converts a Message to a CloudEvent
31+
*
32+
* @param {Message} message the incoming message
33+
* @return {CloudEvent} A new {CloudEvent} instance
34+
*/
35+
export function deserialize(message: Message): CloudEvent {
36+
const cleanHeaders: Headers = sanitize(message.headers);
37+
const mode: Mode = getMode(cleanHeaders);
38+
let version = getVersion(mode, cleanHeaders, message.body);
39+
if (version !== Version.V03 && version !== Version.V1) {
40+
console.error(`Unknown spec version ${version}. Default to ${Version.V1}`);
41+
version = Version.V1;
42+
}
43+
switch (mode) {
44+
case Mode.BINARY:
45+
return parseBinary(message, version);
46+
case Mode.STRUCTURED:
47+
return parseStructured(message, version);
48+
default:
49+
throw new ValidationError("Unknown Message mode");
50+
}
51+
}
52+
53+
/**
54+
* Determines the HTTP transport mode (binary or structured) based
55+
* on the incoming HTTP headers.
56+
* @param {Headers} headers the incoming HTTP headers
57+
* @returns {Mode} the transport mode
58+
*/
59+
function getMode(headers: Headers): Mode {
60+
const contentType = headers[CONSTANTS.HEADER_CONTENT_TYPE];
61+
if (contentType && contentType.startsWith(CONSTANTS.MIME_CE)) {
62+
return Mode.STRUCTURED;
63+
}
64+
if (headers[CONSTANTS.CE_HEADERS.ID]) {
65+
return Mode.BINARY;
66+
}
67+
throw new ValidationError("no cloud event detected");
68+
}
69+
70+
/**
71+
* Determines the version of an incoming CloudEvent based on the
72+
* HTTP headers or HTTP body, depending on transport mode.
73+
* @param {Mode} mode the HTTP transport mode
74+
* @param {Headers} headers the incoming HTTP headers
75+
* @param {Record<string, unknown>} body the HTTP request body
76+
* @returns {Version} the CloudEvent specification version
77+
*/
78+
function getVersion(mode: Mode, headers: Headers, body: string | Record<string, string>) {
79+
if (mode === Mode.BINARY) {
80+
// Check the headers for the version
81+
const versionHeader = headers[CONSTANTS.CE_HEADERS.SPEC_VERSION];
82+
if (versionHeader) {
83+
return versionHeader;
84+
}
85+
} else {
86+
// structured mode - the version is in the body
87+
return typeof body === "string" ? JSON.parse(body).specversion : (body as CloudEvent).specversion;
88+
}
89+
return Version.V1;
90+
}
91+
92+
/**
93+
* Parses an incoming HTTP Message, converting it to a {CloudEvent}
94+
* instance if it conforms to the Cloud Event specification for this receiver.
95+
*
96+
* @param {Message} message the incoming HTTP Message
97+
* @param {Version} version the spec version of the incoming event
98+
* @returns {CloudEvent} an instance of CloudEvent representing the incoming request
99+
* @throws {ValidationError} of the event does not conform to the spec
100+
*/
101+
function parseBinary(message: Message, version: Version): CloudEvent {
102+
const headers = message.headers;
103+
let body = message.body;
104+
105+
if (!headers) throw new ValidationError("headers is null or undefined");
106+
if (body) {
107+
isStringOrObjectOrThrow(body, new ValidationError("payload must be an object or a string"));
108+
}
109+
110+
if (
111+
headers[CONSTANTS.CE_HEADERS.SPEC_VERSION] &&
112+
headers[CONSTANTS.CE_HEADERS.SPEC_VERSION] !== Version.V03 &&
113+
headers[CONSTANTS.CE_HEADERS.SPEC_VERSION] !== Version.V1
114+
) {
115+
throw new ValidationError(`invalid spec version ${headers[CONSTANTS.CE_HEADERS.SPEC_VERSION]}`);
116+
}
117+
118+
body = isString(body) && isBase64(body) ? Buffer.from(body as string, "base64").toString() : body;
119+
120+
// Clone and low case all headers names
121+
const sanitizedHeaders = validate(headers);
122+
123+
const eventObj: { [key: string]: unknown | string | Record<string, unknown> } = {};
124+
const parserMap: Record<string, MappedParser> = version === Version.V1 ? v1binaryParsers : v1binaryParsers;
125+
126+
for (const header in parserMap) {
127+
if (sanitizedHeaders[header]) {
128+
const mappedParser: MappedParser = parserMap[header];
129+
eventObj[mappedParser.name] = mappedParser.parser.parse(sanitizedHeaders[header]);
130+
delete sanitizedHeaders[header];
131+
}
132+
}
133+
134+
let parsedPayload;
135+
136+
if (body) {
137+
const parser = parserByContentType[eventObj.datacontenttype as string];
138+
if (!parser) {
139+
throw new ValidationError(`no parser found for content type ${eventObj.datacontenttype}`);
140+
}
141+
parsedPayload = parser.parse(body);
142+
}
143+
144+
// Every unprocessed header can be an extension
145+
for (const header in sanitizedHeaders) {
146+
if (header.startsWith(CONSTANTS.EXTENSIONS_PREFIX)) {
147+
eventObj[header.substring(CONSTANTS.EXTENSIONS_PREFIX.length)] = headers[header];
148+
}
149+
}
150+
// At this point, if the datacontenttype is application/json and the datacontentencoding is base64
151+
// then the data has already been decoded as a string, then parsed as JSON. We don't need to have
152+
// the datacontentencoding property set - in fact, it's incorrect to do so.
153+
if (eventObj.datacontenttype === CONSTANTS.MIME_JSON && eventObj.datacontentencoding === CONSTANTS.ENCODING_BASE64) {
154+
delete eventObj.datacontentencoding;
155+
}
156+
157+
const cloudevent = new CloudEvent({ ...eventObj, data: parsedPayload } as CloudEventV1 | CloudEventV03);
158+
validateCloudEvent(cloudevent);
159+
return cloudevent;
160+
}
161+
162+
/**
163+
* Creates a new CloudEvent instance based on the provided payload and headers.
164+
*
165+
* @param {Message} message the incoming Message
166+
* @param {Version} version the spec version of this message (v1 or v03)
167+
* @returns {CloudEvent} a new CloudEvent instance for the provided headers and payload
168+
* @throws {ValidationError} if the payload and header combination do not conform to the spec
169+
*/
170+
function parseStructured(message: Message, version: Version): CloudEvent {
171+
let payload = message.body;
172+
const headers = message.headers;
173+
174+
if (!payload) throw new ValidationError("payload is null or undefined");
175+
if (!headers) throw new ValidationError("headers is null or undefined");
176+
isStringOrObjectOrThrow(payload, new ValidationError("payload must be an object or a string"));
177+
178+
if (
179+
headers[CONSTANTS.CE_HEADERS.SPEC_VERSION] &&
180+
headers[CONSTANTS.CE_HEADERS.SPEC_VERSION] != Version.V03 &&
181+
headers[CONSTANTS.CE_HEADERS.SPEC_VERSION] != Version.V1
182+
) {
183+
throw new ValidationError(`invalid spec version ${headers[CONSTANTS.CE_HEADERS.SPEC_VERSION]}`);
184+
}
185+
186+
payload = isString(payload) && isBase64(payload) ? Buffer.from(payload as string, "base64").toString() : payload;
187+
188+
// Clone and low case all headers names
189+
const sanitizedHeaders = sanitize(headers);
190+
191+
const contentType = sanitizedHeaders[CONSTANTS.HEADER_CONTENT_TYPE];
192+
const parser: Parser = contentType ? parserByContentType[contentType] : new JSONParser();
193+
if (!parser) throw new ValidationError(`invalid content type ${sanitizedHeaders[CONSTANTS.HEADER_CONTENT_TYPE]}`);
194+
const incoming = { ...(parser.parse(payload) as Record<string, unknown>) };
195+
196+
const eventObj: { [key: string]: unknown } = {};
197+
const parserMap: Record<string, MappedParser> = version === Version.V1 ? v1structuredParsers : v03structuredParsers;
198+
199+
for (const key in parserMap) {
200+
const property = incoming[key];
201+
if (property) {
202+
const parser: MappedParser = parserMap[key];
203+
eventObj[parser.name] = parser.parser.parse(property as string);
204+
}
205+
delete incoming[key];
206+
}
207+
208+
// extensions are what we have left after processing all other properties
209+
for (const key in incoming) {
210+
eventObj[key] = incoming[key];
211+
}
212+
213+
// ensure data content is correctly encoded
214+
if (eventObj.data && eventObj.datacontentencoding) {
215+
if (eventObj.datacontentencoding === CONSTANTS.ENCODING_BASE64 && !isBase64(eventObj.data)) {
216+
throw new ValidationError("invalid payload");
217+
} else if (eventObj.datacontentencoding === CONSTANTS.ENCODING_BASE64) {
218+
const dataParser = new Base64Parser();
219+
eventObj.data = JSON.parse(dataParser.parse(eventObj.data as string));
220+
delete eventObj.datacontentencoding;
221+
}
222+
}
223+
224+
const cloudevent = new CloudEvent(eventObj as CloudEventV1 | CloudEventV03);
225+
226+
// Validates the event
227+
validateCloudEvent(cloudevent);
228+
return cloudevent;
229+
}

0 commit comments

Comments
 (0)