Skip to content

Commit 3eacd33

Browse files
committed
[ws-manager-bridge] Emulate InstanceUpdates for 'Preparing' phase to decouple messagebus
1 parent e212a85 commit 3eacd33

File tree

11 files changed

+143
-4
lines changed

11 files changed

+143
-4
lines changed

chart/templates/ws-manager-bridge-configmap.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ data:
3131
"stoppingPhaseSeconds": 3600,
3232
"unknownPhaseSeconds": 600
3333
},
34+
"emulatePreparingIntervalSeconds": "10",
3435
"staticBridges": {{ index (include "ws-manager-list" (dict "root" . "gp" $.Values "comp" .Values.components.server) | fromYaml) "manager" | default list | toJson }}
3536
}
3637
{{- end -}}

components/gitpod-db/src/typeorm/entity/db-workspace-instance.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { Transformer } from "../transformer";
1313

1414
@Entity()
1515
@Index("ind_find_wsi_ws_in_period", ['workspaceId', 'startedTime', 'stoppedTime']) // findInstancesWithWorkspaceInPeriod
16+
@Index("ind_phasePersisted_region", ['phasePersisted', 'region']) // findInstancesByPhaseAndRegion
1617
// on DB but not Typeorm: @Index("ind_lastModified", ["_lastModified"]) // DBSync
1718
export class DBWorkspaceInstance implements WorkspaceInstance {
1819
@PrimaryColumn(TypeORM.UUID_COLUMN_TYPE)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Copyright (c) 2022 Gitpod GmbH. All rights reserved.
3+
* Licensed under the GNU Affero General Public License (AGPL).
4+
* See License-AGPL.txt in the project root for license information.
5+
*/
6+
7+
import {MigrationInterface, QueryRunner} from "typeorm";
8+
import { indexExists } from "./helper/helper";
9+
10+
const TABLE_NAME = "d_b_workspace_instance";
11+
const INDEX_NAME = "ind_phasePersisted_region";
12+
13+
export class InstancesByPhaseAndRegion1645019483643 implements MigrationInterface {
14+
15+
public async up(queryRunner: QueryRunner): Promise<void> {
16+
if(!(await indexExists(queryRunner, TABLE_NAME, INDEX_NAME))) {
17+
await queryRunner.query(`CREATE INDEX ${INDEX_NAME} ON ${TABLE_NAME} (phasePersisted, region)`);
18+
}
19+
}
20+
21+
public async down(queryRunner: QueryRunner): Promise<void> {
22+
}
23+
24+
}

components/gitpod-db/src/typeorm/workspace-db-impl.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,15 @@ export abstract class AbstractTypeORMWorkspaceDBImpl implements WorkspaceDB {
852852
return <WorkspaceAndInstance>(res);
853853
}
854854

855+
async findInstancesByPhaseAndRegion(phase: string, region: string): Promise<WorkspaceInstance[]> {
856+
const repo = await this.getWorkspaceInstanceRepo();
857+
// uses index: ind_phasePersisted_region
858+
const qb = repo.createQueryBuilder("wsi")
859+
.where("wsi.phasePersisted = :phase", { phase })
860+
.andWhere("wsi.region = :region", { region });
861+
return qb.getMany();
862+
}
863+
855864
async findPrebuiltWorkspacesByProject(projectId: string, branch?: string, limit?: number): Promise<PrebuiltWorkspace[]> {
856865
const repo = await this.getPrebuiltWorkspaceRepo();
857866

components/gitpod-db/src/workspace-db.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ export interface WorkspaceDB {
8383
findAllWorkspaces(offset: number, limit: number, orderBy: keyof Workspace, orderDir: "ASC" | "DESC", ownerId?: string, searchTerm?: string, minCreationTime?: Date, maxCreationDateTime?: Date, type?: WorkspaceType): Promise<{ total: number, rows: Workspace[] }>;
8484
findAllWorkspaceAndInstances(offset: number, limit: number, orderBy: keyof WorkspaceAndInstance, orderDir: "ASC" | "DESC", query?: AdminGetWorkspacesQuery, searchTerm?: string): Promise<{ total: number, rows: WorkspaceAndInstance[] }>;
8585
findWorkspaceAndInstance(id: string): Promise<WorkspaceAndInstance | undefined>;
86+
findInstancesByPhaseAndRegion(phase: string, region: string): Promise<WorkspaceInstance[]>;
8687

8788
findAllWorkspaceInstances(offset: number, limit: number, orderBy: keyof WorkspaceInstance, orderDir: "ASC" | "DESC", ownerId?: string, minCreationTime?: Date, maxCreationTime?: Date, onlyRunning?: boolean, type?: WorkspaceType): Promise<{ total: number, rows: WorkspaceInstance[] }>;
8889

components/installer/pkg/components/ws-manager-bridge/configmap.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) {
3030
StoppingPhaseSeconds: 3600,
3131
UnknownPhaseSeconds: 600,
3232
},
33-
StaticBridges: WSManagerList(),
33+
EmulatePreparingIntervalSeconds: 10,
34+
StaticBridges: WSManagerList(),
3435
}
3536

3637
fc, err := common.ToJSONString(wsmbcfg)

components/installer/pkg/components/ws-manager-bridge/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type Configuration struct {
1313
ControllerIntervalSeconds int32 `json:"controllerIntervalSeconds"`
1414
ControllerMaxDisconnectSeconds int32 `json:"controllerMaxDisconnectSeconds"`
1515
MaxTimeToRunningPhaseSeconds int32 `json:"maxTimeToRunningPhaseSeconds"`
16+
EmulatePreparingIntervalSeconds int32 `json:"emulatePreparingIntervalSeconds"`
1617
Timeouts Timeouts `json:"timeouts"`
1718
}
1819

components/ws-manager-bridge/src/bridge.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import { inject, injectable } from "inversify";
88
import { MessageBusIntegration } from "./messagebus-integration";
9-
import { Disposable, WorkspaceInstance, Queue, WorkspaceInstancePort, PortVisibility, RunningWorkspaceInfo } from "@gitpod/gitpod-protocol";
9+
import { Disposable, WorkspaceInstance, Queue, WorkspaceInstancePort, PortVisibility, RunningWorkspaceInfo, DisposableCollection } from "@gitpod/gitpod-protocol";
1010
import { WorkspaceStatus, WorkspacePhase, GetWorkspacesRequest, WorkspaceConditionBool, PortVisibility as WsManPortVisibility, WorkspaceType, PromisifiedWorkspaceManagerClient } from "@gitpod/ws-manager/lib";
1111
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
1212
import { UserDB } from "@gitpod/gitpod-db/lib/user-db";
@@ -20,6 +20,7 @@ import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
2020
import { Configuration } from "./config";
2121
import { WorkspaceCluster } from "@gitpod/gitpod-protocol/lib/workspace-cluster";
2222
import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat";
23+
import { PreparingUpdateEmulator } from "./preparing-update-emulator";
2324

2425
export const WorkspaceManagerBridgeFactory = Symbol("WorkspaceManagerBridgeFactory");
2526

@@ -53,7 +54,7 @@ export class WorkspaceManagerBridge implements Disposable {
5354
@inject(IAnalyticsWriter)
5455
protected readonly analytics: IAnalyticsWriter;
5556

56-
protected readonly disposables: Disposable[] = [];
57+
protected readonly disposables = new DisposableCollection();
5758
protected readonly queues = new Map<string, Queue>();
5859

5960
protected cluster: WorkspaceClusterInfo;
@@ -85,6 +86,11 @@ export class WorkspaceManagerBridge implements Disposable {
8586
// _DO NOT_ update the DB (another bridge is responsible for that)
8687
// Still, listen to all updates, generate/derive new state and distribute it locally!
8788
startStatusUpdateHandler(false);
89+
90+
// emulate WorkspaceInstance updates for all Workspaces in the "preparing" phase in this cluster
91+
const updateEmulator = new PreparingUpdateEmulator();
92+
this.disposables.push(updateEmulator);
93+
updateEmulator.start(cluster.name);
8894
}
8995
log.info(`started bridge to cluster.`, logPayload);
9096
}
@@ -366,7 +372,7 @@ export class WorkspaceManagerBridge implements Disposable {
366372
}
367373

368374
public dispose() {
369-
this.disposables.forEach(d => d.dispose());
375+
this.disposables.dispose();
370376
}
371377

372378
}

components/ws-manager-bridge/src/config.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,7 @@ export interface Configuration {
3636
stoppingPhaseSeconds: number;
3737
unknownPhaseSeconds: number;
3838
}
39+
40+
// emulatePreparingIntervalSeconds configures how often we check for Workspaces in phase "preparing" for clusters we do not govern
41+
emulatePreparingIntervalSeconds: number;
3942
}

components/ws-manager-bridge/src/container-module.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { newAnalyticsWriterFromEnv } from '@gitpod/gitpod-protocol/lib/util/anal
2525
import { MetaInstanceController } from './meta-instance-controller';
2626
import { IClientCallMetrics } from '@gitpod/content-service/lib/client-call-metrics';
2727
import { PrometheusClientCallMetrics } from "@gitpod/gitpod-protocol/lib/messaging/client-call-metrics";
28+
import { PreparingUpdateEmulator } from './preparing-update-emulator';
2829

2930
export const containerModule = new ContainerModule(bind => {
3031

@@ -68,4 +69,6 @@ export const containerModule = new ContainerModule(bind => {
6869
}).inSingletonScope();
6970

7071
bind(IAnalyticsWriter).toDynamicValue(newAnalyticsWriterFromEnv).inSingletonScope();
72+
73+
bind(PreparingUpdateEmulator).toSelf().inSingletonScope();
7174
});
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Copyright (c) 2020 Gitpod GmbH. All rights reserved.
3+
* Licensed under the GNU Affero General Public License (AGPL).
4+
* See License-AGPL.txt in the project root for license information.
5+
*/
6+
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
7+
import { Disposable, DisposableCollection, WorkspaceInstance } from "@gitpod/gitpod-protocol";
8+
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
9+
import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat";
10+
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
11+
import { inject, injectable } from "inversify";
12+
import { Configuration } from "./config";
13+
import { MessageBusIntegration } from "./messagebus-integration";
14+
import { GarbageCollectedCache } from "@gitpod/gitpod-protocol/lib/util/garbage-collected-cache";
15+
import * as crypto from 'crypto';
16+
17+
interface CacheEntry {
18+
instance: WorkspaceInstance,
19+
userId: string,
20+
hash: string,
21+
}
22+
23+
/**
24+
* The purpose of this class is to emulate WorkspaceInstance updates for workspaces instances that are not governed by this bridge.
25+
* It does so by polling the DB for the specific region, and if anything changed, push that update into the local messagebus.
26+
* This is a work-around to enable decoupling cross-cluster messagebus instances from each other.
27+
*/
28+
@injectable()
29+
export class PreparingUpdateEmulator implements Disposable {
30+
31+
@inject(Configuration) protected readonly config: Configuration;
32+
@inject(WorkspaceDB) protected readonly workspaceDb: WorkspaceDB;
33+
@inject(MessageBusIntegration) protected readonly messagebus: MessageBusIntegration;
34+
35+
36+
protected readonly cachedResponses = new GarbageCollectedCache<CacheEntry>(600, 150);
37+
protected readonly disposables = new DisposableCollection();
38+
39+
start(region: string) {
40+
this.disposables.push(
41+
repeat(async () => {
42+
const span = TraceContext.startSpan("preparingUpdateEmulatorRun");
43+
const ctx = {span};
44+
try {
45+
const instances = await this.workspaceDb.findInstancesByPhaseAndRegion("preparing", region);
46+
span.setTag("preparingUpdateEmulatorRun.nrOfInstances", instances.length);
47+
for (const instance of instances) {
48+
const hash = hasher(instance);
49+
const entry = this.cachedResponses.get(instance.id);
50+
if (entry && entry.hash === hash) {
51+
continue;
52+
}
53+
54+
let userId = entry?.userId;
55+
if (!userId) {
56+
const ws = await this.workspaceDb.findById(instance.workspaceId);
57+
if (!ws) {
58+
log.debug({ instanceId: instance.id, workspaceId: instance.workspaceId }, "no workspace found for workspace instance");
59+
continue;
60+
}
61+
userId = ws.ownerId;
62+
}
63+
64+
await this.messagebus.notifyOnInstanceUpdate(ctx, userId, instance);
65+
this.cachedResponses.set(instance.id, {
66+
instance,
67+
hash,
68+
userId,
69+
});
70+
}
71+
} catch (err) {
72+
TraceContext.setError(ctx, err);
73+
} finally {
74+
span.finish();
75+
}
76+
}, this.config.emulatePreparingIntervalSeconds * 1000)
77+
);
78+
}
79+
80+
dispose() {
81+
this.disposables.dispose();
82+
}
83+
}
84+
85+
function hasher(o: {}): string {
86+
return crypto.createHash('md5')
87+
.update(JSON.stringify(o))
88+
.digest('hex');
89+
}

0 commit comments

Comments
 (0)