|
4 | 4 | * See License-AGPL.txt in the project root for license information.
|
5 | 5 | */
|
6 | 6 |
|
7 |
| -import { WorkspaceDB } from "@gitpod/gitpod-db/lib"; |
8 |
| -import { Snapshot } from "@gitpod/gitpod-protocol"; |
9 | 7 | import { inject, injectable } from "inversify";
|
| 8 | +import { v4 as uuidv4 } from 'uuid'; |
| 9 | +import { WorkspaceDB } from "@gitpod/gitpod-db/lib"; |
| 10 | +import { Disposable, GitpodServer, Snapshot } from "@gitpod/gitpod-protocol"; |
| 11 | +import { SafePromise } from "@gitpod/gitpod-protocol/lib/util/safe-promise"; |
10 | 12 | import { StorageClient } from "../../../src/storage/storage-client";
|
| 13 | +import { ConsensusLeaderQorum } from "../../../src/consensus/consensus-leader-quorum"; |
| 14 | +import { log } from "@gitpod/gitpod-protocol/lib/util/logging"; |
11 | 15 |
|
12 | 16 | const SNAPSHOT_TIMEOUT_SECONDS = 60 * 30;
|
| 17 | +const SNAPSHOT_POLL_INTERVAL_SECONDS = 5; |
| 18 | +const SNAPSHOT_DB_POLL_INTERVAL_SECONDS = 60 * 5; |
| 19 | + |
| 20 | +export interface WaitForSnapshotOptions { |
| 21 | + workspaceOwner: string; |
| 22 | + snapshot: Snapshot; |
| 23 | +} |
13 | 24 |
|
| 25 | +/** |
| 26 | + * SnapshotService hosts all code that's necessary to create snapshots and drive them to completion. |
| 27 | + * To guarantee every snapshot reacheds an end state ('error' or 'available') it regularly polls the DB to pick up and drive those as well. |
| 28 | + */ |
14 | 29 | @injectable()
|
15 | 30 | export class SnapshotService {
|
16 | 31 | @inject(WorkspaceDB) protected readonly workspaceDb: WorkspaceDB;
|
17 | 32 | @inject(StorageClient) protected readonly storageClient: StorageClient;
|
| 33 | + @inject(ConsensusLeaderQorum) protected readonly leaderQuorum: ConsensusLeaderQorum; |
18 | 34 |
|
19 |
| - public async driveSnapshot(snapshotWorkspaceOwner: string, _snapshot: Snapshot): Promise<void> { |
20 |
| - const { id: snapshotId, bucketId, originalWorkspaceId, creationTime } = _snapshot; |
21 |
| - const start = new Date(creationTime).getTime(); |
22 |
| - while (start + SNAPSHOT_TIMEOUT_SECONDS < Date.now()) { |
23 |
| - await new Promise((resolve) => setTimeout(resolve, 3000)); |
| 35 | + protected readonly runningSnapshots: Map<string, Promise<void>> = new Map(); |
24 | 36 |
|
25 |
| - // pending: check if we're done: |
26 |
| - const exists = await this.storageClient.workspaceSnapshotExists(snapshotWorkspaceOwner, originalWorkspaceId, bucketId); |
27 |
| - if (exists) { |
28 |
| - await this.workspaceDb.updateSnapshot({ |
29 |
| - id: snapshotId, |
30 |
| - state: 'available', |
31 |
| - availableTime: new Date().toISOString(), |
32 |
| - }); |
33 |
| - return |
| 37 | + public async start(): Promise<Disposable> { |
| 38 | + const timer = setInterval(() => this.pickupAndDriveFromDbIfWeAreLeader().catch(log.error), SNAPSHOT_DB_POLL_INTERVAL_SECONDS * 1000); |
| 39 | + return { |
| 40 | + dispose: () => clearInterval(timer) |
| 41 | + } |
| 42 | + } |
| 43 | + |
| 44 | + public async pickupAndDriveFromDbIfWeAreLeader() { |
| 45 | + if (!await this.leaderQuorum.areWeLeader()) { |
| 46 | + return |
| 47 | + } |
| 48 | + |
| 49 | + log.info("snapshots: we're leading the quorum. picking up pending snapshots and driving them home."); |
| 50 | + const step = 50; // make sure we're not flooding ourselves |
| 51 | + const { snapshots: pendingSnapshots, total } = await this.workspaceDb.findSnapshotsWithState('pending', 0, step); |
| 52 | + if (total > step) { |
| 53 | + log.warn("snapshots: looks like we have more pending snapshots then we can handle!"); |
| 54 | + } |
| 55 | + |
| 56 | + for (const snapshot of pendingSnapshots) { |
| 57 | + const workspace = await this.workspaceDb.findById(snapshot.originalWorkspaceId); |
| 58 | + if (!workspace) { |
| 59 | + log.error({ workspaceId: snapshot.originalWorkspaceId }, `snapshots: unable to find workspace for snapshot`, { snapshotId: snapshot.id }); |
| 60 | + continue; |
34 | 61 | }
|
35 | 62 |
|
| 63 | + SafePromise.catchAndLog(this.driveSnapshotCached({ workspaceOwner: workspace.ownerId, snapshot }), { workspaceId: snapshot.originalWorkspaceId }); |
| 64 | + } |
| 65 | + } |
| 66 | + |
| 67 | + public async createSnapshot(options: GitpodServer.TakeSnapshotOptions, snapshotUrl: string): Promise<Snapshot> { |
| 68 | + const id = uuidv4() |
| 69 | + return await this.workspaceDb.storeSnapshot({ |
| 70 | + id, |
| 71 | + creationTime: new Date().toISOString(), |
| 72 | + state: 'pending', |
| 73 | + bucketId: snapshotUrl, |
| 74 | + originalWorkspaceId: options.workspaceId, |
| 75 | + layoutData: options.layoutData, |
| 76 | + }); |
| 77 | + } |
| 78 | + |
| 79 | + public async waitForSnapshot(opts: WaitForSnapshotOptions): Promise<void> { |
| 80 | + return await this.driveSnapshotCached(opts); |
| 81 | + } |
| 82 | + |
| 83 | + protected async driveSnapshotCached(opts: WaitForSnapshotOptions): Promise<void> { |
| 84 | + const running = this.runningSnapshots.get(opts.snapshot.id); |
| 85 | + if (running) { |
| 86 | + return running; |
| 87 | + } |
| 88 | + |
| 89 | + const started = this.driveSnapshot(opts); |
| 90 | + this.runningSnapshots.set(opts.snapshot.id, started); |
| 91 | + started.finally(() => this.runningSnapshots.delete(opts.snapshot.id)); |
| 92 | + return started; |
| 93 | + } |
| 94 | + |
| 95 | + protected async driveSnapshot(opts: WaitForSnapshotOptions): Promise<void> { |
| 96 | + if (opts.snapshot.state === 'available') { |
| 97 | + return; |
| 98 | + } |
| 99 | + if (opts.snapshot.state === 'error') { |
| 100 | + throw new Error(`snapshot error: ${opts.snapshot.message}`); |
| 101 | + } |
| 102 | + |
| 103 | + const { id: snapshotId, bucketId, originalWorkspaceId, creationTime } = opts.snapshot; |
| 104 | + const start = new Date(creationTime).getTime(); |
| 105 | + while (start + (SNAPSHOT_TIMEOUT_SECONDS * 1000) > Date.now()) { |
| 106 | + await new Promise((resolve) => setTimeout(resolve, SNAPSHOT_POLL_INTERVAL_SECONDS * 1000)); |
| 107 | + |
| 108 | + // did somebody else complete that snapshot? |
36 | 109 | const snapshot = await this.workspaceDb.findSnapshotById(snapshotId);
|
37 | 110 | if (!snapshot) {
|
38 | 111 | throw new Error(`no snapshot with id '${snapshotId}' found.`)
|
39 | 112 | }
|
40 |
| - |
41 | 113 | if (snapshot.state === 'available') {
|
42 | 114 | return;
|
43 | 115 | }
|
44 | 116 | if (snapshot.state === 'error') {
|
45 | 117 | throw new Error(`snapshot error: ${snapshot.message}`);
|
46 | 118 | }
|
| 119 | + |
| 120 | + // pending: check if the snapshot is there |
| 121 | + const exists = await this.storageClient.workspaceSnapshotExists(opts.workspaceOwner, originalWorkspaceId, bucketId); |
| 122 | + if (exists) { |
| 123 | + await this.workspaceDb.updateSnapshot({ |
| 124 | + id: snapshotId, |
| 125 | + state: 'available', |
| 126 | + availableTime: new Date().toISOString(), |
| 127 | + }); |
| 128 | + return; |
| 129 | + } |
47 | 130 | }
|
48 | 131 |
|
49 | 132 | // took too long
|
|
0 commit comments