Skip to content

Commit a4a8701

Browse files
committed
[bridge] Extract WorkspaceInstanceController
1 parent d439d81 commit a4a8701

File tree

3 files changed

+326
-246
lines changed

3 files changed

+326
-246
lines changed

components/ws-manager-bridge/src/bridge.ts

Lines changed: 12 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,31 @@ import { inject, injectable, interfaces } from "inversify";
88
import { MessageBusIntegration } from "./messagebus-integration";
99
import {
1010
Disposable,
11-
WorkspaceInstance,
1211
Queue,
1312
WorkspaceInstancePort,
1413
PortVisibility,
15-
RunningWorkspaceInfo,
1614
DisposableCollection,
1715
} from "@gitpod/gitpod-protocol";
1816
import {
1917
WorkspaceStatus,
2018
WorkspacePhase,
21-
GetWorkspacesRequest,
2219
WorkspaceConditionBool,
2320
PortVisibility as WsManPortVisibility,
2421
} from "@gitpod/ws-manager/lib";
2522
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
26-
import { UserDB } from "@gitpod/gitpod-db/lib/user-db";
2723
import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging";
2824
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
2925
import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics";
30-
import { TracedWorkspaceDB, TracedUserDB, DBWithTracing } from "@gitpod/gitpod-db/lib/traced-db";
26+
import { TracedWorkspaceDB, DBWithTracing } from "@gitpod/gitpod-db/lib/traced-db";
3127
import { PrometheusMetricsExporter } from "./prometheus-metrics-exporter";
3228
import { ClientProvider, WsmanSubscriber } from "./wsman-subscriber";
3329
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
3430
import { Configuration } from "./config";
3531
import { WorkspaceCluster } from "@gitpod/gitpod-protocol/lib/workspace-cluster";
36-
import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat";
3732
import { PreparingUpdateEmulator, PreparingUpdateEmulatorFactory } from "./preparing-update-emulator";
3833
import { performance } from "perf_hooks";
3934
import { PrebuildUpdater } from "./prebuild-updater";
35+
import { WorkspaceInstanceController } from "./workspace-instance-controller";
4036

4137
export const WorkspaceManagerBridgeFactory = Symbol("WorkspaceManagerBridgeFactory");
4238

@@ -55,9 +51,6 @@ export class WorkspaceManagerBridge implements Disposable {
5551
@inject(TracedWorkspaceDB)
5652
protected readonly workspaceDB: DBWithTracing<WorkspaceDB>;
5753

58-
@inject(TracedUserDB)
59-
protected readonly userDB: DBWithTracing<UserDB>;
60-
6154
@inject(MessageBusIntegration)
6255
protected readonly messagebus: MessageBusIntegration;
6356

@@ -76,6 +69,9 @@ export class WorkspaceManagerBridge implements Disposable {
7669
@inject(PrebuildUpdater)
7770
protected readonly prebuildUpdater: PrebuildUpdater;
7871

72+
@inject(WorkspaceInstanceController)
73+
protected readonly workspaceInstanceController: WorkspaceInstanceController; // bound in "transient" mode: we expect to receive a fresh instance here
74+
7975
protected readonly disposables = new DisposableCollection();
8076
protected readonly queues = new Map<string, Queue>();
8177

@@ -105,7 +101,12 @@ export class WorkspaceManagerBridge implements Disposable {
105101

106102
log.debug(`Starting controller: ${cluster.name}`, logPayload);
107103
// Control all workspace instances, either against ws-manager or configured timeouts
108-
this.startController(clientProvider, controllerIntervalSeconds, this.config.controllerMaxDisconnectSeconds);
104+
this.workspaceInstanceController.start(
105+
cluster.name,
106+
clientProvider,
107+
controllerIntervalSeconds,
108+
this.config.controllerMaxDisconnectSeconds,
109+
);
109110
} else {
110111
// _DO NOT_ update the DB (another bridge is responsible for that)
111112
// Still, listen to all updates, generate/derive new state and distribute it locally!
@@ -380,7 +381,7 @@ export class WorkspaceManagerBridge implements Disposable {
380381
// yet. Just for this case we need to set it now.
381382
instance.stoppingTime = now;
382383
}
383-
lifecycleHandler = () => this.onInstanceStopped({ span }, userId, instance);
384+
lifecycleHandler = () => this.workspaceInstanceController.onStopped({ span }, userId, instance);
384385
break;
385386
}
386387

@@ -432,237 +433,6 @@ export class WorkspaceManagerBridge implements Disposable {
432433
}
433434
}
434435

435-
protected startController(
436-
clientProvider: ClientProvider,
437-
controllerIntervalSeconds: number,
438-
controllerMaxDisconnectSeconds: number,
439-
) {
440-
let disconnectStarted = Number.MAX_SAFE_INTEGER;
441-
this.disposables.push(
442-
repeat(async () => {
443-
const span = TraceContext.startSpan("controlInstances");
444-
const ctx = { span };
445-
try {
446-
const installation = this.cluster.name;
447-
log.debug("Controlling instances...", { installation });
448-
449-
const nonStoppedInstances = await this.workspaceDB
450-
.trace(ctx)
451-
.findRunningInstancesWithWorkspaces(installation, undefined, true);
452-
453-
// Control running workspace instances against ws-manager
454-
try {
455-
await this.controlNonStoppedWSManagerManagedInstances(
456-
ctx,
457-
nonStoppedInstances,
458-
clientProvider,
459-
this.config.timeouts.pendingPhaseSeconds,
460-
this.config.timeouts.stoppingPhaseSeconds,
461-
);
462-
463-
disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period
464-
} catch (err) {
465-
if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) {
466-
log.warn("Error while controlling installation's workspaces", err, {
467-
installation: this.cluster.name,
468-
});
469-
} else if (disconnectStarted > Date.now()) {
470-
disconnectStarted = Date.now();
471-
}
472-
}
473-
474-
// Control workspace instances against timeouts
475-
await this.controlInstancesTimeouts(ctx, nonStoppedInstances);
476-
477-
log.debug("Done controlling instances.", { installation });
478-
} catch (err) {
479-
TraceContext.setError(ctx, err);
480-
log.error("Error while controlling installation's workspaces", err, {
481-
installation: this.cluster.name,
482-
});
483-
} finally {
484-
span.finish();
485-
}
486-
}, controllerIntervalSeconds * 1000),
487-
);
488-
}
489-
490-
/**
491-
* This method controls all instances that we have currently marked as "running" in the DB.
492-
* It checks whether they are still running with their respective ws-manager, and if not, marks them as stopped in the DB.
493-
*/
494-
protected async controlNonStoppedWSManagerManagedInstances(
495-
parentCtx: TraceContext,
496-
runningInstances: RunningWorkspaceInfo[],
497-
clientProvider: ClientProvider,
498-
pendingPhaseSeconds: number,
499-
stoppingPhaseSeconds: number,
500-
) {
501-
const installation = this.config.installation;
502-
503-
const span = TraceContext.startSpan("controlNonStoppedWSManagerManagedInstances", parentCtx);
504-
const ctx = { span };
505-
try {
506-
log.debug("Controlling non-stopped instances that are managed by WS Manager...", { installation });
507-
508-
const runningInstancesIdx = new Map<string, RunningWorkspaceInfo>();
509-
runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i));
510-
511-
const client = await clientProvider();
512-
const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest());
513-
actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId()));
514-
515-
// runningInstancesIdx only contains instances that ws-manager is not aware of
516-
for (const [instanceId, ri] of runningInstancesIdx.entries()) {
517-
const instance = ri.latestInstance;
518-
const phase = instance.status.phase;
519-
520-
// When ws-manager is not aware of the following instances outside of the timeout duration,
521-
// they should be marked as stopped.
522-
// pending states timeout is 1 hour after creationTime.
523-
// stopping states timeout is 1 hour after stoppingTime.
524-
if (
525-
phase === "running" ||
526-
(phase === "pending" &&
527-
durationLongerThanSeconds(Date.parse(instance.creationTime), pendingPhaseSeconds)) ||
528-
(phase === "stopping" &&
529-
instance.stoppingTime &&
530-
durationLongerThanSeconds(Date.parse(instance.stoppingTime), stoppingPhaseSeconds))
531-
) {
532-
log.info(
533-
{ instanceId, workspaceId: instance.workspaceId },
534-
"Database says the instance is present, but ws-man does not know about it. Marking as stopped in database.",
535-
{ installation, phase },
536-
);
537-
await this.markWorkspaceInstanceAsStopped(ctx, ri, new Date());
538-
continue;
539-
}
540-
541-
log.debug({ instanceId }, "Skipping instance", {
542-
phase: phase,
543-
creationTime: instance.creationTime,
544-
region: instance.region,
545-
});
546-
}
547-
548-
log.debug("Done controlling running instances.", { installation });
549-
} catch (err) {
550-
TraceContext.setError(ctx, err);
551-
throw err; // required by caller
552-
}
553-
}
554-
555-
/**
556-
* This method controls all instances of this installation during periods where ws-manager does not control them, but we have them in our DB.
557-
* These currently are:
558-
* - preparing
559-
* - building
560-
* It also covers these phases, as fallback, when - for whatever reason - we no longer receive updates from ws-manager.
561-
* - unknown (fallback)
562-
*/
563-
protected async controlInstancesTimeouts(parentCtx: TraceContext, runningInstances: RunningWorkspaceInfo[]) {
564-
const installation = this.config.installation;
565-
566-
const span = TraceContext.startSpan("controlDBInstances", parentCtx);
567-
const ctx = { span };
568-
try {
569-
log.debug("Controlling DB instances...", { installation });
570-
571-
await Promise.all(runningInstances.map((info) => this.controlInstanceTimeouts(ctx, info)));
572-
573-
log.debug("Done controlling DB instances.", { installation });
574-
} catch (err) {
575-
log.error("Error while running controlDBInstances", err, {
576-
installation: this.cluster.name,
577-
});
578-
TraceContext.setError(ctx, err);
579-
} finally {
580-
span.finish();
581-
}
582-
}
583-
584-
protected async controlInstanceTimeouts(parentCtx: TraceContext, info: RunningWorkspaceInfo) {
585-
const logContext: LogContext = {
586-
userId: info.workspace.ownerId,
587-
workspaceId: info.workspace.id,
588-
instanceId: info.latestInstance.id,
589-
};
590-
const ctx = TraceContext.childContext("controlDBInstance", parentCtx);
591-
try {
592-
const now = Date.now();
593-
const creationTime = new Date(info.latestInstance.creationTime).getTime();
594-
const timedOutInPreparing = now >= creationTime + this.config.timeouts.preparingPhaseSeconds * 1000;
595-
const timedOutInBuilding = now >= creationTime + this.config.timeouts.buildingPhaseSeconds * 1000;
596-
const timedOutInUnknown = now >= creationTime + this.config.timeouts.unknownPhaseSeconds * 1000;
597-
const currentPhase = info.latestInstance.status.phase;
598-
599-
log.debug(logContext, "Controller: Checking for instances in the DB to mark as stopped", {
600-
creationTime,
601-
timedOutInPreparing,
602-
currentPhase,
603-
});
604-
605-
if (
606-
(currentPhase === "preparing" && timedOutInPreparing) ||
607-
(currentPhase === "building" && timedOutInBuilding) ||
608-
(currentPhase === "unknown" && timedOutInUnknown)
609-
) {
610-
log.info(logContext, "Controller: Marking workspace instance as stopped", {
611-
creationTime,
612-
currentPhase,
613-
});
614-
await this.markWorkspaceInstanceAsStopped(ctx, info, new Date(now));
615-
}
616-
} catch (err) {
617-
log.warn(logContext, "Controller: Error while marking workspace instance as stopped", err);
618-
TraceContext.setError(ctx, err);
619-
} finally {
620-
ctx.span.finish();
621-
}
622-
}
623-
624-
protected async markWorkspaceInstanceAsStopped(ctx: TraceContext, info: RunningWorkspaceInfo, now: Date) {
625-
const nowISO = now.toISOString();
626-
if (!info.latestInstance.stoppingTime) {
627-
info.latestInstance.stoppingTime = nowISO;
628-
}
629-
info.latestInstance.stoppedTime = nowISO;
630-
info.latestInstance.status.message = `Stopped by ws-manager-bridge. Previously in phase ${info.latestInstance.status.phase}`;
631-
this.prometheusExporter.increaseInstanceMarkedStoppedCounter(info.latestInstance.status.phase);
632-
info.latestInstance.status.phase = "stopped";
633-
await this.workspaceDB.trace(ctx).storeInstance(info.latestInstance);
634-
635-
// cleanup
636-
// important: call this after the DB update
637-
await this.onInstanceStopped(ctx, info.workspace.ownerId, info.latestInstance);
638-
639-
await this.messagebus.notifyOnInstanceUpdate(ctx, info.workspace.ownerId, info.latestInstance);
640-
await this.prebuildUpdater.stopPrebuildInstance(ctx, info.latestInstance);
641-
}
642-
643-
protected async onInstanceStopped(
644-
ctx: TraceContext,
645-
ownerUserID: string,
646-
instance: WorkspaceInstance,
647-
): Promise<void> {
648-
const span = TraceContext.startSpan("onInstanceStopped", ctx);
649-
650-
try {
651-
await this.userDB.trace({ span }).deleteGitpodTokensNamedLike(ownerUserID, `${instance.id}-%`);
652-
this.analytics.track({
653-
userId: ownerUserID,
654-
event: "workspace_stopped",
655-
messageId: `bridge-wsstopped-${instance.id}`,
656-
properties: { instanceId: instance.id, workspaceId: instance.workspaceId },
657-
});
658-
} catch (err) {
659-
TraceContext.setError({ span }, err);
660-
throw err;
661-
} finally {
662-
span.finish();
663-
}
664-
}
665-
666436
public dispose() {
667437
this.disposables.dispose();
668438
}
@@ -687,10 +457,6 @@ const mapPortVisibility = (visibility: WsManPortVisibility | undefined): PortVis
687457
}
688458
};
689459

690-
const durationLongerThanSeconds = (time: number, durationSeconds: number, now: number = Date.now()) => {
691-
return (now - time) / 1000 > durationSeconds;
692-
};
693-
694460
/**
695461
* Filter here to avoid overloading spans
696462
* @param status

components/ws-manager-bridge/src/container-module.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import { DebugApp } from "@gitpod/gitpod-protocol/lib/util/debug-app";
3838
import { Client } from "@gitpod/gitpod-protocol/lib/experiments/types";
3939
import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";
4040
import { ClusterSyncService } from "./cluster-sync-service";
41+
import { WorkspaceInstanceController, WorkspaceInstanceControllerImpl } from "./workspace-instance-controller";
4142

4243
export const containerModule = new ContainerModule((bind) => {
4344
bind(MessagebusConfiguration).toSelf().inSingletonScope();
@@ -91,4 +92,7 @@ export const containerModule = new ContainerModule((bind) => {
9192
bind(DebugApp).toSelf().inSingletonScope();
9293

9394
bind(Client).toDynamicValue(getExperimentsClientForBackend).inSingletonScope();
95+
96+
// transient to make sure we're creating a separate instance every time we ask for it
97+
bind(WorkspaceInstanceController).to(WorkspaceInstanceControllerImpl).inTransientScope();
9498
});

0 commit comments

Comments
 (0)