Skip to content

Commit f42b36b

Browse files
committed
[ws-manager-bridge] Make non-governing bridges distribute updates locally
1 parent f6f05da commit f42b36b

File tree

2 files changed

+50
-27
lines changed

2 files changed

+50
-27
lines changed

components/ws-manager-bridge/ee/src/bridge.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
1010
import { WorkspaceStatus, WorkspaceType, WorkspacePhase } from "@gitpod/ws-manager/lib";
1111
import { HeadlessWorkspaceEvent, HeadlessWorkspaceEventType } from "@gitpod/gitpod-protocol/lib/headless-workspace-log";
1212
import { WorkspaceInstance } from "@gitpod/gitpod-protocol";
13-
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
13+
import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging";
1414

1515
@injectable()
1616
export class WorkspaceManagerBridgeEE extends WorkspaceManagerBridge {
@@ -38,15 +38,14 @@ export class WorkspaceManagerBridgeEE extends WorkspaceManagerBridge {
3838
}
3939
}
4040

41-
protected async updatePrebuiltWorkspace(ctx: TraceContext, status: WorkspaceStatus.AsObject) {
41+
protected async updatePrebuiltWorkspace(ctx: TraceContext, userId: string, status: WorkspaceStatus.AsObject, writeToDB: boolean) {
4242
if (status.spec && status.spec.type != WorkspaceType.PREBUILD) {
4343
return;
4444
}
4545

4646
const instanceId = status.id!;
4747
const workspaceId = status.metadata!.metaId!;
48-
const userId = status.metadata!.owner!;
49-
const logCtx = { instanceId, workspaceId, userId };
48+
const logCtx: LogContext = { instanceId, workspaceId, userId };
5049

5150
const span = TraceContext.startSpan("updatePrebuiltWorkspace", ctx);
5251
try {
@@ -56,12 +55,15 @@ export class WorkspaceManagerBridgeEE extends WorkspaceManagerBridge {
5655
TraceContext.setError({span}, new Error("headless workspace without prebuild"));
5756
return
5857
}
58+
span.setTag("updatePrebuiltWorkspace.prebuildId", prebuild.id);
5959

6060
if (prebuild.state === 'queued') {
6161
// We've received an update from ws-man for this workspace, hence it must be running.
6262
prebuild.state = "building";
6363

64-
await this.workspaceDB.trace({span}).storePrebuiltWorkspace(prebuild);
64+
if (writeToDB) {
65+
await this.workspaceDB.trace({span}).storePrebuiltWorkspace(prebuild);
66+
}
6567
await this.messagebus.notifyHeadlessUpdate({span}, userId, workspaceId, <HeadlessWorkspaceEvent>{
6668
type: HeadlessWorkspaceEventType.Started,
6769
workspaceID: workspaceId,
@@ -92,15 +94,19 @@ export class WorkspaceManagerBridgeEE extends WorkspaceManagerBridge {
9294
prebuild.snapshot = status.conditions!.snapshot;
9395
headlessUpdateType = HeadlessWorkspaceEventType.FinishedSuccessfully;
9496
}
95-
await this.workspaceDB.trace({span}).storePrebuiltWorkspace(prebuild);
9697

98+
if (writeToDB) {
99+
await this.workspaceDB.trace({span}).storePrebuiltWorkspace(prebuild);
100+
}
101+
102+
// notify updates
103+
// headless update
97104
await this.messagebus.notifyHeadlessUpdate({span}, userId, workspaceId, <HeadlessWorkspaceEvent>{
98105
type: headlessUpdateType,
99106
workspaceID: workspaceId,
100107
});
101-
}
102108

103-
{ // notify about prebuild updated
109+
// prebuild info
104110
const info = (await this.workspaceDB.trace({span}).findPrebuildInfos([prebuild.id]))[0];
105111
if (info) {
106112
this.messagebus.notifyOnPrebuildUpdate({ info, status: prebuild.state });

components/ws-manager-bridge/src/bridge.ts

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,32 @@ export class WorkspaceManagerBridge implements Disposable {
5959
protected cluster: WorkspaceClusterInfo;
6060

6161
public start(cluster: WorkspaceClusterInfo, clientProvider: ClientProvider) {
62-
const logPayload = { name: cluster.name, url: cluster.url };
62+
const logPayload = { name: cluster.name, url: cluster.url, govern: cluster.govern };
6363
log.info(`starting bridge to cluster...`, logPayload);
6464
this.cluster = cluster;
6565

66-
if (cluster.govern) {
67-
log.debug(`starting DB updater: ${cluster.name}`, logPayload);
68-
/* no await */ this.startDatabaseUpdater(clientProvider, logPayload)
66+
const startStatusUpdateHandler = (writeToDB: boolean) => {
67+
log.debug(`starting status update handler: ${cluster.name}`, logPayload);
68+
/* no await */ this.startStatusUpdateHandler(clientProvider, writeToDB, logPayload)
6969
// this is a mere safe-guard: we do not expect the code inside to fail
70-
.catch(err => log.error("cannot start database updater", err));
70+
.catch(err => log.error("cannot start status update handler", err));
71+
};
7172

73+
if (cluster.govern) {
74+
// notify servers and _update the DB_
75+
startStatusUpdateHandler(true);
76+
77+
// the actual "governing" part
7278
const controllerInterval = this.config.controllerIntervalSeconds;
7379
if (controllerInterval <= 0) {
7480
throw new Error("controllerInterval <= 0!");
7581
}
7682
log.debug(`starting controller: ${cluster.name}`, logPayload);
7783
this.startController(clientProvider, controllerInterval, this.config.controllerMaxDisconnectSeconds);
84+
} else {
85+
// _DO NOT_ update the DB (another bridge is responsible for that)
86+
// Still, listen to all updates, generate/derive new state and distribute it locally!
87+
startStatusUpdateHandler(false);
7888
}
7989
log.info(`started bridge to cluster.`, logPayload);
8090
}
@@ -83,15 +93,15 @@ export class WorkspaceManagerBridge implements Disposable {
8393
this.dispose();
8494
}
8595

86-
protected async startDatabaseUpdater(clientProvider: ClientProvider, logPayload: {}): Promise<void> {
96+
protected async startStatusUpdateHandler(clientProvider: ClientProvider, writeToDB: boolean, logPayload: {}): Promise<void> {
8797
const subscriber = new WsmanSubscriber(clientProvider);
8898
this.disposables.push(subscriber);
8999

90100
const onReconnect = (ctx: TraceContext, s: WorkspaceStatus[]) => {
91-
s.forEach(sx => this.serializeMessagesByInstanceId<WorkspaceStatus>(ctx, sx, m => m.getId(), (ctx, msg) => this.handleStatusUpdate(ctx, msg)))
101+
s.forEach(sx => this.serializeMessagesByInstanceId<WorkspaceStatus>(ctx, sx, m => m.getId(), (ctx, msg) => this.handleStatusUpdate(ctx, msg, writeToDB)))
92102
};
93103
const onStatusUpdate = (ctx: TraceContext, s: WorkspaceStatus) => {
94-
this.serializeMessagesByInstanceId<WorkspaceStatus>(ctx, s, msg => msg.getId(), (ctx, s) => this.handleStatusUpdate(ctx, s))
104+
this.serializeMessagesByInstanceId<WorkspaceStatus>(ctx, s, msg => msg.getId(), (ctx, s) => this.handleStatusUpdate(ctx, s, writeToDB))
95105
};
96106
await subscriber.subscribe({ onReconnect, onStatusUpdate }, logPayload);
97107
}
@@ -110,7 +120,7 @@ export class WorkspaceManagerBridge implements Disposable {
110120
this.queues.set(instanceId, q);
111121
}
112122

113-
protected async handleStatusUpdate(ctx: TraceContext, rawStatus: WorkspaceStatus) {
123+
protected async handleStatusUpdate(ctx: TraceContext, rawStatus: WorkspaceStatus, writeToDB: boolean) {
114124
const status = rawStatus.toObject();
115125
if (!status.spec || !status.metadata || !status.conditions) {
116126
log.warn("Received invalid status update", status);
@@ -123,6 +133,7 @@ export class WorkspaceManagerBridge implements Disposable {
123133

124134
const span = TraceContext.startSpan("handleStatusUpdate", ctx);
125135
span.setTag("status", JSON.stringify(filterStatus(status)));
136+
span.setTag("writeToDB", writeToDB);
126137
try {
127138
// Beware of the ID mapping here: What's a workspace to the ws-manager is a workspace instance to the rest of the system.
128139
// The workspace ID of ws-manager is the workspace instance ID in the database.
@@ -246,20 +257,26 @@ export class WorkspaceManagerBridge implements Disposable {
246257
break;
247258
}
248259

249-
await this.updatePrebuiltWorkspace({span}, status);
250-
251260
span.setTag("after", JSON.stringify(instance));
252-
await this.workspaceDB.trace({span}).storeInstance(instance);
253-
await this.messagebus.notifyOnInstanceUpdate({span}, userId, instance);
254261

255-
// important: call this after the DB update
256-
await this.cleanupProbeWorkspace({span}, status);
262+
// now notify all prebuild listeners about updates - and update DB if needed
263+
await this.updatePrebuiltWorkspace({span}, userId, status, writeToDB);
257264

258-
if (!!lifecycleHandler) {
259-
await lifecycleHandler();
265+
if (writeToDB) {
266+
await this.workspaceDB.trace(ctx).storeInstance(instance);
267+
268+
// cleanup
269+
// important: call this after the DB update
270+
await this.cleanupProbeWorkspace(ctx, status);
271+
272+
if (!!lifecycleHandler) {
273+
await lifecycleHandler();
274+
}
260275
}
276+
await this.messagebus.notifyOnInstanceUpdate(ctx, userId, instance);
277+
261278
} catch (e) {
262-
TraceContext.setError({ span }, e);
279+
TraceContext.setError({span}, e);
263280
throw e;
264281
} finally {
265282
span.finish();
@@ -321,7 +338,7 @@ export class WorkspaceManagerBridge implements Disposable {
321338
// probes are an EE feature - we just need the hook here
322339
}
323340

324-
protected async updatePrebuiltWorkspace(ctx: TraceContext, status: WorkspaceStatus.AsObject) {
341+
protected async updatePrebuiltWorkspace(ctx: TraceContext, userId: string, status: WorkspaceStatus.AsObject, writeToDB: boolean) {
325342
// prebuilds are an EE feature - we just need the hook here
326343
}
327344

0 commit comments

Comments
 (0)