Skip to content

Commit 905ee6c

Browse files
committed
[supervisor] merge run tasks
1 parent 348f325 commit 905ee6c

File tree

11 files changed

+248
-24
lines changed

11 files changed

+248
-24
lines changed

components/supervisor/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
supervisor

components/supervisor/pkg/ports/ports.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -872,3 +872,13 @@ func defaultRoutableIP() string {
872872

873873
return addresses[0].(*net.IPNet).IP.String()
874874
}
875+
876+
func (pm *Manager) IsServed(port uint32) bool {
877+
served := pm.served
878+
for _, served := range served {
879+
if served.Port == port {
880+
return true
881+
}
882+
}
883+
return false
884+
}

components/supervisor/pkg/run/run.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License.AGPL.txt in the project root for license information.
4+
5+
package run
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"sync"
11+
12+
"github.com/gitpod-io/gitpod/supervisor/api"
13+
"github.com/gitpod-io/gitpod/supervisor/pkg/ports"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/credentials/insecure"
16+
)
17+
18+
type Option struct {
19+
Port uint32
20+
Ports *ports.Manager
21+
}
22+
23+
type Client struct {
24+
option *Option
25+
26+
conn *grpc.ClientConn
27+
closeOnce sync.Once
28+
29+
Status api.StatusServiceClient
30+
Terminal api.TerminalServiceClient
31+
}
32+
33+
func New(ctx context.Context, option *Option) (*Client, error) {
34+
url := fmt.Sprintf("localhost:%d", option.Port)
35+
conn, err := grpc.DialContext(ctx, url, grpc.WithTransportCredentials(insecure.NewCredentials()))
36+
if err != nil {
37+
return nil, err
38+
}
39+
return &Client{
40+
option: option,
41+
conn: conn,
42+
Status: api.NewStatusServiceClient(conn),
43+
Terminal: api.NewTerminalServiceClient(conn),
44+
}, nil
45+
}
46+
47+
func (client *Client) Close() {
48+
client.closeOnce.Do(func() {
49+
client.conn.Close()
50+
})
51+
}
52+
53+
func (client *Client) Available() bool {
54+
if client == nil {
55+
return false
56+
}
57+
return client.option.Ports.IsServed(client.option.Port)
58+
}

components/supervisor/pkg/supervisor/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ type StaticConfig struct {
8585

8686
// SSHPort is the port we run the SSH server on
8787
SSHPort int `json:"sshPort"`
88+
89+
// RunEndpointPort is the port where to serve the run API endpoint on
90+
RunEndpointPort *int `json:"runEndpointPort,omitempty"`
8891
}
8992

9093
// Validate validates this configuration.

components/supervisor/pkg/supervisor/services.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
csapi "github.com/gitpod-io/gitpod/content-service/api"
2727
"github.com/gitpod-io/gitpod/supervisor/api"
2828
"github.com/gitpod-io/gitpod/supervisor/pkg/ports"
29+
"github.com/gitpod-io/gitpod/supervisor/pkg/run"
2930
)
3031

3132
// RegisterableService can register a service.
@@ -98,6 +99,7 @@ type statusService struct {
9899
ideReady *ideReadyState
99100
desktopIdeReady *ideReadyState
100101
topService *TopService
102+
runClient *run.Client
101103

102104
api.UnimplementedStatusServiceServer
103105
}
@@ -244,10 +246,37 @@ func (s *statusService) TasksStatus(req *api.TasksStatusRequest, srv api.StatusS
244246
case <-s.Tasks.ready:
245247
}
246248

249+
var (
250+
runStream api.StatusService_TasksStatusClient
251+
err error
252+
)
253+
if s.runClient.Available() {
254+
runStream, err = s.runClient.Status.TasksStatus(srv.Context(), req)
255+
if err != nil {
256+
log.WithError(err).Error("failed to stream run tasks state")
257+
}
258+
}
259+
260+
var runTasks []*api.TaskStatus
261+
var subTasks []*api.TaskStatus
262+
doSend := func() error {
263+
var tasks []*api.TaskStatus
264+
tasks = append(tasks, runTasks...)
265+
tasks = append(tasks, subTasks...)
266+
return srv.Send(&api.TasksStatusResponse{Tasks: tasks})
267+
}
268+
247269
if !req.Observe {
248-
return srv.Send(&api.TasksStatusResponse{
249-
Tasks: s.Tasks.Status(),
250-
})
270+
if runStream != nil {
271+
resp, err := runStream.Recv()
272+
if err != nil {
273+
log.WithError(err).Error("failed to receive run tasks state")
274+
} else {
275+
runTasks = resp.Tasks
276+
}
277+
}
278+
subTasks = s.Tasks.Status()
279+
return doSend()
251280
}
252281

253282
sub := s.Tasks.Subscribe()
@@ -256,15 +285,37 @@ func (s *statusService) TasksStatus(req *api.TasksStatusRequest, srv api.StatusS
256285
}
257286
defer sub.Close()
258287

288+
respChan := make(chan []*api.TaskStatus, 5)
289+
if runStream != nil {
290+
go func() {
291+
for {
292+
resp, err := runStream.Recv()
293+
if err != nil {
294+
return
295+
}
296+
respChan <- resp.GetTasks()
297+
}
298+
}()
299+
}
259300
for {
260301
select {
261302
case <-srv.Context().Done():
262303
return nil
304+
case update := <-respChan:
305+
if update == nil {
306+
return nil
307+
}
308+
runTasks = update
309+
err := doSend()
310+
if err != nil {
311+
return err
312+
}
263313
case update := <-sub.Updates():
264314
if update == nil {
265315
return nil
266316
}
267-
err := srv.Send(&api.TasksStatusResponse{Tasks: update})
317+
subTasks = update
318+
err := doSend()
268319
if err != nil {
269320
return err
270321
}
@@ -936,5 +987,8 @@ func (s *portService) RetryAutoExpose(ctx context.Context, req *api.RetryAutoExp
936987

937988
// ResourcesStatus provides workspace resources status information.
938989
func (s *statusService) ResourcesStatus(ctx context.Context, in *api.ResourcesStatuRequest) (*api.ResourcesStatusResponse, error) {
990+
if s.topService == nil {
991+
return nil, status.Errorf(codes.Unavailable, "top service is disabled in run mode")
992+
}
939993
return s.topService.data, nil
940994
}

components/supervisor/pkg/supervisor/supervisor.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import (
5757
"github.com/gitpod-io/gitpod/supervisor/pkg/dropwriter"
5858
"github.com/gitpod-io/gitpod/supervisor/pkg/metrics"
5959
"github.com/gitpod-io/gitpod/supervisor/pkg/ports"
60+
"github.com/gitpod-io/gitpod/supervisor/pkg/run"
6061
"github.com/gitpod-io/gitpod/supervisor/pkg/serverapi"
6162
"github.com/gitpod-io/gitpod/supervisor/pkg/terminal"
6263

@@ -228,6 +229,9 @@ func Run(options ...RunOption) {
228229
if cfg.DesktopIDE != nil {
229230
internalPorts = append(internalPorts, desktopIDEPort)
230231
}
232+
if cfg.RunEndpointPort != nil {
233+
internalPorts = append(internalPorts, uint32(*cfg.RunEndpointPort))
234+
}
231235

232236
endpoint, host, err := cfg.GitpodAPIEndpoint()
233237
if err != nil {
@@ -269,14 +273,28 @@ func Run(options ...RunOption) {
269273
internalPorts...,
270274
)
271275

272-
topService := NewTopService()
273-
topService.Observe(ctx)
274-
276+
var runClient *run.Client
277+
var topService *TopService
275278
supervisorMetrics := metrics.NewMetrics()
276279
var metricsReporter *metrics.GrpcMetricsReporter
277280
if opts.RunGP {
278281
cstate.MarkContentReady(csapi.WorkspaceInitFromOther)
279282
} else {
283+
if cfg.RunEndpointPort != nil {
284+
runClient, err = run.New(ctx, &run.Option{
285+
Port: uint32(*cfg.RunEndpointPort),
286+
Ports: portMgmt,
287+
})
288+
if err != nil {
289+
log.WithError(err).Error("cannot connect to run supervisor")
290+
} else {
291+
defer runClient.Close()
292+
}
293+
}
294+
295+
topService := NewTopService()
296+
topService.Observe(ctx)
297+
280298
if !cfg.isHeadless() {
281299
go startAnalyze(ctx, cfg, gitpodConfigService, topService, gitpodService)
282300
}
@@ -295,7 +313,7 @@ func Run(options ...RunOption) {
295313
}
296314

297315
termMux := terminal.NewMux()
298-
termMuxSrv := terminal.NewMuxTerminalService(termMux)
316+
termMuxSrv := terminal.NewMuxTerminalService(termMux, runClient)
299317
termMuxSrv.DefaultWorkdir = cfg.RepoRoot
300318
if cfg.WorkspaceRoot != "" {
301319
termMuxSrv.DefaultWorkdirProvider = func() string {
@@ -315,7 +333,7 @@ func Run(options ...RunOption) {
315333
Gid: gitpodGID,
316334
}
317335

318-
taskManager := newTasksManager(cfg, termMuxSrv, cstate, nil, ideReady, desktopIdeReady)
336+
taskManager := newTasksManager(cfg, termMuxSrv, cstate, nil, ideReady, desktopIdeReady, opts.RunGP)
319337

320338
apiServices := []RegisterableService{
321339
&statusService{
@@ -325,6 +343,7 @@ func Run(options ...RunOption) {
325343
ideReady: ideReady,
326344
desktopIdeReady: desktopIdeReady,
327345
topService: topService,
346+
runClient: runClient,
328347
},
329348
termMuxSrv,
330349
RegistrableTokenService{Service: tokenService},

components/supervisor/pkg/supervisor/tasks.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,10 @@ type tasksManager struct {
109109
reporter headlessTaskProgressReporter
110110
ideReady *ideReadyState
111111
desktopIdeReady *ideReadyState
112+
runGP bool
112113
}
113114

114-
func newTasksManager(config *Config, terminalService *terminal.MuxTerminalService, contentState ContentState, reporter headlessTaskProgressReporter, ideReady *ideReadyState, desktopIdeReady *ideReadyState) *tasksManager {
115+
func newTasksManager(config *Config, terminalService *terminal.MuxTerminalService, contentState ContentState, reporter headlessTaskProgressReporter, ideReady *ideReadyState, desktopIdeReady *ideReadyState, runGP bool) *tasksManager {
115116
return &tasksManager{
116117
config: config,
117118
terminalService: terminalService,
@@ -122,6 +123,7 @@ func newTasksManager(config *Config, terminalService *terminal.MuxTerminalServic
122123
storeLocation: logs.TerminalStoreLocation,
123124
ideReady: ideReady,
124125
desktopIdeReady: desktopIdeReady,
126+
runGP: runGP,
125127
}
126128
}
127129

@@ -203,11 +205,19 @@ func (tm *tasksManager) init(ctx context.Context) {
203205
for i, config := range *tasks {
204206
id := strconv.Itoa(i)
205207
presentation := &api.TaskPresentation{}
208+
209+
var name string
206210
if config.Name != nil {
207-
presentation.Name = *config.Name
208-
} else {
209-
presentation.Name = "Gitpod Task " + strconv.Itoa(i+1)
211+
name = *config.Name
212+
}
213+
if name == "" {
214+
name = "Gitpod Task " + strconv.Itoa(i+1)
210215
}
216+
if tm.runGP {
217+
name = "Run: " + name
218+
}
219+
presentation.Name = name
220+
211221
if config.OpenIn != nil {
212222
presentation.OpenIn = *config.OpenIn
213223
}

components/supervisor/pkg/supervisor/tasks_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,15 +216,15 @@ func TestTaskManager(t *testing.T) {
216216
}
217217

218218
var (
219-
terminalService = terminal.NewMuxTerminalService(terminal.NewMux())
219+
terminalService = terminal.NewMuxTerminalService(terminal.NewMux(), nil)
220220
contentState = NewInMemoryContentState("")
221221
reporter = testHeadlessTaskProgressReporter{}
222222
taskManager = newTasksManager(&Config{
223223
WorkspaceConfig: WorkspaceConfig{
224224
GitpodTasks: gitpodTasks,
225225
GitpodHeadless: strconv.FormatBool(test.Headless),
226226
},
227-
}, terminalService, contentState, &reporter, nil, nil)
227+
}, terminalService, contentState, &reporter, nil, nil, false)
228228
)
229229
taskManager.storeLocation = storeLocation
230230
contentState.MarkContentReady(test.Source)

0 commit comments

Comments
 (0)