Skip to content

Commit 891c2f8

Browse files
Creates eventarc channel in before deploying custom event function (#5226)
* Ensure eventarc channels exist on deploy * create channel before deploying custom event fn * add unit tests * minor comment edits * remove confusing comment about channel name Co-authored-by: Thomas Bouldin <[email protected]>
1 parent 270f419 commit 891c2f8

File tree

5 files changed

+239
-2
lines changed

5 files changed

+239
-2
lines changed

src/api.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ export const dynamicLinksKey = utils.envOverride(
5858
"FIREBASE_DYNAMIC_LINKS_KEY",
5959
"AIzaSyB6PtY5vuiSB8MNgt20mQffkOlunZnHYiQ"
6060
);
61+
export const eventarcOrigin = utils.envOverride("EVENTARC_URL", "https://eventarc.googleapis.com");
6162
export const firebaseApiOrigin = utils.envOverride(
6263
"FIREBASE_API_URL",
6364
"https://firebase.googleapis.com"

src/deploy/functions/release/fabricator.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import { SourceTokenScraper } from "./sourceTokenScraper";
66
import { Timer } from "./timer";
77
import { assertExhaustive } from "../../../functional";
88
import { getHumanFriendlyRuntimeName } from "../runtimes";
9-
import { functionsOrigin, functionsV2Origin } from "../../../api";
9+
import { eventarcOrigin, functionsOrigin, functionsV2Origin } from "../../../api";
1010
import { logger } from "../../../logger";
1111
import * as args from "../args";
1212
import * as backend from "../backend";
1313
import * as cloudtasks from "../../../gcp/cloudtasks";
1414
import * as deploymentTool from "../../../deploymentTool";
1515
import * as gcf from "../../../gcp/cloudfunctions";
1616
import * as gcfV2 from "../../../gcp/cloudfunctionsv2";
17+
import * as eventarc from "../../../gcp/eventarc";
1718
import * as helper from "../functionsDeployHelper";
1819
import * as planner from "./planner";
1920
import * as poller from "../../../operation-poller";
@@ -41,6 +42,13 @@ const gcfV2PollerOptions: Omit<poller.OperationPollerOptions, "operationResource
4142
maxBackoff: 10_000,
4243
};
4344

45+
const eventarcPollerOptions: Omit<poller.OperationPollerOptions, "operationResourceName"> = {
46+
apiOrigin: eventarcOrigin,
47+
apiVersion: "v1",
48+
masterTimeout: 25 * 60 * 1_000, // 25 minutes is the maximum build time for a function
49+
maxBackoff: 10_000,
50+
};
51+
4452
export interface FabricatorArgs {
4553
executor: Executor;
4654
functionExecutor: Executor;
@@ -296,6 +304,37 @@ export class Fabricator {
296304
.catch(rethrowAs(endpoint, "create topic"));
297305
}
298306

307+
// Like Pub/Sub, GCF requires a channel to exist before allowing the function
308+
// to be created. Like Pub/Sub we currently only support setting the name
309+
// of a channel, so we can do this once during createFunction alone. But if
310+
// Eventarc adds new features that we indulge in (e.g. 2P event providers)
311+
// things will get much more complicated. We'll have to make sure we keep
312+
// up to date on updates, and we will also have to worry about channels leftover
313+
// after deletion possibly incurring bills due to events still being sent.
314+
const channel = apiFunction.eventTrigger?.channel;
315+
if (channel) {
316+
await this.executor
317+
.run(async () => {
318+
try {
319+
const op: { name: string } = await eventarc.createChannel({ name: channel });
320+
return await poller.pollOperation<eventarc.Channel>({
321+
...eventarcPollerOptions,
322+
pollerName: `create-${channel}-${endpoint.region}-${endpoint.id}`,
323+
operationResourceName: op.name,
324+
});
325+
} catch (err: any) {
326+
// if error status is 409, the channel already exists and we can deploy safely
327+
if (err.status === 409) {
328+
return;
329+
}
330+
throw new FirebaseError("Unexpected error creating Eventarc channel", {
331+
original: err as Error,
332+
});
333+
}
334+
})
335+
.catch(rethrowAs(endpoint, "upsert eventarc channel"));
336+
}
337+
299338
const resultFunction = await this.functionExecutor
300339
.run(async () => {
301340
const op: { name: string } = await gcfV2.createFunction(apiFunction);
@@ -571,6 +610,10 @@ export class Fabricator {
571610
} else if (backend.isBlockingTriggered(endpoint)) {
572611
await this.unregisterBlockingTrigger(endpoint);
573612
}
613+
// N.B. Like Pub/Sub topics, we don't delete Eventarc channels because we
614+
// don't know if there are any subscriers or not. If we start supporting 2P
615+
// channels, we might need to revist this or else the events will still get
616+
// published and the customer will still get charged.
574617
}
575618

576619
async upsertScheduleV1(endpoint: backend.Endpoint & backend.ScheduleTriggered): Promise<void> {

src/deploy/functions/release/reporter.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export type OperationType =
2525
| "upsert schedule"
2626
| "delete schedule"
2727
| "upsert task queue"
28+
| "upsert eventarc channel"
2829
| "disable task queue"
2930
| "create topic"
3031
| "delete topic"

src/gcp/eventarc.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { Client } from "../apiv2";
2+
import { eventarcOrigin } from "../api";
3+
import { last } from "lodash";
4+
import { fieldMasks } from "./proto";
5+
6+
export const API_VERSION = "v1";
7+
8+
export interface Channel {
9+
name: string;
10+
11+
/** Server-assigned uinique identifier. Format is a UUID4 */
12+
uid?: string;
13+
14+
createTime?: string;
15+
updateTime?: string;
16+
17+
/** If set, the channel will grant publish permissions to the 2P provider. */
18+
provider?: string;
19+
20+
// BEGIN oneof transport
21+
pubsubTopic?: string;
22+
// END oneof transport
23+
24+
state?: "PENDING" | "ACTIVE" | "INACTIVE";
25+
26+
/** When the channel is `PENDING`, this token must be sent to the provider */
27+
activationToken?: string;
28+
29+
cryptoKeyName?: string;
30+
}
31+
32+
interface OperationMetadata {
33+
createTime: string;
34+
target: string;
35+
verb: string;
36+
requestedCancellation: boolean;
37+
apiVersion: string;
38+
}
39+
40+
interface Operation {
41+
name: string;
42+
metadata: OperationMetadata;
43+
done: boolean;
44+
}
45+
46+
const client = new Client({
47+
urlPrefix: eventarcOrigin,
48+
auth: true,
49+
apiVersion: API_VERSION,
50+
});
51+
52+
/**
53+
* Gets a Channel.
54+
*/
55+
export async function getChannel(name: string): Promise<Channel | undefined> {
56+
const res = await client.get<Channel>(name);
57+
if (res.status === 404) {
58+
return undefined;
59+
}
60+
return res.body;
61+
}
62+
63+
/**
64+
* Creates a channel.
65+
*/
66+
export async function createChannel(channel: Channel): Promise<Operation> {
67+
// const body: Partial<Channel> = cloneDeep(channel);
68+
const pathParts = channel.name.split("/");
69+
70+
const res = await client.post<Channel, Operation>(pathParts.slice(0, -1).join("/"), channel, {
71+
queryParams: { channelId: last(pathParts)! },
72+
});
73+
return res.body;
74+
}
75+
76+
/**
77+
* Updates a channel to match the new spec.
78+
* Only set fields are updated.
79+
*/
80+
export async function updateChannel(channel: Channel): Promise<Channel> {
81+
const res = await client.put<Channel, Channel>(channel.name, channel, {
82+
queryParams: {
83+
updateMask: fieldMasks(channel).join(","),
84+
},
85+
});
86+
return res.body;
87+
}
88+
89+
/**
90+
* Deletes a channel.
91+
*/
92+
export async function deleteChannel(name: string): Promise<void> {
93+
await client.delete(name);
94+
}

src/test/deploy/functions/release/fabricator.spec.ts

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import * as reporter from "../../../../deploy/functions/release/reporter";
66
import * as executor from "../../../../deploy/functions/release/executor";
77
import * as gcfNSV2 from "../../../../gcp/cloudfunctionsv2";
88
import * as gcfNS from "../../../../gcp/cloudfunctions";
9+
import * as eventarcNS from "../../../../gcp/eventarc";
910
import * as pollerNS from "../../../../operation-poller";
1011
import * as pubsubNS from "../../../../gcp/pubsub";
1112
import * as schedulerNS from "../../../../gcp/cloudscheduler";
@@ -24,6 +25,7 @@ describe("Fabricator", () => {
2425
// Stub all GCP APIs to make sure this test is hermetic
2526
let gcf: sinon.SinonStubbedInstance<typeof gcfNS>;
2627
let gcfv2: sinon.SinonStubbedInstance<typeof gcfNSV2>;
28+
let eventarc: sinon.SinonStubbedInstance<typeof eventarcNS>;
2729
let poller: sinon.SinonStubbedInstance<typeof pollerNS>;
2830
let pubsub: sinon.SinonStubbedInstance<typeof pubsubNS>;
2931
let scheduler: sinon.SinonStubbedInstance<typeof schedulerNS>;
@@ -35,6 +37,7 @@ describe("Fabricator", () => {
3537
beforeEach(() => {
3638
gcf = sinon.stub(gcfNS);
3739
gcfv2 = sinon.stub(gcfNSV2);
40+
eventarc = sinon.stub(eventarcNS);
3841
poller = sinon.stub(pollerNS);
3942
pubsub = sinon.stub(pubsubNS);
4043
scheduler = sinon.stub(schedulerNS);
@@ -58,6 +61,10 @@ describe("Fabricator", () => {
5861
gcfv2.createFunction.rejects(new Error("unexpected gcfv2.createFunction"));
5962
gcfv2.updateFunction.rejects(new Error("unexpected gcfv2.updateFunction"));
6063
gcfv2.deleteFunction.rejects(new Error("unexpected gcfv2.deleteFunction"));
64+
eventarc.createChannel.rejects(new Error("unexpected eventarc.createChannel"));
65+
eventarc.deleteChannel.rejects(new Error("unexpected eventarc.deleteChannel"));
66+
eventarc.getChannel.rejects(new Error("unexpected eventarc.getChannel"));
67+
eventarc.updateChannel.rejects(new Error("unexpected eventarc.updateChannel"));
6168
run.getIamPolicy.rejects(new Error("unexpected run.getIamPolicy"));
6269
run.setIamPolicy.rejects(new Error("unexpected run.setIamPolicy"));
6370
run.setInvokerCreate.rejects(new Error("unexpected run.setInvokerCreate"));
@@ -481,6 +488,97 @@ describe("Fabricator", () => {
481488
);
482489
});
483490

491+
it("handles already existing eventarc channels", async () => {
492+
eventarc.createChannel.callsFake(({ name }) => {
493+
expect(name).to.equal("channel");
494+
const err = new Error("Already exists");
495+
(err as any).status = 409;
496+
return Promise.reject(err);
497+
});
498+
gcfv2.createFunction.resolves({ name: "op", done: false });
499+
poller.pollOperation.resolves({ serviceConfig: { service: "service" } });
500+
501+
const ep = endpoint(
502+
{
503+
eventTrigger: {
504+
eventType: "custom.test.event",
505+
channel: "channel",
506+
retry: false,
507+
},
508+
},
509+
{
510+
platform: "gcfv2",
511+
}
512+
);
513+
514+
await fab.createV2Function(ep);
515+
expect(eventarc.createChannel).to.have.been.called;
516+
expect(gcfv2.createFunction).to.have.been.called;
517+
});
518+
519+
it("creates channels if necessary", async () => {
520+
const channelName = "channel";
521+
eventarc.createChannel.callsFake(({ name }) => {
522+
expect(name).to.equal(channelName);
523+
return Promise.resolve({
524+
name: "op-resource-name",
525+
metadata: {
526+
createTime: "",
527+
target: "",
528+
verb: "",
529+
requestedCancellation: false,
530+
apiVersion: "",
531+
},
532+
done: false,
533+
});
534+
});
535+
gcfv2.createFunction.resolves({ name: "op", done: false });
536+
poller.pollOperation.resolves({ serviceConfig: { service: "service" } });
537+
538+
const ep = endpoint(
539+
{
540+
eventTrigger: {
541+
eventType: "custom.test.event",
542+
channel: channelName,
543+
retry: false,
544+
},
545+
},
546+
{
547+
platform: "gcfv2",
548+
}
549+
);
550+
551+
await fab.createV2Function(ep);
552+
expect(eventarc.createChannel).to.have.been.calledOnceWith({ name: channelName });
553+
expect(poller.pollOperation).to.have.been.called;
554+
});
555+
556+
it("wraps errors thrown while creating channels", async () => {
557+
eventarc.createChannel.callsFake(() => {
558+
const err = new Error("🤷‍♂️");
559+
(err as any).status = 400;
560+
return Promise.reject(err);
561+
});
562+
563+
const ep = endpoint(
564+
{
565+
eventTrigger: {
566+
eventType: "custom.test.event",
567+
channel: "channel",
568+
retry: false,
569+
},
570+
},
571+
{
572+
platform: "gcfv2",
573+
}
574+
);
575+
576+
await expect(fab.createV2Function(ep)).to.eventually.be.rejectedWith(
577+
reporter.DeploymentError,
578+
"upsert eventarc channel"
579+
);
580+
});
581+
484582
it("throws on create function failure", async () => {
485583
gcfv2.createFunction.rejects(new Error("Server failure"));
486584

@@ -1168,7 +1266,7 @@ describe("Fabricator", () => {
11681266
await fab.setTrigger(endpoint({ httpsTrigger: {} }));
11691267
});
11701268

1171-
it("does nothing for event triggers", async () => {
1269+
it("does nothing for event triggers without channels", async () => {
11721270
// all APIs throw by default
11731271
const ep = endpoint({
11741272
eventTrigger: {

0 commit comments

Comments
 (0)