@@ -59,22 +59,32 @@ export class WorkspaceManagerBridge implements Disposable {
59
59
protected cluster : WorkspaceClusterInfo ;
60
60
61
61
public start ( cluster : WorkspaceClusterInfo , clientProvider : ClientProvider ) {
62
- const logPayload = { name : cluster . name , url : cluster . url } ;
62
+ const logPayload = { name : cluster . name , url : cluster . url , govern : cluster . govern } ;
63
63
log . info ( `starting bridge to cluster...` , logPayload ) ;
64
64
this . cluster = cluster ;
65
65
66
- if ( cluster . govern ) {
67
- log . debug ( `starting DB updater : ${ cluster . name } ` , logPayload ) ;
68
- /* no await */ this . startDatabaseUpdater ( clientProvider , logPayload )
66
+ const startStatusUpdateHandler = ( writeToDB : boolean ) => {
67
+ log . debug ( `starting status update handler : ${ cluster . name } ` , logPayload ) ;
68
+ /* no await */ this . startStatusUpdateHandler ( clientProvider , writeToDB , logPayload )
69
69
// this is a mere safe-guard: we do not expect the code inside to fail
70
- . catch ( err => log . error ( "cannot start database updater" , err ) ) ;
70
+ . catch ( err => log . error ( "cannot start status update handler" , err ) ) ;
71
+ } ;
71
72
73
+ if ( cluster . govern ) {
74
+ // notify servers and _update the DB_
75
+ startStatusUpdateHandler ( true ) ;
76
+
77
+ // the actual "governing" part
72
78
const controllerInterval = this . config . controllerIntervalSeconds ;
73
79
if ( controllerInterval <= 0 ) {
74
80
throw new Error ( "controllerInterval <= 0!" ) ;
75
81
}
76
82
log . debug ( `starting controller: ${ cluster . name } ` , logPayload ) ;
77
83
this . startController ( clientProvider , controllerInterval , this . config . controllerMaxDisconnectSeconds ) ;
84
+ } else {
85
+ // _DO NOT_ update the DB (another bridge is responsible for that)
86
+ // Still, listen to all updates, generate/derive new state and distribute it locally!
87
+ startStatusUpdateHandler ( false ) ;
78
88
}
79
89
log . info ( `started bridge to cluster.` , logPayload ) ;
80
90
}
@@ -83,15 +93,15 @@ export class WorkspaceManagerBridge implements Disposable {
83
93
this . dispose ( ) ;
84
94
}
85
95
86
- protected async startDatabaseUpdater ( clientProvider : ClientProvider , logPayload : { } ) : Promise < void > {
96
+ protected async startStatusUpdateHandler ( clientProvider : ClientProvider , writeToDB : boolean , logPayload : { } ) : Promise < void > {
87
97
const subscriber = new WsmanSubscriber ( clientProvider ) ;
88
98
this . disposables . push ( subscriber ) ;
89
99
90
100
const onReconnect = ( ctx : TraceContext , s : WorkspaceStatus [ ] ) => {
91
- s . forEach ( sx => this . serializeMessagesByInstanceId < WorkspaceStatus > ( ctx , sx , m => m . getId ( ) , ( ctx , msg ) => this . handleStatusUpdate ( ctx , msg ) ) )
101
+ s . forEach ( sx => this . serializeMessagesByInstanceId < WorkspaceStatus > ( ctx , sx , m => m . getId ( ) , ( ctx , msg ) => this . handleStatusUpdate ( ctx , msg , writeToDB ) ) )
92
102
} ;
93
103
const onStatusUpdate = ( ctx : TraceContext , s : WorkspaceStatus ) => {
94
- this . serializeMessagesByInstanceId < WorkspaceStatus > ( ctx , s , msg => msg . getId ( ) , ( ctx , s ) => this . handleStatusUpdate ( ctx , s ) )
104
+ this . serializeMessagesByInstanceId < WorkspaceStatus > ( ctx , s , msg => msg . getId ( ) , ( ctx , s ) => this . handleStatusUpdate ( ctx , s , writeToDB ) )
95
105
} ;
96
106
await subscriber . subscribe ( { onReconnect, onStatusUpdate } , logPayload ) ;
97
107
}
@@ -110,7 +120,7 @@ export class WorkspaceManagerBridge implements Disposable {
110
120
this . queues . set ( instanceId , q ) ;
111
121
}
112
122
113
- protected async handleStatusUpdate ( ctx : TraceContext , rawStatus : WorkspaceStatus ) {
123
+ protected async handleStatusUpdate ( ctx : TraceContext , rawStatus : WorkspaceStatus , writeToDB : boolean ) {
114
124
const status = rawStatus . toObject ( ) ;
115
125
if ( ! status . spec || ! status . metadata || ! status . conditions ) {
116
126
log . warn ( "Received invalid status update" , status ) ;
@@ -246,26 +256,40 @@ export class WorkspaceManagerBridge implements Disposable {
246
256
break ;
247
257
}
248
258
249
- await this . updatePrebuiltWorkspace ( { span} , status ) ;
250
-
251
259
span . setTag ( "after" , JSON . stringify ( instance ) ) ;
252
- await this . workspaceDB . trace ( { span} ) . storeInstance ( instance ) ;
253
- await this . messagebus . notifyOnInstanceUpdate ( { span} , userId , instance ) ;
254
260
255
- // important: call this after the DB update
256
- await this . cleanupProbeWorkspace ( { span} , status ) ;
257
-
258
- if ( ! ! lifecycleHandler ) {
259
- await lifecycleHandler ( ) ;
261
+ // now notify all listeners about updates - and update DB if needed
262
+ if ( writeToDB ) {
263
+ await this . writePrebuiltUpdateToDB ( { span} , userId , status ) ;
264
+ await this . writeStatusUpdateToDB ( { span} , userId , instance , status , lifecycleHandler ) ;
260
265
}
266
+ await this . notifyOnPrebuiltUpdate ( { span} , userId , status ) ;
267
+ await this . notifyOnInstanceUpdate ( { span} , userId , instance ) ;
268
+
261
269
} catch ( e ) {
262
- TraceContext . setError ( { span } , e ) ;
270
+ TraceContext . setError ( { span} , e ) ;
263
271
throw e ;
264
272
} finally {
265
273
span . finish ( ) ;
266
274
}
267
275
}
268
276
277
+ protected async writeStatusUpdateToDB ( ctx : TraceContext , userId : string , instance : WorkspaceInstance , status : WorkspaceStatus . AsObject , lifecycleHandler : ( ( ) => Promise < void > ) | undefined ) : Promise < void > {
278
+ await this . workspaceDB . trace ( ctx ) . storeInstance ( instance ) ;
279
+
280
+ // cleanup
281
+ // important: call this after the DB update
282
+ await this . cleanupProbeWorkspace ( ctx , status ) ;
283
+
284
+ if ( ! ! lifecycleHandler ) {
285
+ await lifecycleHandler ( ) ;
286
+ }
287
+ }
288
+
289
+ protected async notifyOnInstanceUpdate ( ctx : TraceContext , userId : string , instance : WorkspaceInstance ) {
290
+ await this . messagebus . notifyOnInstanceUpdate ( ctx , userId , instance ) ;
291
+ }
292
+
269
293
protected startController ( clientProvider : ClientProvider , controllerIntervalSeconds : number , controllerMaxDisconnectSeconds : number , maxTimeToRunningPhaseSeconds = 60 * 60 ) {
270
294
let disconnectStarted = Number . MAX_SAFE_INTEGER ;
271
295
this . disposables . push (
@@ -321,7 +345,11 @@ export class WorkspaceManagerBridge implements Disposable {
321
345
// probes are an EE feature - we just need the hook here
322
346
}
323
347
324
- protected async updatePrebuiltWorkspace ( ctx : TraceContext , status : WorkspaceStatus . AsObject ) {
348
+ protected async writePrebuiltUpdateToDB ( ctx : TraceContext , userId : string , status : WorkspaceStatus . AsObject ) {
349
+ // prebuilds are an EE feature - we just need the hook here
350
+ }
351
+
352
+ protected async notifyOnPrebuiltUpdate ( ctx : TraceContext , userId : string , status : WorkspaceStatus . AsObject ) {
325
353
// prebuilds are an EE feature - we just need the hook here
326
354
}
327
355
@@ -334,7 +362,7 @@ export class WorkspaceManagerBridge implements Disposable {
334
362
335
363
try {
336
364
await this . userDB . trace ( { span} ) . deleteGitpodTokensNamedLike ( ownerUserID , `${ instance . id } -%` ) ;
337
- await this . analytics . track ( {
365
+ this . analytics . track ( {
338
366
userId : ownerUserID ,
339
367
event : "workspace-stopped" ,
340
368
messageId : `bridge-wsstopped-${ instance . id } ` ,
0 commit comments