Skip to content

Use workspaceStatus stream rpc in supervisor #15403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 139 additions & 94 deletions components/supervisor/pkg/serverapi/publicapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"reflect"
"io"
"time"

backoff "github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -61,15 +61,14 @@ type Service struct {
token string
ownerID string

lastServerInstance *gitpod.WorkspaceInstance

// gitpodService server API
gitpodService gitpod.APIInterface
// publicAPIConn is the gRPC client connection to the public API
publicAPIConn *grpc.ClientConn
publicApiMetrics *grpc_prometheus.ClientMetrics

previousUsingPublicAPI bool
onUsingPublicAPI chan bool
}

var _ APIInterface = (*Service)(nil)
Expand Down Expand Up @@ -110,17 +109,29 @@ func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.Tok
cfg: cfg,
experiments: experiments.NewClient(),
publicApiMetrics: grpc_prometheus.NewClientMetrics(),
onUsingPublicAPI: make(chan bool),
}

// Poll the public API feature flag (ConfigCat) once per second so that
// instance-update traffic can be switched between the server API and the
// public API while the supervisor is running. The result of usePublicAPI is
// intentionally discarded here; the call is made for its side effect of
// re-evaluating the flag.
go func() {
	ticker := time.NewTicker(time.Second * 1)
	// Release the ticker on every exit path.
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Return on cancellation. Only stopping the ticker would leave
			// this loop spinning forever on the already-closed Done channel.
			return
		case <-ticker.C:
			service.usePublicAPI(ctx)
		}
	}
}()

service.publicApiMetrics.EnableClientHandlingTimeHistogram(
// it should be aligned with https://github.com/gitpod-io/gitpod/blob/84ed1a0672d91446ba33cb7b504cfada769271a8/install/installer/pkg/components/ide-metrics/configmap.go#L315
grpc_prometheus.WithHistogramBuckets([]float64{0.1, 0.2, 0.5, 1, 2, 5, 10}),
)

// public api
service.tryConnToPublicAPI()
// listen to server instance update
go service.listenInstanceUpdate(ctx, cfg.InstanceID)

if wsInfo, err := gitpodService.GetWorkspace(ctx, cfg.WorkspaceID); err != nil {
log.WithError(err).Error("cannot get workspace info")
Expand All @@ -135,6 +146,12 @@ func (s *Service) tryConnToPublicAPI() {
log.WithField("endpoint", endpoint).Info("connecting to PublicAPI...")
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS13})),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient([]grpc.StreamClientInterceptor{
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
withAuth := metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+s.token)
return streamer(withAuth, desc, cc, method, opts...)
},
}...)),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient([]grpc.UnaryClientInterceptor{
s.publicApiMetrics.UnaryClientInterceptor(),
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand All @@ -150,15 +167,6 @@ func (s *Service) tryConnToPublicAPI() {
}
}

func (s *Service) persistServerAPIChannelWhenStart(ctx context.Context) bool {
if s.publicAPIConn == nil || s.ownerID == "" {
return true
}
return experiments.SupervisorPersistServerAPIChannelWhenStart(ctx, s.experiments, experiments.Attributes{
UserID: s.ownerID,
})
}

func (s *Service) usePublicAPI(ctx context.Context) bool {
if s.publicAPIConn == nil || s.ownerID == "" {
return false
Expand All @@ -173,6 +181,7 @@ func (s *Service) usePublicAPI(ctx context.Context) bool {
log.Info("switch to use ServerAPI")
}
s.previousUsingPublicAPI = usePublicAPI
s.onUsingPublicAPI <- usePublicAPI
}
return usePublicAPI
}
Expand Down Expand Up @@ -236,108 +245,115 @@ func (s *Service) OpenPort(ctx context.Context, workspaceID string, port *gitpod
return port, nil
}

func (s *Service) listenInstanceUpdate(ctx context.Context, instanceID string) {
for {
uptChan, err := backoff.RetryWithData(
func() (<-chan *gitpod.WorkspaceInstance, error) {
return s.gitpodService.InstanceUpdates(ctx, instanceID)
},
backoff.NewExponentialBackOff(),
)
if err != nil {
log.WithError(err).Error("failed to get workspace instance chan several retries")
continue
// InstanceUpdates implements protocol.APIInterface
func (s *Service) InstanceUpdates(ctx context.Context, instanceID string, workspaceID string) (<-chan *gitpod.WorkspaceInstance, error) {
if s == nil {
return nil, errNotConnected
}

updateChan := make(chan *gitpod.WorkspaceInstance)
errChan := make(chan error)
processUpdate := func(usePublicAPI bool) context.CancelFunc {
childCtx, cancel := context.WithCancel(ctx)
if usePublicAPI {
go s.publicAPIInstanceUpdate(childCtx, workspaceID, updateChan, errChan)
} else {
go s.serverInstanceUpdate(childCtx, instanceID, updateChan, errChan)
}
return cancel
}
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the method signature, there's no way for the caller to stop listening. This means it will listen forever (as long as the connection holds). Is that actually desirable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think this is expected. cc @mustard-mh @iQQBot in case you have any input on this.

cancel := processUpdate(s.usePublicAPI(ctx))
defer func() {
cancel()
close(updateChan)
}()
for {
select {
case <-ctx.Done():
return
case instance := <-uptChan:
s.lastServerInstance = instance
case usePublicAPI := <-s.onUsingPublicAPI:
cancel()
cancel = processUpdate(usePublicAPI)
case err := <-errChan:
if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
continue
}
log.WithField("method", "InstanceUpdates").WithError(err).Error("failed to listen")
cancel()
time.Sleep(time.Second * 2)
cancel = processUpdate(s.usePublicAPI(ctx))
}
}
}
}()

return updateChan, nil
}

func (s *Service) getWorkspaceInfo(ctx context.Context, instanceID, workspaceID string) (*gitpod.WorkspaceInstance, error) {
getData := func() (*gitpod.WorkspaceInstance, error) {
if !s.usePublicAPI(ctx) {
return s.lastServerInstance, nil
}
func (s *Service) publicAPIInstanceUpdate(ctx context.Context, workspaceID string, updateChan chan *gitpod.WorkspaceInstance, errChan chan error) {
resp, err := backoff.RetryWithData(func() (v1.WorkspacesService_StreamWorkspaceStatusClient, error) {
service := v1.NewWorkspacesServiceClient(s.publicAPIConn)
resp, err := service.GetWorkspace(ctx, &v1.GetWorkspaceRequest{
resp, err := service.StreamWorkspaceStatus(ctx, &v1.StreamWorkspaceStatusRequest{
WorkspaceId: workspaceID,
})
if err != nil {
log.WithField("method", "GetWorkspace").WithError(err).Error("failed to call PublicAPI")
return nil, err
log.WithError(err).Info("backoff failed to get workspace service client of PublicAPI, try again")
}
instance := &gitpod.WorkspaceInstance{
CreationTime: resp.Result.Status.Instance.CreatedAt.String(),
ID: resp.Result.Status.Instance.InstanceId,
Status: &gitpod.WorkspaceInstanceStatus{
ExposedPorts: []*gitpod.WorkspaceInstancePort{},
Message: resp.Result.Status.Instance.Status.Message,
// OwnerToken: "", not used so ignore
Phase: resp.Result.Status.Instance.Status.Phase.String(),
Timeout: resp.Result.Status.Instance.Status.Conditions.Timeout,
Version: int(resp.Result.Status.Instance.Status.StatusVersion),
},
WorkspaceID: resp.Result.WorkspaceId,
}
for _, port := range resp.Result.Status.Instance.Status.Ports {
info := &gitpod.WorkspaceInstancePort{
Port: float64(port.Port),
URL: port.Url,
return resp, err
}, connBackoff)
if err != nil {
log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to call PublicAPI")
errChan <- err
return
}
log.WithField("method", "StreamWorkspaceStatus").Info("start to listen on publicAPI instanceUpdates")
for {
resp, err := resp.Recv()
if err != nil {
if err != io.EOF {
log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to receive status update")
}
if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC {
info.Visibility = gitpod.PortVisibilityPublic
} else {
info.Visibility = gitpod.PortVisibilityPrivate
if ctx.Err() != nil {
return
}
instance.Status.ExposedPorts = append(instance.Status.ExposedPorts, info)
errChan <- err
return
}
return instance, nil
updateChan <- workspaceStatusToWorkspaceInstance(resp.Result)
}
exp := &backoff.ExponentialBackOff{
InitialInterval: 2 * time.Second,
RandomizationFactor: 0.5,
Multiplier: 1.5,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 0,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
return backoff.RetryWithData(getData, exp)
}

// InstanceUpdates implements protocol.APIInterface
func (s *Service) InstanceUpdates(ctx context.Context, instanceID string, workspaceID string) (<-chan *gitpod.WorkspaceInstance, error) {
if s == nil {
return nil, errNotConnected
func (s *Service) serverInstanceUpdate(ctx context.Context, instanceID string, updateChan chan *gitpod.WorkspaceInstance, errChan chan error) {
ch, err := backoff.RetryWithData(func() (<-chan *gitpod.WorkspaceInstance, error) {
ch, err := s.gitpodService.InstanceUpdates(ctx, instanceID)
if err != nil {
log.WithError(err).Info("backoff failed to listen to serverAPI instanceUpdates, try again")
}
return ch, err
}, connBackoff)
if err != nil {
log.WithField("method", "InstanceUpdates").WithError(err).Error("failed to call serverAPI")
errChan <- err
return
}
if !s.usePublicAPI(ctx) && s.persistServerAPIChannelWhenStart(ctx) {
return s.gitpodService.InstanceUpdates(ctx, instanceID)
log.WithField("method", "InstanceUpdates").WithField("instanceID", instanceID).Info("start to listen on serverAPI instanceUpdates")
for update := range ch {
updateChan <- update
}
updateChan := make(chan *gitpod.WorkspaceInstance)
var latestInstance *gitpod.WorkspaceInstance
go func() {
for {
if ctx.Err() != nil {
close(updateChan)
break
}
if instance, err := s.getWorkspaceInfo(ctx, instanceID, workspaceID); err == nil {
if reflect.DeepEqual(latestInstance, instance) {
continue
}
latestInstance = instance
updateChan <- instance
}
time.Sleep(1 * time.Second)
}
}()
return updateChan, nil
if ctx.Err() != nil {
return
}
errChan <- io.EOF
}

// connBackoff is the retry policy passed to backoff.RetryWithData when
// publicAPIInstanceUpdate and serverInstanceUpdate open their update streams.
// Intervals start at 2s, grow by a factor of 1.5 with 50% randomization, and
// are capped at 30s.
//
// NOTE(review): this single pointer is shared by both callers, which
// InstanceUpdates starts as goroutines. ExponentialBackOff presumably carries
// internal state between attempts, so confirm that concurrent use is safe or
// give each caller its own instance.
var connBackoff = &backoff.ExponentialBackOff{
InitialInterval: 2 * time.Second,
RandomizationFactor: 0.5,
Multiplier: 1.5,
MaxInterval: 30 * time.Second,
// NOTE(review): a zero MaxElapsedTime presumably disables the overall
// deadline, so retries never give up on their own. Confirm against the
// backoff package documentation for the vendored version.
MaxElapsedTime: 0,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}

// GetOwnerID implements APIInterface
Expand Down Expand Up @@ -375,3 +391,32 @@ func (s *Service) RegisterMetrics(registry *prometheus.Registry) error {
}
return registry.Register(s.publicApiMetrics)
}

func workspaceStatusToWorkspaceInstance(status *v1.WorkspaceStatus) *gitpod.WorkspaceInstance {
instance := &gitpod.WorkspaceInstance{
CreationTime: status.Instance.CreatedAt.String(),
ID: status.Instance.InstanceId,
Status: &gitpod.WorkspaceInstanceStatus{
ExposedPorts: []*gitpod.WorkspaceInstancePort{},
Message: status.Instance.Status.Message,
// OwnerToken: "", not used so ignore
Phase: status.Instance.Status.Phase.String(),
Timeout: status.Instance.Status.Conditions.Timeout,
Version: int(status.Instance.Status.StatusVersion),
},
WorkspaceID: status.Instance.WorkspaceId,
}
for _, port := range status.Instance.Status.Ports {
info := &gitpod.WorkspaceInstancePort{
Port: float64(port.Port),
URL: port.Url,
}
if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC {
info.Visibility = gitpod.PortVisibilityPublic
} else {
info.Visibility = gitpod.PortVisibilityPrivate
}
Comment on lines +414 to +418
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PortPolicy is one of 3 values:

	PortPolicy_PORT_POLICY_UNSPECIFIED PortPolicy = 0
	// Private means the port is accessible by the workspace owner only using the workspace port URL
	PortPolicy_PORT_POLICY_PRIVATE PortPolicy = 1
	// Public means the port is accessible by everybody using the workspace port URL
	PortPolicy_PORT_POLICY_PUBLIC PortPolicy = 2

Use a switch here and handle them all. This ensures that you actually detect cases where the port policy is not set (UNSPECIFIED)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same as here: it can only be public or private, and if for some reason it is unspecified, it should convert to private either way 🤔

if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC {
info.Visibility = gitpod.PortVisibilityPublic
} else {
info.Visibility = gitpod.PortVisibilityPrivate
}

instance.Status.ExposedPorts = append(instance.Status.ExposedPorts, info)
}
return instance
}