Skip to content

Commit 77ed9d5

Browse files
mustard-mhjeanp413
andcommitted
Use workspaceStatus stream rpc in supervisor
Co-authored-by: mustard <[email protected]> Co-authored-by: Jean Pierre <[email protected]>
1 parent af9c9bb commit 77ed9d5

File tree

1 file changed

+139
-94
lines changed

1 file changed

+139
-94
lines changed

components/supervisor/pkg/serverapi/publicapi.go

Lines changed: 139 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"crypto/tls"
1010
"errors"
1111
"fmt"
12-
"reflect"
12+
"io"
1313
"time"
1414

1515
backoff "github.com/cenkalti/backoff/v4"
@@ -61,15 +61,14 @@ type Service struct {
6161
token string
6262
ownerID string
6363

64-
lastServerInstance *gitpod.WorkspaceInstance
65-
6664
// gitpodService server API
6765
gitpodService gitpod.APIInterface
6866
// publicAPIConn public API publicAPIConn
6967
publicAPIConn *grpc.ClientConn
7068
publicApiMetrics *grpc_prometheus.ClientMetrics
7169

7270
previousUsingPublicAPI bool
71+
onUsingPublicAPI chan bool
7372
}
7473

7574
var _ APIInterface = (*Service)(nil)
@@ -110,17 +109,29 @@ func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.Tok
110109
cfg: cfg,
111110
experiments: experiments.NewClient(),
112111
publicApiMetrics: grpc_prometheus.NewClientMetrics(),
112+
onUsingPublicAPI: make(chan bool),
113113
}
114114

115+
// schedule get public api configcat value for instance updates traffic switching
116+
go func() {
117+
ticker := time.NewTicker(time.Second * 1)
118+
for {
119+
select {
120+
case <-ctx.Done():
121+
ticker.Stop()
122+
case <-ticker.C:
123+
service.usePublicAPI(ctx)
124+
}
125+
}
126+
}()
127+
115128
service.publicApiMetrics.EnableClientHandlingTimeHistogram(
116129
// it should be aligned with https://github.com/gitpod-io/gitpod/blob/84ed1a0672d91446ba33cb7b504cfada769271a8/install/installer/pkg/components/ide-metrics/configmap.go#L315
117130
grpc_prometheus.WithHistogramBuckets([]float64{0.1, 0.2, 0.5, 1, 2, 5, 10}),
118131
)
119132

120133
// public api
121134
service.tryConnToPublicAPI()
122-
// listen to server instance update
123-
go service.listenInstanceUpdate(ctx, cfg.InstanceID)
124135

125136
if wsInfo, err := gitpodService.GetWorkspace(ctx, cfg.WorkspaceID); err != nil {
126137
log.WithError(err).Error("cannot get workspace info")
@@ -135,6 +146,12 @@ func (s *Service) tryConnToPublicAPI() {
135146
log.WithField("endpoint", endpoint).Info("connecting to PublicAPI...")
136147
opts := []grpc.DialOption{
137148
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS13})),
149+
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient([]grpc.StreamClientInterceptor{
150+
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
151+
withAuth := metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+s.token)
152+
return streamer(withAuth, desc, cc, method, opts...)
153+
},
154+
}...)),
138155
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient([]grpc.UnaryClientInterceptor{
139156
s.publicApiMetrics.UnaryClientInterceptor(),
140157
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@@ -150,15 +167,6 @@ func (s *Service) tryConnToPublicAPI() {
150167
}
151168
}
152169

153-
func (s *Service) persistServerAPIChannelWhenStart(ctx context.Context) bool {
154-
if s.publicAPIConn == nil || s.ownerID == "" {
155-
return true
156-
}
157-
return experiments.SupervisorPersistServerAPIChannelWhenStart(ctx, s.experiments, experiments.Attributes{
158-
UserID: s.ownerID,
159-
})
160-
}
161-
162170
func (s *Service) usePublicAPI(ctx context.Context) bool {
163171
if s.publicAPIConn == nil || s.ownerID == "" {
164172
return false
@@ -173,6 +181,7 @@ func (s *Service) usePublicAPI(ctx context.Context) bool {
173181
log.Info("switch to use ServerAPI")
174182
}
175183
s.previousUsingPublicAPI = usePublicAPI
184+
s.onUsingPublicAPI <- usePublicAPI
176185
}
177186
return usePublicAPI
178187
}
@@ -236,108 +245,115 @@ func (s *Service) OpenPort(ctx context.Context, workspaceID string, port *gitpod
236245
return port, nil
237246
}
238247

239-
func (s *Service) listenInstanceUpdate(ctx context.Context, instanceID string) {
240-
for {
241-
uptChan, err := backoff.RetryWithData(
242-
func() (<-chan *gitpod.WorkspaceInstance, error) {
243-
return s.gitpodService.InstanceUpdates(ctx, instanceID)
244-
},
245-
backoff.NewExponentialBackOff(),
246-
)
247-
if err != nil {
248-
log.WithError(err).Error("failed to get workspace instance chan several retries")
249-
continue
248+
// InstanceUpdates implements protocol.APIInterface
249+
func (s *Service) InstanceUpdates(ctx context.Context, instanceID string, workspaceID string) (<-chan *gitpod.WorkspaceInstance, error) {
250+
if s == nil {
251+
return nil, errNotConnected
252+
}
253+
254+
updateChan := make(chan *gitpod.WorkspaceInstance)
255+
errChan := make(chan error)
256+
processUpdate := func(usePublicAPI bool) context.CancelFunc {
257+
childCtx, cancel := context.WithCancel(ctx)
258+
if usePublicAPI {
259+
go s.publicAPIInstanceUpdate(childCtx, workspaceID, updateChan, errChan)
260+
} else {
261+
go s.serverInstanceUpdate(childCtx, instanceID, updateChan, errChan)
250262
}
263+
return cancel
264+
}
265+
go func() {
266+
cancel := processUpdate(s.usePublicAPI(ctx))
267+
defer func() {
268+
cancel()
269+
close(updateChan)
270+
}()
251271
for {
252272
select {
253273
case <-ctx.Done():
254274
return
255-
case instance := <-uptChan:
256-
s.lastServerInstance = instance
275+
case usePublicAPI := <-s.onUsingPublicAPI:
276+
cancel()
277+
cancel = processUpdate(usePublicAPI)
278+
case err := <-errChan:
279+
if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
280+
continue
281+
}
282+
log.WithField("method", "InstanceUpdates").WithError(err).Error("failed to listen")
283+
cancel()
284+
time.Sleep(time.Second * 2)
285+
cancel = processUpdate(s.usePublicAPI(ctx))
257286
}
258287
}
259-
}
288+
}()
289+
290+
return updateChan, nil
260291
}
261292

262-
func (s *Service) getWorkspaceInfo(ctx context.Context, instanceID, workspaceID string) (*gitpod.WorkspaceInstance, error) {
263-
getData := func() (*gitpod.WorkspaceInstance, error) {
264-
if !s.usePublicAPI(ctx) {
265-
return s.lastServerInstance, nil
266-
}
293+
func (s *Service) publicAPIInstanceUpdate(ctx context.Context, workspaceID string, updateChan chan *gitpod.WorkspaceInstance, errChan chan error) {
294+
resp, err := backoff.RetryWithData(func() (v1.WorkspacesService_StreamWorkspaceStatusClient, error) {
267295
service := v1.NewWorkspacesServiceClient(s.publicAPIConn)
268-
resp, err := service.GetWorkspace(ctx, &v1.GetWorkspaceRequest{
296+
resp, err := service.StreamWorkspaceStatus(ctx, &v1.StreamWorkspaceStatusRequest{
269297
WorkspaceId: workspaceID,
270298
})
271299
if err != nil {
272-
log.WithField("method", "GetWorkspace").WithError(err).Error("failed to call PublicAPI")
273-
return nil, err
300+
log.WithError(err).Info("backoff failed to get workspace service client of PublicAPI, try again")
274301
}
275-
instance := &gitpod.WorkspaceInstance{
276-
CreationTime: resp.Result.Status.Instance.CreatedAt.String(),
277-
ID: resp.Result.Status.Instance.InstanceId,
278-
Status: &gitpod.WorkspaceInstanceStatus{
279-
ExposedPorts: []*gitpod.WorkspaceInstancePort{},
280-
Message: resp.Result.Status.Instance.Status.Message,
281-
// OwnerToken: "", not used so ignore
282-
Phase: resp.Result.Status.Instance.Status.Phase.String(),
283-
Timeout: resp.Result.Status.Instance.Status.Conditions.Timeout,
284-
Version: int(resp.Result.Status.Instance.Status.StatusVersion),
285-
},
286-
WorkspaceID: resp.Result.WorkspaceId,
287-
}
288-
for _, port := range resp.Result.Status.Instance.Status.Ports {
289-
info := &gitpod.WorkspaceInstancePort{
290-
Port: float64(port.Port),
291-
URL: port.Url,
302+
return resp, err
303+
}, connBackoff)
304+
if err != nil {
305+
log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to call PublicAPI")
306+
errChan <- err
307+
return
308+
}
309+
log.WithField("method", "StreamWorkspaceStatus").Info("start to listen on publicAPI instanceUpdates")
310+
for {
311+
resp, err := resp.Recv()
312+
if err != nil {
313+
if err != io.EOF {
314+
log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to receive status update")
292315
}
293-
if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC {
294-
info.Visibility = gitpod.PortVisibilityPublic
295-
} else {
296-
info.Visibility = gitpod.PortVisibilityPrivate
316+
if ctx.Err() != nil {
317+
return
297318
}
298-
instance.Status.ExposedPorts = append(instance.Status.ExposedPorts, info)
319+
errChan <- err
320+
return
299321
}
300-
return instance, nil
322+
updateChan <- workspaceStatusToWorkspaceInstance(resp.Result)
301323
}
302-
exp := &backoff.ExponentialBackOff{
303-
InitialInterval: 2 * time.Second,
304-
RandomizationFactor: 0.5,
305-
Multiplier: 1.5,
306-
MaxInterval: 30 * time.Second,
307-
MaxElapsedTime: 0,
308-
Stop: backoff.Stop,
309-
Clock: backoff.SystemClock,
310-
}
311-
return backoff.RetryWithData(getData, exp)
312324
}
313325

314-
// InstanceUpdates implements protocol.APIInterface
315-
func (s *Service) InstanceUpdates(ctx context.Context, instanceID string, workspaceID string) (<-chan *gitpod.WorkspaceInstance, error) {
316-
if s == nil {
317-
return nil, errNotConnected
326+
func (s *Service) serverInstanceUpdate(ctx context.Context, instanceID string, updateChan chan *gitpod.WorkspaceInstance, errChan chan error) {
327+
ch, err := backoff.RetryWithData(func() (<-chan *gitpod.WorkspaceInstance, error) {
328+
ch, err := s.gitpodService.InstanceUpdates(ctx, instanceID)
329+
if err != nil {
330+
log.WithError(err).Info("backoff failed to listen to serverAPI instanceUpdates, try again")
331+
}
332+
return ch, err
333+
}, connBackoff)
334+
if err != nil {
335+
log.WithField("method", "InstanceUpdates").WithError(err).Error("failed to call serverAPI")
336+
errChan <- err
337+
return
318338
}
319-
if !s.usePublicAPI(ctx) && s.persistServerAPIChannelWhenStart(ctx) {
320-
return s.gitpodService.InstanceUpdates(ctx, instanceID)
339+
log.WithField("method", "InstanceUpdates").WithField("instanceID", instanceID).Info("start to listen on serverAPI instanceUpdates")
340+
for update := range ch {
341+
updateChan <- update
321342
}
322-
updateChan := make(chan *gitpod.WorkspaceInstance)
323-
var latestInstance *gitpod.WorkspaceInstance
324-
go func() {
325-
for {
326-
if ctx.Err() != nil {
327-
close(updateChan)
328-
break
329-
}
330-
if instance, err := s.getWorkspaceInfo(ctx, instanceID, workspaceID); err == nil {
331-
if reflect.DeepEqual(latestInstance, instance) {
332-
continue
333-
}
334-
latestInstance = instance
335-
updateChan <- instance
336-
}
337-
time.Sleep(1 * time.Second)
338-
}
339-
}()
340-
return updateChan, nil
343+
if ctx.Err() != nil {
344+
return
345+
}
346+
errChan <- io.EOF
347+
}
348+
349+
var connBackoff = &backoff.ExponentialBackOff{
350+
InitialInterval: 2 * time.Second,
351+
RandomizationFactor: 0.5,
352+
Multiplier: 1.5,
353+
MaxInterval: 30 * time.Second,
354+
MaxElapsedTime: 0,
355+
Stop: backoff.Stop,
356+
Clock: backoff.SystemClock,
341357
}
342358

343359
// GetOwnerID implements APIInterface
@@ -375,3 +391,32 @@ func (s *Service) RegisterMetrics(registry *prometheus.Registry) error {
375391
}
376392
return registry.Register(s.publicApiMetrics)
377393
}
394+
395+
func workspaceStatusToWorkspaceInstance(status *v1.WorkspaceStatus) *gitpod.WorkspaceInstance {
396+
instance := &gitpod.WorkspaceInstance{
397+
CreationTime: status.Instance.CreatedAt.String(),
398+
ID: status.Instance.InstanceId,
399+
Status: &gitpod.WorkspaceInstanceStatus{
400+
ExposedPorts: []*gitpod.WorkspaceInstancePort{},
401+
Message: status.Instance.Status.Message,
402+
// OwnerToken: "", not used so ignore
403+
Phase: status.Instance.Status.Phase.String(),
404+
Timeout: status.Instance.Status.Conditions.Timeout,
405+
Version: int(status.Instance.Status.StatusVersion),
406+
},
407+
WorkspaceID: status.Instance.WorkspaceId,
408+
}
409+
for _, port := range status.Instance.Status.Ports {
410+
info := &gitpod.WorkspaceInstancePort{
411+
Port: float64(port.Port),
412+
URL: port.Url,
413+
}
414+
if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC {
415+
info.Visibility = gitpod.PortVisibilityPublic
416+
} else {
417+
info.Visibility = gitpod.PortVisibilityPrivate
418+
}
419+
instance.Status.ExposedPorts = append(instance.Status.ExposedPorts, info)
420+
}
421+
return instance
422+
}

0 commit comments

Comments
 (0)