@@ -16,14 +16,41 @@ import { WorkspaceInstance } from "@gitpod/gitpod-protocol";
16
16
import * as grpc from '@grpc/grpc-js' ;
17
17
import { Config } from "../config" ;
18
18
import * as browserHeaders from "browser-headers" ;
19
- import { log } from '@gitpod/gitpod-protocol/lib/util/logging' ;
19
+ import { log , LogContext } from '@gitpod/gitpod-protocol/lib/util/logging' ;
20
20
import { TextDecoder } from "util" ;
21
21
import { WebsocketTransport } from "../util/grpc-web-ws-transport" ;
22
22
import { Deferred } from "@gitpod/gitpod-protocol/lib/util/deferred" ;
23
23
import { ListLogsRequest , ListLogsResponse , LogDownloadURLRequest , LogDownloadURLResponse } from '@gitpod/content-service/lib/headless-log_pb' ;
24
24
import { HEADLESS_LOG_DOWNLOAD_PATH_PREFIX } from "./headless-log-controller" ;
25
25
import { CachingHeadlessLogServiceClientProvider } from "@gitpod/content-service/lib/sugar" ;
26
26
27
+ export type HeadlessLogEndpoint = {
28
+ url : string ,
29
+ ownerToken ?: string ,
30
+ headers ?: { [ key : string ] : string } ,
31
+ } ;
32
+ export namespace HeadlessLogEndpoint {
33
+ export function authHeaders ( logCtx : LogContext , logEndpoint : HeadlessLogEndpoint ) : browserHeaders . BrowserHeaders | undefined {
34
+ const headers = new browserHeaders . BrowserHeaders ( logEndpoint . headers ) ;
35
+ if ( logEndpoint . ownerToken ) {
36
+ headers . set ( "x-gitpod-owner-token" , logEndpoint . ownerToken ) ;
37
+ }
38
+
39
+ if ( Object . keys ( headers . headersMap ) . length === 0 ) {
40
+ log . warn ( logCtx , "workspace logs: no ownerToken nor headers!" ) ;
41
+ return undefined ;
42
+ }
43
+
44
+ return headers ;
45
+ }
46
+ export function fromWithOwnerToken ( wsi : WorkspaceInstance ) : HeadlessLogEndpoint {
47
+ return {
48
+ url : wsi . ideUrl ,
49
+ ownerToken : wsi . status . ownerToken ,
50
+ }
51
+ }
52
+ }
53
+
27
54
@injectable ( )
28
55
export class HeadlessLogService {
29
56
static readonly SUPERVISOR_API_PATH = "/_supervisor/v1" ;
@@ -32,21 +59,22 @@ export class HeadlessLogService {
32
59
@inject ( Config ) protected readonly config : Config ;
33
60
@inject ( CachingHeadlessLogServiceClientProvider ) protected readonly headlessLogClientProvider : CachingHeadlessLogServiceClientProvider ;
34
61
35
- public async getHeadlessLogURLs ( userId : string , wsi : WorkspaceInstance , ownerId : string , maxTimeoutSecs : number = 30 ) : Promise < HeadlessLogUrls | undefined > {
62
+ public async getHeadlessLogURLs ( logCtx : LogContext , wsi : WorkspaceInstance , ownerId : string , maxTimeoutSecs : number = 30 ) : Promise < HeadlessLogUrls | undefined > {
36
63
if ( isSupervisorAvailableSoon ( wsi ) ) {
64
+ const logEndpoint = HeadlessLogEndpoint . fromWithOwnerToken ( wsi ) ;
37
65
const aborted = new Deferred < boolean > ( ) ;
38
66
setTimeout ( ( ) => aborted . resolve ( true ) , maxTimeoutSecs * 1000 ) ;
39
- const streamIds = await this . retryWhileInstanceIsRunning ( wsi , ( ) => this . supervisorListHeadlessLogs ( wsi ) , "list headless log streams" , aborted ) ;
67
+ const streamIds = await this . retryOnError ( ( ) => this . supervisorListHeadlessLogs ( logCtx , wsi . id , logEndpoint ) , "list headless log streams" , this . continueWhileRunning ( wsi . id ) , aborted ) ;
40
68
if ( streamIds !== undefined ) {
41
69
return streamIds ;
42
70
}
43
71
}
44
72
45
73
// we were unable to get a repsonse from supervisor - let's try content service next
46
- return await this . contentServiceListLogs ( userId , wsi , ownerId ) ;
74
+ return await this . contentServiceListLogs ( wsi , ownerId ) ;
47
75
}
48
76
49
- protected async contentServiceListLogs ( userId : string , wsi : WorkspaceInstance , ownerId : string ) : Promise < HeadlessLogUrls | undefined > {
77
+ protected async contentServiceListLogs ( wsi : WorkspaceInstance , ownerId : string ) : Promise < HeadlessLogUrls | undefined > {
50
78
const req = new ListLogsRequest ( ) ;
51
79
req . setOwnerId ( ownerId ) ;
52
80
req . setWorkspaceId ( wsi . workspaceId ) ;
@@ -74,19 +102,24 @@ export class HeadlessLogService {
74
102
} ;
75
103
}
76
104
77
- protected async supervisorListHeadlessLogs ( wsi : WorkspaceInstance ) : Promise < HeadlessLogUrls > {
78
- if ( wsi . ideUrl === "" ) {
105
+ protected async supervisorListHeadlessLogs ( logCtx : LogContext , instanceId : string , logEndpoint : HeadlessLogEndpoint ) : Promise < HeadlessLogUrls | undefined > {
106
+ const tasks = await this . supervisorListTasks ( logCtx , logEndpoint ) ;
107
+ return this . renderTasksHeadlessLogUrls ( logCtx , instanceId , tasks ) ;
108
+ }
109
+
110
+ protected async supervisorListTasks ( logCtx : LogContext , logEndpoint : HeadlessLogEndpoint ) : Promise < TaskStatus [ ] > {
111
+ if ( logEndpoint . url === "" ) {
79
112
// if ideUrl is not yet set we're too early and we deem the workspace not ready yet: retry later!
80
- throw new Error ( `instance's ${ wsi . id } has no ideUrl, yet` ) ;
113
+ throw new Error ( `instance's ${ logCtx . instanceId } has no ideUrl, yet` ) ;
81
114
}
82
115
83
116
const tasks = await new Promise < TaskStatus [ ] > ( ( resolve , reject ) => {
84
- const client = new StatusServiceClient ( toSupervisorURL ( wsi . ideUrl ) , {
117
+ const client = new StatusServiceClient ( toSupervisorURL ( logEndpoint . url ) , {
85
118
transport : WebsocketTransport ( ) ,
86
119
} ) ;
87
120
88
121
const req = new TasksStatusRequest ( ) ; // Note: Don't set observe here at all, else it won't work!
89
- const stream = client . tasksStatus ( req , authHeaders ( wsi ) ) ;
122
+ const stream = client . tasksStatus ( req , HeadlessLogEndpoint . authHeaders ( logCtx , logEndpoint ) ) ;
90
123
stream . on ( 'data' , ( resp : TasksStatusResponse ) => {
91
124
resolve ( resp . getTasksList ( ) ) ;
92
125
stream . cancel ( ) ;
@@ -99,7 +132,10 @@ export class HeadlessLogService {
99
132
}
100
133
} ) ;
101
134
} ) ;
135
+ return tasks ;
136
+ }
102
137
138
+ protected renderTasksHeadlessLogUrls ( logCtx : LogContext , instanceId : string , tasks : TaskStatus [ ] ) : HeadlessLogUrls {
103
139
// render URLs that point to server's /headless-logs/ endpoint which forwards calls to the running workspaces's supervisor
104
140
const streams : { [ id : string ] : string } = { } ;
105
141
for ( const task of tasks ) {
@@ -109,14 +145,14 @@ export class HeadlessLogService {
109
145
// this might be the case when there is no terminal for this task, yet.
110
146
// if we find any such case, we deem the workspace not ready yet, and try to reconnect later,
111
147
// to be sure to get hold of all terminals created.
112
- throw new Error ( `instance's ${ wsi . id } task ${ task . getId ( ) } has no terminal yet` ) ;
148
+ throw new Error ( `instance's ${ instanceId } task ${ task . getId ( ) } has no terminal yet` ) ;
113
149
}
114
150
if ( task . getState ( ) === TaskState . CLOSED ) {
115
151
// if a task has already been closed we can no longer access it's terminal, and have to skip it.
116
152
continue ;
117
153
}
118
154
streams [ taskId ] = this . config . hostUrl . with ( {
119
- pathname : `/headless-logs/${ wsi . id } /${ terminalId } ` ,
155
+ pathname : `/headless-logs/${ instanceId } /${ terminalId } ` ,
120
156
} ) . toString ( ) ;
121
157
}
122
158
return {
@@ -158,12 +194,29 @@ export class HeadlessLogService {
158
194
159
195
/**
160
196
* For now, simply stream the supervisor data
161
- *
162
- * @param workspace
197
+ * @param logCtx
198
+ * @param logEndpoint
199
+ * @param instanceId
200
+ * @param terminalID
201
+ * @param sink
202
+ * @param doContinue
203
+ * @param aborted
204
+ */
205
+ async streamWorkspaceLogWhileRunning ( logCtx : LogContext , logEndpoint : HeadlessLogEndpoint , instanceId : string , terminalID : string , sink : ( chunk : string ) => Promise < void > , aborted : Deferred < boolean > ) : Promise < void > {
206
+ await this . streamWorkspaceLog ( logCtx , logEndpoint , terminalID , sink , this . continueWhileRunning ( instanceId ) , aborted ) ;
207
+ }
208
+
209
+ /**
210
+ * For now, simply stream the supervisor data
211
+ * @param logCtx
212
+ * @param logEndpoint
163
213
* @param terminalID
214
+ * @param sink
215
+ * @param doContinue
216
+ * @param aborted
164
217
*/
165
- async streamWorkspaceLog ( wsi : WorkspaceInstance , terminalID : string , sink : ( chunk : string ) => Promise < void > , aborted : Deferred < boolean > ) : Promise < void > {
166
- const client = new TerminalServiceClient ( toSupervisorURL ( wsi . ideUrl ) , {
218
+ protected async streamWorkspaceLog ( logCtx : LogContext , logEndpoint : HeadlessLogEndpoint , terminalID : string , sink : ( chunk : string ) => Promise < void > , doContinue : ( ) => Promise < boolean > , aborted : Deferred < boolean > ) : Promise < void > {
219
+ const client = new TerminalServiceClient ( toSupervisorURL ( logEndpoint . url ) , {
167
220
transport : WebsocketTransport ( ) , // necessary because HTTPTransport causes caching issues
168
221
} ) ;
169
222
const req = new ListenTerminalRequest ( ) ;
@@ -172,10 +225,10 @@ export class HeadlessLogService {
172
225
let receivedDataYet = false ;
173
226
let stream : ResponseStream < ListenTerminalResponse > | undefined = undefined ;
174
227
aborted . promise . then ( ( ) => stream ?. cancel ( ) ) ;
175
- const doStream = ( cancelRetry : ( ) => void ) => new Promise < void > ( ( resolve , reject ) => {
228
+ const doStream = ( retry : ( doRetry ?: boolean ) => void ) => new Promise < void > ( ( resolve , reject ) => {
176
229
// [gpl] this is the very reason we cannot redirect the frontend to the supervisor URL: currently we only have ownerTokens for authentication
177
230
const decoder = new TextDecoder ( 'utf-8' )
178
- stream = client . listen ( req , authHeaders ( wsi ) ) ;
231
+ stream = client . listen ( req , HeadlessLogEndpoint . authHeaders ( logCtx , logEndpoint ) ) ;
179
232
stream . on ( 'data' , ( resp : ListenTerminalResponse ) => {
180
233
receivedDataYet = true ;
181
234
@@ -184,7 +237,7 @@ export class HeadlessLogService {
184
237
sink ( data )
185
238
. catch ( ( err ) => {
186
239
stream ?. cancel ( ) ; // If downstream reports an error: cancel connection to upstream
187
- log . debug ( { instanceId : wsi . id } , "stream cancelled" , err ) ;
240
+ log . debug ( logCtx , "stream cancelled" , err ) ;
188
241
} ) ;
189
242
} ) ;
190
243
stream . on ( 'end' , ( status ?: Status ) => {
@@ -201,58 +254,56 @@ export class HeadlessLogService {
201
254
return ;
202
255
}
203
256
204
- cancelRetry ( ) ;
257
+ retry ( false ) ;
205
258
reject ( err ) ;
206
259
} ) ;
207
260
} ) ;
208
- await this . retryWhileInstanceIsRunning ( wsi , doStream , "stream workspace logs" , aborted ) ;
261
+ await this . retryOnError ( doStream , "stream workspace logs" , doContinue , aborted ) ;
209
262
}
210
263
211
264
/**
212
265
* Retries op while the passed WorkspaceInstance is still starting. Retries are stopped if either:
213
- * - `op` calls `cancel( )` and an err is thrown, it is re-thrown by this method
266
+ * - `op` calls `retry(false )` and an err is thrown, it is re-thrown by this method
214
267
* - `aborted` resolves to `true`: `undefined` is returned
215
- * - if the instance enters the either STOPPING/STOPPED phases, we stop retrying, and return `undefined`
216
- * @param wsi
268
+ * - `(await while()) === true`: `undefined` is returned
217
269
* @param op
218
270
* @param description
271
+ * @param doContinue
219
272
* @param aborted
220
273
* @returns
221
274
*/
222
- protected async retryWhileInstanceIsRunning < T > ( wsi : WorkspaceInstance , op : ( cancel : ( ) => void ) => Promise < T > , description : string , aborted : Deferred < boolean > ) : Promise < T | undefined > {
223
- let cancelled = false ;
224
- const cancel = ( ) => { cancelled = true ; } ;
275
+ protected async retryOnError < T > ( op : ( cancel : ( ) => void ) => Promise < T > , description : string , doContinue : ( ) => Promise < boolean > , aborted : Deferred < boolean > ) : Promise < T | undefined > {
276
+ let retry = true ;
277
+ const retryFunction = ( doRetry : boolean = true ) => { retry = doRetry } ;
225
278
226
- let instance = wsi ;
227
- while ( ! cancelled && ! ( aborted . isResolved && ( await aborted . promise ) ) ) {
279
+ while ( retry && ! ( aborted . isResolved && ( await aborted . promise ) ) ) {
228
280
try {
229
- return await op ( cancel ) ;
281
+ return await op ( retryFunction ) ;
230
282
} catch ( err ) {
231
- if ( cancelled ) {
283
+ if ( ! retry ) {
232
284
throw err ;
233
285
}
234
286
235
- log . debug ( `unable to ${ description } ` , err ) ;
236
- const maybeInstance = await this . db . findInstanceById ( instance . id ) ;
237
- if ( ! maybeInstance ) {
287
+ const shouldContinue = await doContinue ( ) ;
288
+ if ( ! shouldContinue ) {
238
289
return undefined ;
239
290
}
240
- instance = maybeInstance ;
241
291
242
- if ( ! this . shouldRetry ( instance ) ) {
243
- return undefined ;
244
- }
245
- log . debug ( `re-trying ${ description } ...` ) ;
292
+ log . debug ( `unable to ${ description } , retrying...` , err ) ;
246
293
await new Promise ( ( resolve ) => setTimeout ( resolve , 2000 ) ) ;
247
294
continue ;
248
295
}
249
296
}
250
297
return undefined ;
251
298
}
252
299
253
- protected shouldRetry ( wsi : WorkspaceInstance ) : boolean {
254
- return isSupervisorAvailableSoon ( wsi ) ;
255
- }
300
+ protected continueWhileRunning ( instanceId : string ) : ( ) => Promise < boolean > {
301
+ const db = this . db ;
302
+ return async ( ) => {
303
+ const maybeInstance = await db . findInstanceById ( instanceId ) ;
304
+ return ! ! maybeInstance && isSupervisorAvailableSoon ( maybeInstance ) ;
305
+ }
306
+ } ;
256
307
}
257
308
258
309
function isSupervisorAvailableSoon ( wsi : WorkspaceInstance ) : boolean {
@@ -273,14 +324,3 @@ function toSupervisorURL(ideUrl: string): string {
273
324
u . pathname = HeadlessLogService . SUPERVISOR_API_PATH ;
274
325
return u . toString ( ) ;
275
326
}
276
-
277
- function authHeaders ( wsi : WorkspaceInstance ) : browserHeaders . BrowserHeaders | undefined {
278
- const ownerToken = wsi . status . ownerToken ;
279
- if ( ! ownerToken ) {
280
- log . warn ( { instanceId : wsi . id } , "workspace logs: owner token not found" ) ;
281
- return undefined ;
282
- }
283
- const headers = new browserHeaders . BrowserHeaders ( ) ;
284
- headers . set ( "x-gitpod-owner-token" , ownerToken ) ;
285
- return headers ;
286
- }
0 commit comments