Skip to content

Commit b6e10ae

Browse files
committed
[supervisor] merge run tasks
1 parent 348f325 commit b6e10ae

File tree

8 files changed

+104
-15
lines changed

8 files changed

+104
-15
lines changed

components/supervisor/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
supervisor

components/supervisor/pkg/ports/ports.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -872,3 +872,13 @@ func defaultRoutableIP() string {
872872

873873
return addresses[0].(*net.IPNet).IP.String()
874874
}
875+
876+
func (pm *Manager) IsServed(port uint32) bool {
877+
served := pm.served
878+
for _, served := range served {
879+
if served.Port == port {
880+
return true
881+
}
882+
}
883+
return false
884+
}

components/supervisor/pkg/supervisor/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ type StaticConfig struct {
8585

8686
// SSHPort is the port we run the SSH server on
8787
SSHPort int `json:"sshPort"`
88+
89+
// RunEndpointPort is the port where to serve the run API endpoint on
90+
RunEndpointPort *int `json:"runEndpointPort,omitempty"`
8891
}
8992

9093
// Validate validates this configuration.

components/supervisor/pkg/supervisor/services.go

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"bytes"
99
"context"
1010
"errors"
11+
"fmt"
1112
"io"
1213
"os"
1314
"path/filepath"
@@ -244,10 +245,44 @@ func (s *statusService) TasksStatus(req *api.TasksStatusRequest, srv api.StatusS
244245
case <-s.Tasks.ready:
245246
}
246247

248+
var runStream api.StatusService_TasksStatusClient
249+
if !s.Tasks.runGP && s.Tasks.config.RunEndpointPort != nil {
250+
runPort := *s.Tasks.config.RunEndpointPort
251+
if s.Ports.IsServed(uint32(runPort)) {
252+
url := fmt.Sprintf("localhost:%d", *s.Tasks.config.RunEndpointPort)
253+
conn, err := grpc.DialContext(srv.Context(), url, grpc.WithTransportCredentials(insecure.NewCredentials()))
254+
if err != nil {
255+
log.WithError(err).Debug("cannot connect to run supervisor")
256+
} else {
257+
defer conn.Close()
258+
runStream, err = api.NewStatusServiceClient(conn).TasksStatus(srv.Context(), req)
259+
if err != nil {
260+
log.WithError(err).Debug("failed to stream run tasks state")
261+
}
262+
}
263+
}
264+
}
265+
266+
var runTasks []*api.TaskStatus
267+
var subTasks []*api.TaskStatus
268+
doSend := func() error {
269+
var tasks []*api.TaskStatus
270+
tasks = append(tasks, runTasks...)
271+
tasks = append(tasks, subTasks...)
272+
return srv.Send(&api.TasksStatusResponse{Tasks: tasks})
273+
}
274+
247275
if !req.Observe {
248-
return srv.Send(&api.TasksStatusResponse{
249-
Tasks: s.Tasks.Status(),
250-
})
276+
if runStream != nil {
277+
resp, err := runStream.Recv()
278+
if err != nil {
279+
log.WithError(err).Error("failed to receive run tasks state")
280+
} else {
281+
runTasks = resp.Tasks
282+
}
283+
}
284+
subTasks = s.Tasks.Status()
285+
return doSend()
251286
}
252287

253288
sub := s.Tasks.Subscribe()
@@ -256,15 +291,37 @@ func (s *statusService) TasksStatus(req *api.TasksStatusRequest, srv api.StatusS
256291
}
257292
defer sub.Close()
258293

294+
respChan := make(chan []*api.TaskStatus, 5)
295+
if runStream != nil {
296+
go func() {
297+
for {
298+
resp, err := runStream.Recv()
299+
if err != nil {
300+
return
301+
}
302+
respChan <- resp.GetTasks()
303+
}
304+
}()
305+
}
259306
for {
260307
select {
261308
case <-srv.Context().Done():
262309
return nil
310+
case update := <-respChan:
311+
if update == nil {
312+
return nil
313+
}
314+
runTasks = update
315+
err := doSend()
316+
if err != nil {
317+
return err
318+
}
263319
case update := <-sub.Updates():
264320
if update == nil {
265321
return nil
266322
}
267-
err := srv.Send(&api.TasksStatusResponse{Tasks: update})
323+
subTasks = update
324+
err := doSend()
268325
if err != nil {
269326
return err
270327
}
@@ -936,5 +993,8 @@ func (s *portService) RetryAutoExpose(ctx context.Context, req *api.RetryAutoExp
936993

937994
// ResourcesStatus provides workspace resources status information.
938995
func (s *statusService) ResourcesStatus(ctx context.Context, in *api.ResourcesStatuRequest) (*api.ResourcesStatusResponse, error) {
996+
if s.topService == nil {
997+
return nil, status.Errorf(codes.Unavailable, "top service is disabled in run mode")
998+
}
939999
return s.topService.data, nil
9401000
}

components/supervisor/pkg/supervisor/supervisor.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,9 @@ func Run(options ...RunOption) {
228228
if cfg.DesktopIDE != nil {
229229
internalPorts = append(internalPorts, desktopIDEPort)
230230
}
231+
if cfg.RunEndpointPort != nil {
232+
internalPorts = append(internalPorts, uint32(*cfg.RunEndpointPort))
233+
}
231234

232235
endpoint, host, err := cfg.GitpodAPIEndpoint()
233236
if err != nil {
@@ -269,14 +272,15 @@ func Run(options ...RunOption) {
269272
internalPorts...,
270273
)
271274

272-
topService := NewTopService()
273-
topService.Observe(ctx)
274-
275+
var topService *TopService
275276
supervisorMetrics := metrics.NewMetrics()
276277
var metricsReporter *metrics.GrpcMetricsReporter
277278
if opts.RunGP {
278279
cstate.MarkContentReady(csapi.WorkspaceInitFromOther)
279280
} else {
281+
topService := NewTopService()
282+
topService.Observe(ctx)
283+
280284
if !cfg.isHeadless() {
281285
go startAnalyze(ctx, cfg, gitpodConfigService, topService, gitpodService)
282286
}
@@ -315,7 +319,7 @@ func Run(options ...RunOption) {
315319
Gid: gitpodGID,
316320
}
317321

318-
taskManager := newTasksManager(cfg, termMuxSrv, cstate, nil, ideReady, desktopIdeReady)
322+
taskManager := newTasksManager(cfg, termMuxSrv, cstate, nil, ideReady, desktopIdeReady, opts.RunGP)
319323

320324
apiServices := []RegisterableService{
321325
&statusService{

components/supervisor/pkg/supervisor/tasks.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,10 @@ type tasksManager struct {
109109
reporter headlessTaskProgressReporter
110110
ideReady *ideReadyState
111111
desktopIdeReady *ideReadyState
112+
runGP bool
112113
}
113114

114-
func newTasksManager(config *Config, terminalService *terminal.MuxTerminalService, contentState ContentState, reporter headlessTaskProgressReporter, ideReady *ideReadyState, desktopIdeReady *ideReadyState) *tasksManager {
115+
func newTasksManager(config *Config, terminalService *terminal.MuxTerminalService, contentState ContentState, reporter headlessTaskProgressReporter, ideReady *ideReadyState, desktopIdeReady *ideReadyState, runGP bool) *tasksManager {
115116
return &tasksManager{
116117
config: config,
117118
terminalService: terminalService,
@@ -122,6 +123,7 @@ func newTasksManager(config *Config, terminalService *terminal.MuxTerminalServic
122123
storeLocation: logs.TerminalStoreLocation,
123124
ideReady: ideReady,
124125
desktopIdeReady: desktopIdeReady,
126+
runGP: runGP,
125127
}
126128
}
127129

@@ -203,11 +205,19 @@ func (tm *tasksManager) init(ctx context.Context) {
203205
for i, config := range *tasks {
204206
id := strconv.Itoa(i)
205207
presentation := &api.TaskPresentation{}
208+
209+
var name string
206210
if config.Name != nil {
207-
presentation.Name = *config.Name
208-
} else {
209-
presentation.Name = "Gitpod Task " + strconv.Itoa(i+1)
211+
name = *config.Name
212+
}
213+
if name == "" {
214+
name = "Gitpod Task " + strconv.Itoa(i+1)
210215
}
216+
if tm.runGP {
217+
name = "Run: " + name
218+
}
219+
presentation.Name = name
220+
211221
if config.OpenIn != nil {
212222
presentation.OpenIn = *config.OpenIn
213223
}

components/supervisor/pkg/supervisor/tasks_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func TestTaskManager(t *testing.T) {
224224
GitpodTasks: gitpodTasks,
225225
GitpodHeadless: strconv.FormatBool(test.Headless),
226226
},
227-
}, terminalService, contentState, &reporter, nil, nil)
227+
}, terminalService, contentState, &reporter, nil, nil, false)
228228
)
229229
taskManager.storeLocation = storeLocation
230230
contentState.MarkContentReady(test.Source)

components/supervisor/supervisor-config.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33
"desktopIdeConfigLocation": "/ide-desktop/supervisor-ide-config.json",
44
"frontendLocation": "/.supervisor/frontend/",
55
"apiEndpointPort": 22999,
6-
"sshPort": 23001
7-
}
6+
"sshPort": 23001,
7+
"runEndpointPort": 25000
8+
}

0 commit comments

Comments
 (0)