From 77ed9d547ec924c24937447948e53d6a54e28ef8 Mon Sep 17 00:00:00 2001 From: Huiwen Date: Thu, 12 Jan 2023 09:40:15 +0000 Subject: [PATCH] Use workspaceStatus stream rpc in supervisor Co-authored-by: mustard Co-authored-by: Jean Pierre --- .../supervisor/pkg/serverapi/publicapi.go | 233 +++++++++++------- 1 file changed, 139 insertions(+), 94 deletions(-) diff --git a/components/supervisor/pkg/serverapi/publicapi.go b/components/supervisor/pkg/serverapi/publicapi.go index 1ff8372d7e0d3e..ea9a9425e3ece2 100644 --- a/components/supervisor/pkg/serverapi/publicapi.go +++ b/components/supervisor/pkg/serverapi/publicapi.go @@ -9,7 +9,7 @@ import ( "crypto/tls" "errors" "fmt" - "reflect" + "io" "time" backoff "github.com/cenkalti/backoff/v4" @@ -61,8 +61,6 @@ type Service struct { token string ownerID string - lastServerInstance *gitpod.WorkspaceInstance - // gitpodService server API gitpodService gitpod.APIInterface // publicAPIConn public API publicAPIConn @@ -70,6 +68,7 @@ type Service struct { publicApiMetrics *grpc_prometheus.ClientMetrics previousUsingPublicAPI bool + onUsingPublicAPI chan bool } var _ APIInterface = (*Service)(nil) @@ -110,8 +109,22 @@ func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.Tok cfg: cfg, experiments: experiments.NewClient(), publicApiMetrics: grpc_prometheus.NewClientMetrics(), + onUsingPublicAPI: make(chan bool), } + // schedule get public api configcat value for instance updates traffic switching + go func() { + ticker := time.NewTicker(time.Second * 1) + for { + select { + case <-ctx.Done(): + ticker.Stop() + case <-ticker.C: + service.usePublicAPI(ctx) + } + } + }() + service.publicApiMetrics.EnableClientHandlingTimeHistogram( // it should be aligned with https://github.com/gitpod-io/gitpod/blob/84ed1a0672d91446ba33cb7b504cfada769271a8/install/installer/pkg/components/ide-metrics/configmap.go#L315 grpc_prometheus.WithHistogramBuckets([]float64{0.1, 0.2, 0.5, 1, 2, 5, 10}), @@ -119,8 +132,6 @@ func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.Tok // public api service.tryConnToPublicAPI() - // listen to server instance update - go service.listenInstanceUpdate(ctx, cfg.InstanceID) if wsInfo, err := gitpodService.GetWorkspace(ctx, cfg.WorkspaceID); err != nil { log.WithError(err).Error("cannot get workspace info") @@ -135,6 +146,12 @@ func (s *Service) tryConnToPublicAPI() { log.WithField("endpoint", endpoint).Info("connecting to PublicAPI...") opts := []grpc.DialOption{ grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS13})), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient([]grpc.StreamClientInterceptor{ + func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + withAuth := metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+s.token) + return streamer(withAuth, desc, cc, method, opts...) + }, + }...)), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient([]grpc.UnaryClientInterceptor{ s.publicApiMetrics.UnaryClientInterceptor(), func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { @@ -150,15 +167,6 @@ func (s *Service) tryConnToPublicAPI() { } } -func (s *Service) persistServerAPIChannelWhenStart(ctx context.Context) bool { - if s.publicAPIConn == nil || s.ownerID == "" { - return true - } - return experiments.SupervisorPersistServerAPIChannelWhenStart(ctx, s.experiments, experiments.Attributes{ - UserID: s.ownerID, - }) -} - func (s *Service) usePublicAPI(ctx context.Context) bool { if s.publicAPIConn == nil || s.ownerID == "" { return false @@ -173,6 +181,7 @@ func (s *Service) usePublicAPI(ctx context.Context) bool { log.Info("switch to use ServerAPI") } s.previousUsingPublicAPI = usePublicAPI + s.onUsingPublicAPI <- usePublicAPI } return usePublicAPI } @@ -236,108 +245,115 @@ func (s *Service) OpenPort(ctx context.Context, workspaceID string, port *gitpod return port, nil } -func (s *Service) listenInstanceUpdate(ctx context.Context, instanceID string) { - for { - uptChan, err := backoff.RetryWithData( - func() (<-chan *gitpod.WorkspaceInstance, error) { - return s.gitpodService.InstanceUpdates(ctx, instanceID) - }, - backoff.NewExponentialBackOff(), - ) - if err != nil { - log.WithError(err).Error("failed to get workspace instance chan several retries") - continue +// InstanceUpdates implements protocol.APIInterface +func (s *Service) InstanceUpdates(ctx context.Context, instanceID string, workspaceID string) (<-chan *gitpod.WorkspaceInstance, error) { + if s == nil { + return nil, errNotConnected + } + + updateChan := make(chan *gitpod.WorkspaceInstance) + errChan := make(chan error) + processUpdate := func(usePublicAPI bool) context.CancelFunc { + childCtx, cancel := context.WithCancel(ctx) + if usePublicAPI { + go s.publicAPIInstanceUpdate(childCtx, workspaceID, updateChan, errChan) + } else { + go s.serverInstanceUpdate(childCtx, instanceID, updateChan, errChan) } + return cancel + } + go func() { + cancel := processUpdate(s.usePublicAPI(ctx)) + defer func() { + cancel() + close(updateChan) + }() for { select { case <-ctx.Done(): return - case instance := <-uptChan: - s.lastServerInstance = instance + case usePublicAPI := <-s.onUsingPublicAPI: + cancel() + cancel = processUpdate(usePublicAPI) + case err := <-errChan: + if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) { + continue + } + log.WithField("method", "InstanceUpdates").WithError(err).Error("failed to listen") + cancel() + time.Sleep(time.Second * 2) + cancel = processUpdate(s.usePublicAPI(ctx)) } } - } + }() + + return updateChan, nil } -func (s *Service) getWorkspaceInfo(ctx context.Context, instanceID, workspaceID string) (*gitpod.WorkspaceInstance, error) { - getData := func() (*gitpod.WorkspaceInstance, error) { - if !s.usePublicAPI(ctx) { - return s.lastServerInstance, nil - } +func (s *Service) publicAPIInstanceUpdate(ctx context.Context, workspaceID string, updateChan chan *gitpod.WorkspaceInstance, errChan chan error) { + resp, err := backoff.RetryWithData(func() (v1.WorkspacesService_StreamWorkspaceStatusClient, error) { service := v1.NewWorkspacesServiceClient(s.publicAPIConn) - resp, err := service.GetWorkspace(ctx, &v1.GetWorkspaceRequest{ + resp, err := service.StreamWorkspaceStatus(ctx, &v1.StreamWorkspaceStatusRequest{ WorkspaceId: workspaceID, }) if err != nil { - log.WithField("method", "GetWorkspace").WithError(err).Error("failed to call PublicAPI") - return nil, err + log.WithError(err).Info("backoff failed to get workspace service client of PublicAPI, try again") } - instance := &gitpod.WorkspaceInstance{ - CreationTime: resp.Result.Status.Instance.CreatedAt.String(), - ID: resp.Result.Status.Instance.InstanceId, - Status: &gitpod.WorkspaceInstanceStatus{ - ExposedPorts: []*gitpod.WorkspaceInstancePort{}, - Message: resp.Result.Status.Instance.Status.Message, - // OwnerToken: "", not used so ignore - Phase: resp.Result.Status.Instance.Status.Phase.String(), - Timeout: resp.Result.Status.Instance.Status.Conditions.Timeout, - Version: int(resp.Result.Status.Instance.Status.StatusVersion), - }, - WorkspaceID: resp.Result.WorkspaceId, - } - for _, port := range resp.Result.Status.Instance.Status.Ports { - info := &gitpod.WorkspaceInstancePort{ - Port: float64(port.Port), - URL: port.Url, + return resp, err + }, connBackoff) + if err != nil { + log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to call PublicAPI") + errChan <- err + return + } + log.WithField("method", "StreamWorkspaceStatus").Info("start to listen on publicAPI instanceUpdates") + for { + resp, err := resp.Recv() + if err != nil { + if err != io.EOF { + log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to receive status update") } - if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC { - info.Visibility = gitpod.PortVisibilityPublic - } else { - info.Visibility = gitpod.PortVisibilityPrivate + if ctx.Err() != nil { + return } - instance.Status.ExposedPorts = append(instance.Status.ExposedPorts, info) + errChan <- err + return } - return instance, nil + updateChan <- workspaceStatusToWorkspaceInstance(resp.Result) } - exp := &backoff.ExponentialBackOff{ - InitialInterval: 2 * time.Second, - RandomizationFactor: 0.5, - Multiplier: 1.5, - MaxInterval: 30 * time.Second, - MaxElapsedTime: 0, - Stop: backoff.Stop, - Clock: backoff.SystemClock, - } - return backoff.RetryWithData(getData, exp) } -// InstanceUpdates implements protocol.APIInterface -func (s *Service) InstanceUpdates(ctx context.Context, instanceID string, workspaceID string) (<-chan *gitpod.WorkspaceInstance, error) { - if s == nil { - return nil, errNotConnected +func (s *Service) serverInstanceUpdate(ctx context.Context, instanceID string, updateChan chan *gitpod.WorkspaceInstance, errChan chan error) { + ch, err := backoff.RetryWithData(func() (<-chan *gitpod.WorkspaceInstance, error) { + ch, err := s.gitpodService.InstanceUpdates(ctx, instanceID) + if err != nil { + log.WithError(err).Info("backoff failed to listen to serverAPI instanceUpdates, try again") + } + return ch, err + }, connBackoff) + if err != nil { + log.WithField("method", "InstanceUpdates").WithError(err).Error("failed to call serverAPI") + errChan <- err + return } - if !s.usePublicAPI(ctx) && s.persistServerAPIChannelWhenStart(ctx) { - return s.gitpodService.InstanceUpdates(ctx, instanceID) + log.WithField("method", "InstanceUpdates").WithField("instanceID", instanceID).Info("start to listen on serverAPI instanceUpdates") + for update := range ch { + updateChan <- update } - updateChan := make(chan *gitpod.WorkspaceInstance) - var latestInstance *gitpod.WorkspaceInstance - go func() { - for { - if ctx.Err() != nil { - close(updateChan) - break - } - if instance, err := s.getWorkspaceInfo(ctx, instanceID, workspaceID); err == nil { - if reflect.DeepEqual(latestInstance, instance) { - continue - } - latestInstance = instance - updateChan <- instance - } - time.Sleep(1 * time.Second) - } - }() - return updateChan, nil + if ctx.Err() != nil { + return + } + errChan <- io.EOF +} + +var connBackoff = &backoff.ExponentialBackOff{ + InitialInterval: 2 * time.Second, + RandomizationFactor: 0.5, + Multiplier: 1.5, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 0, + Stop: backoff.Stop, + Clock: backoff.SystemClock, } // GetOwnerID implements APIInterface @@ -375,3 +391,32 @@ func (s *Service) RegisterMetrics(registry *prometheus.Registry) error { } return registry.Register(s.publicApiMetrics) } + +func workspaceStatusToWorkspaceInstance(status *v1.WorkspaceStatus) *gitpod.WorkspaceInstance { + instance := &gitpod.WorkspaceInstance{ + CreationTime: status.Instance.CreatedAt.String(), + ID: status.Instance.InstanceId, + Status: &gitpod.WorkspaceInstanceStatus{ + ExposedPorts: []*gitpod.WorkspaceInstancePort{}, + Message: status.Instance.Status.Message, + // OwnerToken: "", not used so ignore + Phase: status.Instance.Status.Phase.String(), + Timeout: status.Instance.Status.Conditions.Timeout, + Version: int(status.Instance.Status.StatusVersion), + }, + WorkspaceID: status.Instance.WorkspaceId, + } + for _, port := range status.Instance.Status.Ports { + info := &gitpod.WorkspaceInstancePort{ + Port: float64(port.Port), + URL: port.Url, + } + if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC { + info.Visibility = gitpod.PortVisibilityPublic + } else { + info.Visibility = gitpod.PortVisibilityPrivate + } + instance.Status.ExposedPorts = append(instance.Status.ExposedPorts, info) + } + return instance +}