diff --git a/test/pkg/integration/integration.go b/test/pkg/integration/integration.go index 6369d6c9f5885e..534d8023e27ccf 100644 --- a/test/pkg/integration/integration.go +++ b/test/pkg/integration/integration.go @@ -20,9 +20,12 @@ import ( "runtime" "strconv" "strings" + "syscall" "time" "golang.org/x/xerrors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -39,6 +42,11 @@ import ( "github.com/gitpod-io/gitpod/test/pkg/integration/common" ) +const ( + connectFailureMaxTries = 5 + errorDialingBackendEOF = "error dialing backend: EOF" +) + type PodExec struct { RestConfig *rest.Config *kubernetes.Clientset @@ -57,7 +65,7 @@ func NewPodExec(config rest.Config, clientset *kubernetes.Clientset) *PodExec { func (p *PodExec) PodCopyFile(src string, dst string, containername string) (*bytes.Buffer, *bytes.Buffer, *bytes.Buffer, error) { var in, out, errOut *bytes.Buffer var ioStreams genericclioptions.IOStreams - for { + for count := 0; ; count++ { ioStreams, in, out, errOut = genericclioptions.NewTestIOStreams() copyOptions := kubectlcp.NewCopyOptions(ioStreams) copyOptions.Clientset = p.Clientset @@ -65,8 +73,8 @@ func (p *PodExec) PodCopyFile(src string, dst string, containername string) (*by copyOptions.Container = containername err := copyOptions.Run([]string{src, dst}) if err != nil { - if !errors.Is(err, io.EOF) { - return nil, nil, nil, fmt.Errorf("Could not run copy operation: %v", err) + if !shouldRetry(count, err) { + return nil, nil, nil, fmt.Errorf("could not run copy operation: %v", err) } time.Sleep(10 * time.Second) continue @@ -76,6 +84,13 @@ func (p *PodExec) PodCopyFile(src string, dst string, containername string) (*by return in, out, errOut, nil } +func shouldRetry(count int, err error) bool { + if count < connectFailureMaxTries { + return err.Error() == errorDialingBackendEOF + } + return false +} + func (p *PodExec) ExecCmd(command []string, podname string, namespace string, containername string) (*bytes.Buffer, *bytes.Buffer, *bytes.Buffer, error) { ioStreams, in, out, errOut := genericclioptions.NewTestIOStreams() execOptions := &kubectlexec.ExecOptions{ @@ -187,6 +202,43 @@ func Instrument(component ComponentType, agentName string, namespace string, kub return nil, closer, err } + var ( + res *rpc.Client + cl []func() error + ) + for i := 0; i < connectFailureMaxTries; i++ { + res, cl, err = portfw(podExec, kubeconfig, podName, namespace, containerName, tgtFN, options) + if err == nil { + closer = append(closer, cl...) + break + } + for _, c := range cl { + _ = c() + } + time.Sleep(5 * time.Second) + } + if err != nil { + return nil, closer, err + } + + closer = append(closer, func() error { + err := res.Call(MethodTestAgentShutdown, new(TestAgentShutdownRequest), new(TestAgentShutdownResponse)) + if err != nil && strings.Contains(err.Error(), "connection is shut down") { + return nil + } + + if err != nil { + return xerrors.Errorf("cannot shutdown agent: %w", err) + } + return nil + }) + + return res, closer, nil +} + +func portfw(podExec *PodExec, kubeconfig string, podName string, namespace string, containerName string, tgtFN string, options instrumentOptions) (*rpc.Client, []func() error, error) { + var closer []func() error + localAgentPort, err := getFreePort() if err != nil { return nil, closer, err @@ -198,22 +250,13 @@ func Instrument(component ComponentType, agentName string, namespace string, kub } execErrs := make(chan error, 1) - execF := func() { + go func() { defer close(execErrs) _, _, _, execErr := podExec.ExecCmd(cmd, podName, namespace, containerName) if execErr != nil { execErrs <- execErr } - } - go execF() - select { - case err := <-execErrs: - if err != nil { - return nil, closer, err - } - return nil, closer, fmt.Errorf("agent stopped unexepectedly") - case <-time.After(30 * time.Second): - } + }() ctx, cancel := context.WithCancel(context.Background()) defer func() { @@ -226,24 +269,32 @@ func Instrument(component ComponentType, agentName string, namespace string, kub cancel() } }() - - fwdReady, fwdErr := common.ForwardPortOfPod(ctx, kubeconfig, namespace, podName, strconv.Itoa(localAgentPort)) - select { - case <-fwdReady: - case err := <-execErrs: - if err != nil { - return nil, closer, err - } - case err := <-fwdErr: - if err != nil { - return nil, closer, err +L: + for { + fwdReady, fwdErr := common.ForwardPortOfPod(ctx, kubeconfig, namespace, podName, strconv.Itoa(localAgentPort)) + select { + case <-fwdReady: + break L + case err := <-execErrs: + if err != nil { + return nil, closer, err + } + case err := <-fwdErr: + var eno syscall.Errno + if errors.Is(err, io.EOF) || (errors.As(err, &eno) && eno == syscall.ECONNREFUSED) { + time.Sleep(10 * time.Second) + } else if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable { + time.Sleep(10 * time.Second) + } else if err != nil { + return nil, closer, err + } } } var res *rpc.Client var lastError error - waitErr := wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) { - res, lastError = rpc.DialHTTP("tcp", fmt.Sprintf("localhost:%d", localAgentPort)) + waitErr := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) { + res, lastError = rpc.DialHTTP("tcp", net.JoinHostPort("localhost", strconv.Itoa(localAgentPort))) if lastError != nil { return false, nil } @@ -257,18 +308,6 @@ func Instrument(component ComponentType, agentName string, namespace string, kub return nil, closer, err } - closer = append(closer, func() error { - err := res.Call(MethodTestAgentShutdown, new(TestAgentShutdownRequest), new(TestAgentShutdownResponse)) - if err != nil && strings.Contains(err.Error(), "connection is shut down") { - return nil - } - - if err != nil { - return xerrors.Errorf("cannot shutdown agent: %w", err) - } - return nil - }) - return res, closer, nil } diff --git a/test/tests/components/content-service/content-service_test.go b/test/tests/components/content-service/content-service_test.go index c6c23cc2a5f1ec..eaca16fd936421 100644 --- a/test/tests/components/content-service/content-service_test.go +++ b/test/tests/components/content-service/content-service_test.go @@ -194,7 +194,7 @@ func TestUploadDownloadBlob(t *testing.T) { originalUrl := resp.Url updatedUrl, err := api.Storage(originalUrl) if err != nil { - t.Fatalf("error resolving blob upload target url") + t.Fatalf("error resolving blob upload target url: %q", err) } t.Logf("upload URL: %s", updatedUrl) @@ -207,7 +207,7 @@ func TestUploadDownloadBlob(t *testing.T) { originalUrl = resp2.Url updatedUrl, err = api.Storage(originalUrl) if err != nil { - t.Fatalf("error resolving blob download target url") + t.Fatalf("error resolving blob download target url: %q", err) } t.Logf("download URL: %s", updatedUrl) @@ -263,7 +263,7 @@ func TestUploadDownloadBlobViaServer(t *testing.T) { updatedUrl, err = api.Storage(originalUrl) if err != nil { - t.Fatalf("error resolving blob download target url") + t.Fatalf("error resolving blob download target url, %q", err) } t.Logf("download URL: %s", updatedUrl)