[bridge] Extract WorkspaceInstanceController #13866

Merged · 2 commits · Oct 19, 2022
9 changes: 7 additions & 2 deletions components/ws-manager-bridge/BUILD.yaml
@@ -37,7 +37,12 @@ packages:
 scripts:
   - name: telepresence
     script: |-
+      # next line ensures telepresence can handle the UID restriction in the bridge deployment
+      # links:
+      # - PR: https://github.com/telepresenceio/telepresence/pull/1323/files
+      # - comment mentioning this solution: https://github.com/telepresenceio/telepresence/issues/1309#issuecomment-615312753
+      export TELEPRESENCE_USE_OCP_IMAGE=1
       telepresence --swap-deployment ws-manager-bridge \
                    --method inject-tcp \
-                   --run node --inspect ./dist/ee/src/index.js | \
-                   leeway run gitpod-core/components:dejson-log-output
+                   --expose 18080:8080 \
+                   --run yarn start-ee
258 changes: 12 additions & 246 deletions components/ws-manager-bridge/src/bridge.ts
@@ -8,35 +8,31 @@ import { inject, injectable, interfaces } from "inversify";
 import { MessageBusIntegration } from "./messagebus-integration";
 import {
     Disposable,
-    WorkspaceInstance,
     Queue,
     WorkspaceInstancePort,
     PortVisibility,
-    RunningWorkspaceInfo,
     DisposableCollection,
 } from "@gitpod/gitpod-protocol";
 import {
     WorkspaceStatus,
     WorkspacePhase,
-    GetWorkspacesRequest,
     WorkspaceConditionBool,
     PortVisibility as WsManPortVisibility,
 } from "@gitpod/ws-manager/lib";
 import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
-import { UserDB } from "@gitpod/gitpod-db/lib/user-db";
 import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging";
 import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
 import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics";
-import { TracedWorkspaceDB, TracedUserDB, DBWithTracing } from "@gitpod/gitpod-db/lib/traced-db";
+import { TracedWorkspaceDB, DBWithTracing } from "@gitpod/gitpod-db/lib/traced-db";
 import { PrometheusMetricsExporter } from "./prometheus-metrics-exporter";
 import { ClientProvider, WsmanSubscriber } from "./wsman-subscriber";
 import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
 import { Configuration } from "./config";
 import { WorkspaceCluster } from "@gitpod/gitpod-protocol/lib/workspace-cluster";
-import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat";
 import { PreparingUpdateEmulator, PreparingUpdateEmulatorFactory } from "./preparing-update-emulator";
 import { performance } from "perf_hooks";
 import { PrebuildUpdater } from "./prebuild-updater";
+import { WorkspaceInstanceController } from "./workspace-instance-controller";
 
 export const WorkspaceManagerBridgeFactory = Symbol("WorkspaceManagerBridgeFactory");
 
@@ -55,9 +51,6 @@ export class WorkspaceManagerBridge implements Disposable {
     @inject(TracedWorkspaceDB)
     protected readonly workspaceDB: DBWithTracing<WorkspaceDB>;
 
-    @inject(TracedUserDB)
-    protected readonly userDB: DBWithTracing<UserDB>;
-
     @inject(MessageBusIntegration)
     protected readonly messagebus: MessageBusIntegration;
 
@@ -76,6 +69,9 @@
     @inject(PrebuildUpdater)
     protected readonly prebuildUpdater: PrebuildUpdater;
 
+    @inject(WorkspaceInstanceController)
+    protected readonly workspaceInstanceController: WorkspaceInstanceController; // bound in "transient" mode: we expect to receive a fresh instance here
+
     protected readonly disposables = new DisposableCollection();
     protected readonly queues = new Map<string, Queue>();
 
@@ -105,7 +101,12 @@
 
             log.debug(`Starting controller: ${cluster.name}`, logPayload);
             // Control all workspace instances, either against ws-manager or configured timeouts
-            this.startController(clientProvider, controllerIntervalSeconds, this.config.controllerMaxDisconnectSeconds);
+            this.workspaceInstanceController.start(
+                cluster.name,
+                clientProvider,
+                controllerIntervalSeconds,
+                this.config.controllerMaxDisconnectSeconds,
+            );
         } else {
             // _DO NOT_ update the DB (another bridge is responsible for that)
             // Still, listen to all updates, generate/derive new state and distribute it locally!
@@ -380,7 +381,7 @@
                 // yet. Just for this case we need to set it now.
                 instance.stoppingTime = now;
             }
-            lifecycleHandler = () => this.onInstanceStopped({ span }, userId, instance);
+            lifecycleHandler = () => this.workspaceInstanceController.onStopped({ span }, userId, instance);
             break;
         }
 
@@ -432,237 +433,6 @@
         }
     }
 
-    protected startController(
-        clientProvider: ClientProvider,
-        controllerIntervalSeconds: number,
-        controllerMaxDisconnectSeconds: number,
-    ) {
-        let disconnectStarted = Number.MAX_SAFE_INTEGER;
-        this.disposables.push(
-            repeat(async () => {
-                const span = TraceContext.startSpan("controlInstances");
-                const ctx = { span };
-                try {
-                    const installation = this.cluster.name;
-                    log.debug("Controlling instances...", { installation });
-
-                    const nonStoppedInstances = await this.workspaceDB
-                        .trace(ctx)
-                        .findRunningInstancesWithWorkspaces(installation, undefined, true);
-
-                    // Control running workspace instances against ws-manager
-                    try {
-                        await this.controlNonStoppedWSManagerManagedInstances(
-                            ctx,
-                            nonStoppedInstances,
-                            clientProvider,
-                            this.config.timeouts.pendingPhaseSeconds,
-                            this.config.timeouts.stoppingPhaseSeconds,
-                        );
-
-                        disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period
-                    } catch (err) {
-                        if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) {
-                            log.warn("Error while controlling installation's workspaces", err, {
-                                installation: this.cluster.name,
-                            });
-                        } else if (disconnectStarted > Date.now()) {
-                            disconnectStarted = Date.now();
-                        }
-                    }
-
-                    // Control workspace instances against timeouts
-                    await this.controlInstancesTimeouts(ctx, nonStoppedInstances);
-
-                    log.debug("Done controlling instances.", { installation });
-                } catch (err) {
-                    TraceContext.setError(ctx, err);
-                    log.error("Error while controlling installation's workspaces", err, {
-                        installation: this.cluster.name,
-                    });
-                } finally {
-                    span.finish();
-                }
-            }, controllerIntervalSeconds * 1000),
-        );
-    }
-
-    /**
-     * This method controls all instances that we have currently marked as "running" in the DB.
-     * It checks whether they are still running with their respective ws-manager, and if not, marks them as stopped in the DB.
-     */
-    protected async controlNonStoppedWSManagerManagedInstances(
-        parentCtx: TraceContext,
-        runningInstances: RunningWorkspaceInfo[],
-        clientProvider: ClientProvider,
-        pendingPhaseSeconds: number,
-        stoppingPhaseSeconds: number,
-    ) {
-        const installation = this.config.installation;
-
-        const span = TraceContext.startSpan("controlNonStoppedWSManagerManagedInstances", parentCtx);
-        const ctx = { span };
-        try {
-            log.debug("Controlling non-stopped instances that are managed by WS Manager...", { installation });
-
-            const runningInstancesIdx = new Map<string, RunningWorkspaceInfo>();
-            runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i));
-
-            const client = await clientProvider();
-            const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest());
-            actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId()));
-
-            // runningInstancesIdx only contains instances that ws-manager is not aware of
-            for (const [instanceId, ri] of runningInstancesIdx.entries()) {
-                const instance = ri.latestInstance;
-                const phase = instance.status.phase;
-
-                // When ws-manager is not aware of the following instances outside of the timeout duration,
-                // they should be marked as stopped.
-                // pending states timeout is 1 hour after creationTime.
-                // stopping states timeout is 1 hour after stoppingTime.
-                if (
-                    phase === "running" ||
-                    (phase === "pending" &&
-                        durationLongerThanSeconds(Date.parse(instance.creationTime), pendingPhaseSeconds)) ||
-                    (phase === "stopping" &&
-                        instance.stoppingTime &&
-                        durationLongerThanSeconds(Date.parse(instance.stoppingTime), stoppingPhaseSeconds))
-                ) {
-                    log.info(
-                        { instanceId, workspaceId: instance.workspaceId },
-                        "Database says the instance is present, but ws-man does not know about it. Marking as stopped in database.",
-                        { installation, phase },
-                    );
-                    await this.markWorkspaceInstanceAsStopped(ctx, ri, new Date());
-                    continue;
-                }
-
-                log.debug({ instanceId }, "Skipping instance", {
-                    phase: phase,
-                    creationTime: instance.creationTime,
-                    region: instance.region,
-                });
-            }
-
-            log.debug("Done controlling running instances.", { installation });
-        } catch (err) {
-            TraceContext.setError(ctx, err);
-            throw err; // required by caller
-        }
-    }
-
-    /**
-     * This method controls all instances of this installation during periods where ws-manager does not control them, but we have them in our DB.
-     * These currently are:
-     *  - preparing
-     *  - building
-     * It also covers these phases, as fallback, when - for whatever reason - we no longer receive updates from ws-manager.
-     *  - unknown (fallback)
-     */
-    protected async controlInstancesTimeouts(parentCtx: TraceContext, runningInstances: RunningWorkspaceInfo[]) {
-        const installation = this.config.installation;
-
-        const span = TraceContext.startSpan("controlDBInstances", parentCtx);
-        const ctx = { span };
-        try {
-            log.debug("Controlling DB instances...", { installation });
-
-            await Promise.all(runningInstances.map((info) => this.controlInstanceTimeouts(ctx, info)));
-
-            log.debug("Done controlling DB instances.", { installation });
-        } catch (err) {
-            log.error("Error while running controlDBInstances", err, {
-                installation: this.cluster.name,
-            });
-            TraceContext.setError(ctx, err);
-        } finally {
-            span.finish();
-        }
-    }
-
-    protected async controlInstanceTimeouts(parentCtx: TraceContext, info: RunningWorkspaceInfo) {
-        const logContext: LogContext = {
-            userId: info.workspace.ownerId,
-            workspaceId: info.workspace.id,
-            instanceId: info.latestInstance.id,
-        };
-        const ctx = TraceContext.childContext("controlDBInstance", parentCtx);
-        try {
-            const now = Date.now();
-            const creationTime = new Date(info.latestInstance.creationTime).getTime();
-            const timedOutInPreparing = now >= creationTime + this.config.timeouts.preparingPhaseSeconds * 1000;
-            const timedOutInBuilding = now >= creationTime + this.config.timeouts.buildingPhaseSeconds * 1000;
-            const timedOutInUnknown = now >= creationTime + this.config.timeouts.unknownPhaseSeconds * 1000;
-            const currentPhase = info.latestInstance.status.phase;
-
-            log.debug(logContext, "Controller: Checking for instances in the DB to mark as stopped", {
-                creationTime,
-                timedOutInPreparing,
-                currentPhase,
-            });
-
-            if (
-                (currentPhase === "preparing" && timedOutInPreparing) ||
-                (currentPhase === "building" && timedOutInBuilding) ||
-                (currentPhase === "unknown" && timedOutInUnknown)
-            ) {
-                log.info(logContext, "Controller: Marking workspace instance as stopped", {
-                    creationTime,
-                    currentPhase,
-                });
-                await this.markWorkspaceInstanceAsStopped(ctx, info, new Date(now));
-            }
-        } catch (err) {
-            log.warn(logContext, "Controller: Error while marking workspace instance as stopped", err);
-            TraceContext.setError(ctx, err);
-        } finally {
-            ctx.span.finish();
-        }
-    }
-
-    protected async markWorkspaceInstanceAsStopped(ctx: TraceContext, info: RunningWorkspaceInfo, now: Date) {
-        const nowISO = now.toISOString();
-        if (!info.latestInstance.stoppingTime) {
-            info.latestInstance.stoppingTime = nowISO;
-        }
-        info.latestInstance.stoppedTime = nowISO;
-        info.latestInstance.status.message = `Stopped by ws-manager-bridge. Previously in phase ${info.latestInstance.status.phase}`;
-        this.prometheusExporter.increaseInstanceMarkedStoppedCounter(info.latestInstance.status.phase);
-        info.latestInstance.status.phase = "stopped";
-        await this.workspaceDB.trace(ctx).storeInstance(info.latestInstance);
-
-        // cleanup
-        // important: call this after the DB update
-        await this.onInstanceStopped(ctx, info.workspace.ownerId, info.latestInstance);
-
-        await this.messagebus.notifyOnInstanceUpdate(ctx, info.workspace.ownerId, info.latestInstance);
-        await this.prebuildUpdater.stopPrebuildInstance(ctx, info.latestInstance);
-    }
-
-    protected async onInstanceStopped(
-        ctx: TraceContext,
-        ownerUserID: string,
-        instance: WorkspaceInstance,
-    ): Promise<void> {
-        const span = TraceContext.startSpan("onInstanceStopped", ctx);
-
-        try {
-            await this.userDB.trace({ span }).deleteGitpodTokensNamedLike(ownerUserID, `${instance.id}-%`);
-            this.analytics.track({
-                userId: ownerUserID,
-                event: "workspace_stopped",
-                messageId: `bridge-wsstopped-${instance.id}`,
-                properties: { instanceId: instance.id, workspaceId: instance.workspaceId },
-            });
-        } catch (err) {
-            TraceContext.setError({ span }, err);
-            throw err;
-        } finally {
-            span.finish();
-        }
-    }
-
     public dispose() {
         this.disposables.dispose();
     }
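
One detail of the removed startController worth calling out before the next hunk: it tracked ws-manager connectivity with a sentinel timestamp, so transient failures stay quiet and only an outage longer than controllerMaxDisconnectSeconds produces warnings. Below is a standalone TypeScript sketch of that pattern; the helper is copied from the removed code, while the two free-standing functions are illustrative stand-ins for the try/catch inside the control loop:

// Copied from the helper removed in this PR (see the last hunk of bridge.ts):
const durationLongerThanSeconds = (time: number, durationSeconds: number, now: number = Date.now()) => {
    return (now - time) / 1000 > durationSeconds;
};

// Sentinel meaning "currently connected": with time = MAX_SAFE_INTEGER the
// helper above is trivially false, so no warning is ever emitted.
let disconnectStarted = Number.MAX_SAFE_INTEGER;

function onControlLoopError(err: unknown, controllerMaxDisconnectSeconds: number) {
    if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) {
        console.warn("ws-manager unreachable beyond the grace period", err);
    } else if (disconnectStarted > Date.now()) {
        disconnectStarted = Date.now(); // first failure: start the disconnect clock
    }
}

function onControlLoopSuccess() {
    disconnectStarted = Number.MAX_SAFE_INTEGER; // reset the grace period
}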
@@ -687,10 +457,6 @@ const mapPortVisibility = (visibility: WsManPortVisibility | undefined): PortVis
     }
 };
 
-const durationLongerThanSeconds = (time: number, durationSeconds: number, now: number = Date.now()) => {
-    return (now - time) / 1000 > durationSeconds;
-};
-
 /**
  * Filter here to avoid overloading spans
  * @param status
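
The new workspace-instance-controller.ts itself is not included in this excerpt. Here is a minimal sketch of the interface it has to expose, inferred from the two call sites above (workspaceInstanceController.start(...) and workspaceInstanceController.onStopped(...)) and from the container binding below; the Disposable supertype and the exact parameter names are assumptions, not taken from the PR:

import { Disposable, WorkspaceInstance } from "@gitpod/gitpod-protocol";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { ClientProvider } from "./wsman-subscriber";

export const WorkspaceInstanceController = Symbol("WorkspaceInstanceController");

export interface WorkspaceInstanceController extends Disposable {
    // Periodically reconcile DB state for one cluster against ws-manager and the phase timeouts.
    start(
        workspaceClusterName: string,
        clientProvider: ClientProvider,
        controllerIntervalSeconds: number,
        controllerMaxDisconnectSeconds: number,
    ): void;

    // Post-stop cleanup: bridge.ts previously deleted instance-scoped Gitpod tokens
    // and emitted the "workspace_stopped" analytics event at this point.
    onStopped(ctx: TraceContext, ownerUserID: string, instance: WorkspaceInstance): Promise<void>;
}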
4 changes: 4 additions & 0 deletions components/ws-manager-bridge/src/container-module.ts
@@ -38,6 +38,7 @@ import { DebugApp } from "@gitpod/gitpod-protocol/lib/util/debug-app";
 import { Client } from "@gitpod/gitpod-protocol/lib/experiments/types";
 import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";
 import { ClusterSyncService } from "./cluster-sync-service";
+import { WorkspaceInstanceController, WorkspaceInstanceControllerImpl } from "./workspace-instance-controller";
 
 export const containerModule = new ContainerModule((bind) => {
     bind(MessagebusConfiguration).toSelf().inSingletonScope();
@@ -91,4 +92,7 @@ export const containerModule = new ContainerModule((bind) => {
     bind(DebugApp).toSelf().inSingletonScope();
 
     bind(Client).toDynamicValue(getExperimentsClientForBackend).inSingletonScope();
+
+    // transient to make sure we're creating a separate instance every time we ask for it
+    bind(WorkspaceInstanceController).to(WorkspaceInstanceControllerImpl).inTransientScope();
 });
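
A note on the transient binding: each WorkspaceManagerBridge (one per workspace cluster) injects its own controller, so per-cluster timers and disposables are never shared between bridges. Below is a minimal, self-contained illustration of inversify's transient scope; the class name is a placeholder, not from this PR:

import "reflect-metadata";
import { Container, injectable } from "inversify";

@injectable()
class ExampleController {}

const container = new Container();
// inTransientScope: a fresh instance per resolution, which is what the binding above relies on
container.bind(ExampleController).toSelf().inTransientScope();

const a = container.get(ExampleController);
const b = container.get(ExampleController);
console.log(a === b); // false - every @inject site receives its own instance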