diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 503c2d43b..5f1aec219 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -30,7 +30,7 @@ import ( "github.com/openshift/cluster-version-operator/pkg/payload" ) -func setupCVOTest() (*Operator, map[string]runtime.Object, *fake.Clientset, *dynamicfake.FakeDynamicClient, func()) { +func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fake.Clientset, *dynamicfake.FakeDynamicClient, func()) { client := &fake.Clientset{} client.AddReactor("*", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { return false, nil, fmt.Errorf("unexpected client action: %#v", action) @@ -81,7 +81,7 @@ func setupCVOTest() (*Operator, map[string]runtime.Object, *fake.Clientset, *dyn dynamicClient := dynamicfake.NewSimpleDynamicClient(dynamicScheme) worker := NewSyncWorker( - &fakeDirectoryRetriever{Path: "testdata/payloadtest"}, + &fakeDirectoryRetriever{Path: payloadDir}, &testResourceBuilder{client: dynamicClient}, time.Second/2, wait.Backoff{ @@ -94,7 +94,7 @@ func setupCVOTest() (*Operator, map[string]runtime.Object, *fake.Clientset, *dyn } func TestCVO_StartupAndSync(t *testing.T) { - o, cvs, client, _, shutdownFn := setupCVOTest() + o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -214,31 +214,35 @@ func TestCVO_StartupAndSync(t *testing.T) { Actual: configv1.Update{Version: "4.0.1", Image: "image/image:1"}, }, SyncWorkerStatus{ - Step: "ApplyResources", - Initial: true, - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Step: "ApplyResources", + Initial: true, + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(1, 0), }, SyncWorkerStatus{ - Fraction: float32(1) / 3, - Step: "ApplyResources", - Initial: true, - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Fraction: float32(1) / 3, + Step: "ApplyResources", + Initial: true, + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(2, 0), }, SyncWorkerStatus{ - Fraction: float32(2) / 3, - Initial: true, - Step: "ApplyResources", - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Fraction: float32(2) / 3, + Initial: true, + Step: "ApplyResources", + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(3, 0), }, SyncWorkerStatus{ - Reconciling: true, - Completed: 1, - Fraction: 1, - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Reconciling: true, + Completed: 1, + Fraction: 1, + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(4, 0), }, ) @@ -306,11 +310,12 @@ func TestCVO_StartupAndSync(t *testing.T) { Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ - Reconciling: true, - Completed: 2, - Fraction: 1, - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Reconciling: true, + Completed: 2, + Fraction: 1, + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: 
"image/image:1"}, + LastProgress: time.Unix(1, 0), }, ) @@ -329,7 +334,7 @@ func TestCVO_StartupAndSync(t *testing.T) { } func TestCVO_RestartAndReconcile(t *testing.T) { - o, cvs, client, _, shutdownFn := setupCVOTest() + o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -412,25 +417,28 @@ func TestCVO_RestartAndReconcile(t *testing.T) { Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ - Reconciling: true, - Fraction: float32(1) / 3, - Step: "ApplyResources", - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Reconciling: true, + Fraction: float32(1) / 3, + Step: "ApplyResources", + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(1, 0), }, SyncWorkerStatus{ - Reconciling: true, - Fraction: float32(2) / 3, - Step: "ApplyResources", - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Reconciling: true, + Fraction: float32(2) / 3, + Step: "ApplyResources", + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(2, 0), }, SyncWorkerStatus{ - Reconciling: true, - Completed: 1, - Fraction: 1, - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Reconciling: true, + Completed: 1, + Fraction: 1, + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(3, 0), }, ) client.ClearActions() @@ -470,11 +478,12 @@ func TestCVO_RestartAndReconcile(t *testing.T) { Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ - Reconciling: true, - Completed: 2, - Fraction: 1, - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Reconciling: true, + Completed: 2, + Fraction: 1, + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(1, 0), }, ) client.ClearActions() @@ -490,7 +499,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) { } func TestCVO_ErrorDuringReconcile(t *testing.T) { - o, cvs, client, _, shutdownFn := setupCVOTest() + o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -594,11 +603,12 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { // verify we observe the remaining changes in the first sync verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Reconciling: true, - Fraction: float32(1) / 3, - Step: "ApplyResources", - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Reconciling: true, + Fraction: float32(1) / 3, + Step: "ApplyResources", + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(1, 0), }, ) verifyAllStatus(t, worker.StatusCh()) @@ -610,11 +620,12 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { // verify we observe the remaining changes in the first sync verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Reconciling: true, - Fraction: float32(2) / 3, - Step: "ApplyResources", - VersionHash: "6GC9TkkG9PA=", - Actual: configv1.Update{Version: "1.0.0-abc", Image: 
"image/image:1"}, + Reconciling: true, + Fraction: float32(2) / 3, + Step: "ApplyResources", + VersionHash: "6GC9TkkG9PA=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(1, 0), }, ) verifyAllStatus(t, worker.StatusCh()) @@ -645,8 +656,10 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { Nested: fmt.Errorf("unable to proceed"), Reason: "UpdatePayloadFailed", Message: "Could not update test \"file-yml\" (3 of 3)", + Task: &payload.Task{Index: 3, Total: 3, Manifest: &worker.payload.Manifests[2]}, }, - Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(1, 0), }, ) client.ClearActions() @@ -686,8 +699,174 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { }) } +func TestCVO_ParallelError(t *testing.T) { + o, cvs, client, _, shutdownFn := setupCVOTest("testdata/paralleltest") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + defer shutdownFn() + worker := o.configSync.(*SyncWorker) + b := &errorResourceBuilder{errors: map[string]error{ + "0000_10_a_file.yaml": &payload.UpdateError{ + Reason: "ClusterOperatorNotAvailable", + Name: "operator-1", + }, + "0000_20_a_file.yaml": nil, + "0000_20_b_file.yaml": &payload.UpdateError{ + Reason: "ClusterOperatorNotAvailable", + Name: "operator-2", + }, + }} + worker.builder = b + + // Setup: an initializing cluster version which will run in parallel + // + o.releaseImage = "image/image:1" + o.releaseVersion = "1.0.0-abc" + desired := configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"} + uid, _ := uuid.NewRandom() + clusterUID := configv1.ClusterID(uid.String()) + cvs["version"] = &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + ResourceVersion: "1", + }, + Spec: configv1.ClusterVersionSpec{ + ClusterID: clusterUID, + Channel: "fast", + }, + Status: configv1.ClusterVersionStatus{ + // Prefers the image version over the operator's version (although in general they will remain in sync) + Desired: desired, + History: []configv1.UpdateHistory{}, + Conditions: []configv1.ClusterOperatorStatusCondition{}, + }, + } + + // Step 1: Write initial status + // + client.ClearActions() + err := o.sync(o.queueKey()) + if err != nil { + t.Fatal(err) + } + actions := client.Actions() + if len(actions) != 2 { + t.Fatalf("%s", spew.Sdump(actions)) + } + expectGet(t, actions[0], "clusterversions", "", "version") + + // check the worker status is initially set to reconciling + if status := worker.Status(); status.Reconciling || status.Completed != 0 { + t.Fatalf("The worker should be reconciling from the beginning: %#v", status) + } + if worker.work.State != payload.InitializingPayload { + t.Fatalf("The worker should be reconciling: %v", worker.work) + } + + // Step 2: Start the sync worker and verify the sequence of events + // + cancellable, cancel := context.WithCancel(ctx) + defer cancel() + go worker.Start(cancellable, 1) + // + verifyAllStatus(t, worker.StatusCh(), + SyncWorkerStatus{ + Initial: true, + Step: "RetrievePayload", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + }, + SyncWorkerStatus{ + Initial: true, + Step: "ApplyResources", + VersionHash: "7m-gGRrpkDU=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + }, + ) + + // Step 3: Cancel after we've accumulated 2/3 errors + // + time.Sleep(100 * time.Millisecond) + cancel() + // + // verify we observe the remaining 
changes in the first sync + for status := range worker.StatusCh() { + if status.Failure == nil { + if status.Fraction == 0 || status.Fraction == 1/3 { + if !reflect.DeepEqual(status, SyncWorkerStatus{ + Initial: true, + Fraction: status.Fraction, + Step: "ApplyResources", + VersionHash: "7m-gGRrpkDU=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + }) { + t.Fatalf("unexpected status: %v", status) + } + } + continue + } + err := status.Failure + uErr, ok := err.(*payload.UpdateError) + if !ok || uErr.Reason != "ClusterOperatorsNotAvailable" || uErr.Message != "Some cluster operators are still updating: operator-1, operator-2" { + t.Fatalf("unexpected error: %v", err) + } + if status.LastProgress.IsZero() { + t.Fatalf("unexpected last progress: %v", status.LastProgress) + } + if !reflect.DeepEqual(status, SyncWorkerStatus{ + Initial: true, + Failure: err, + Fraction: float32(1) / 3, + Step: "ApplyResources", + VersionHash: "7m-gGRrpkDU=", + Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: status.LastProgress, + }) { + t.Fatalf("unexpected final: %v", status) + } + break + } + verifyAllStatus(t, worker.StatusCh()) + + client.ClearActions() + err = o.sync(o.queueKey()) + if err != nil { + t.Fatal(err) + } + actions = client.Actions() + if len(actions) != 2 { + t.Fatalf("%s", spew.Sdump(actions)) + } + expectGet(t, actions[0], "clusterversions", "", "version") + expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + ResourceVersion: "1", + }, + Spec: configv1.ClusterVersionSpec{ + ClusterID: clusterUID, + Channel: "fast", + }, + Status: configv1.ClusterVersionStatus{ + // Prefers the image version over the operator's version (although in general they will remain in sync) + Desired: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, + VersionHash: "7m-gGRrpkDU=", + History: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.0-abc", StartedTime: defaultStartedTime}, + }, + Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorFailing, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "ClusterOperatorsNotAvailable", Message: "Working towards 1.0.0-abc: 33% complete, waiting on operator-1, operator-2"}, + {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + }, + }, + }) +} + func TestCVO_VerifyInitializingPayloadState(t *testing.T) { - o, cvs, client, _, shutdownFn := setupCVOTest() + o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") stopCh := make(chan struct{}) defer close(stopCh) defer shutdownFn() @@ -745,7 +924,7 @@ func TestCVO_VerifyInitializingPayloadState(t *testing.T) { } func TestCVO_VerifyUpdatingPayloadState(t *testing.T) { - o, cvs, client, _, shutdownFn := setupCVOTest() + o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") stopCh := make(chan struct{}) defer close(stopCh) defer shutdownFn() @@ -811,11 +990,21 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork } return } + var lastTime time.Time + count := int64(1) for i, expect := range items { actual, ok := <-ch if !ok { t.Fatalf("channel closed after reading only %d items", i) } + + if nextTime := actual.LastProgress; !nextTime.Equal(lastTime) { + actual.LastProgress = 
time.Unix(count, 0) + count++ + } else if !lastTime.IsZero() { + actual.LastProgress = time.Unix(count, 0) + } + if !reflect.DeepEqual(expect, actual) { t.Fatalf("unexpected status item %d: %s", i, diff.ObjectReflectDiff(expect, actual)) } @@ -854,3 +1043,14 @@ func (b *blockingResourceBuilder) Send(err error) { func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { return <-b.ch } + +type errorResourceBuilder struct { + errors map[string]error +} + +func (b *errorResourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { + if err, ok := b.errors[m.OriginalFilename]; ok { + return err + } + return fmt.Errorf("unknown file %s", m.OriginalFilename) +} diff --git a/pkg/cvo/status.go b/pkg/cvo/status.go index f58b89572..860a32695 100644 --- a/pkg/cvo/status.go +++ b/pkg/cvo/status.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "time" "github.com/golang/glog" @@ -156,6 +157,8 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat now := metav1.Now() version := versionString(status.Actual) + mergeOperatorHistory(config, status.Actual, now, status.Completed > 0) + // update validation errors var reason string if len(validationErrs) > 0 { @@ -200,7 +203,9 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat }) } - if err := status.Failure; err != nil { + progressReason, progressShortMessage, skipFailure := convertErrorToProgressing(config.Status.History, now.Time, status) + + if err := status.Failure; err != nil && !skipFailure { var reason string msg := "an error occurred" if uErr, ok := err.(*payload.UpdateError); ok { @@ -258,6 +263,9 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat switch { case len(validationErrs) > 0: message = fmt.Sprintf("Reconciling %s: the cluster version is invalid", version) + case status.Fraction > 0 && skipFailure: + reason = progressReason + message = fmt.Sprintf("Working towards %s: %.0f%% complete, %s", version, status.Fraction*100, progressShortMessage) case status.Fraction > 0: message = fmt.Sprintf("Working towards %s: %.0f%% complete", version, status.Fraction*100) case status.Step == "RetrievePayload": @@ -265,6 +273,9 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat reason = "DownloadingUpdate" } message = fmt.Sprintf("Working towards %s: downloading update", version) + case skipFailure: + reason = progressReason + message = fmt.Sprintf("Working towards %s: %s", version, progressShortMessage) default: message = fmt.Sprintf("Working towards %s", version) } @@ -287,8 +298,6 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat }) } - mergeOperatorHistory(config, status.Actual, now, status.Completed > 0) - if glog.V(6) { glog.Infof("Apply config: %s", diff.ObjectReflectDiff(original, config)) } @@ -297,6 +306,27 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat return err } +// convertErrorToProgressing returns true if the provided status indicates a failure condition can be interpreted as +// still making internal progress. The general error we try to suppress is an operator or operators still being +// unavailable AND the general payload task making progress towards its goal. An operator is given 10 minutes since +// its last update to go ready, or an hour has elapsed since the update began, before the condition is ignored. 
+func convertErrorToProgressing(history []configv1.UpdateHistory, now time.Time, status *SyncWorkerStatus) (reason string, message string, ok bool) { + if len(history) == 0 || status.Failure == nil || status.Reconciling || status.LastProgress.IsZero() { + return "", "", false + } + if now.Sub(status.LastProgress) > 10*time.Minute || now.Sub(history[0].StartedTime.Time) > time.Hour { + return "", "", false + } + uErr, ok := status.Failure.(*payload.UpdateError) + if !ok { + return "", "", false + } + if uErr.Reason == "ClusterOperatorNotAvailable" || uErr.Reason == "ClusterOperatorsNotAvailable" { + return uErr.Reason, fmt.Sprintf("waiting on %s", uErr.Name), true + } + return "", "", false +} + // syncDegradedStatus handles generic errors in the cluster version. It tries to preserve // all status fields that it can by using the provided config or loading the latest version // from the cache (instead of clearing the status). diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 639c3d1d8..0f8f0a7a6 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "reflect" + "sort" + "strings" "sync" "time" @@ -12,6 +14,7 @@ import ( "golang.org/x/time/rate" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -67,6 +70,8 @@ type SyncWorkerStatus struct { Initial bool VersionHash string + LastProgress time.Time + Actual configv1.Update } @@ -233,9 +238,10 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) { var syncTimeout time.Duration switch work.State { case payload.InitializingPayload: - // during initialization we expect things to fail due to ordering - // dependencies, so give it extra time - syncTimeout = w.minimumReconcileInterval * 5 + // during initialization we want to show what operators are being + // created, so time out syncs more often to show a snapshot of progress + // TODO: allow status outside of sync + syncTimeout = w.minimumReconcileInterval case payload.UpdatingPayload: // during updates we want to flag failures on any resources that - // for cluster operators that are not reporting failing the error @@ -304,6 +310,9 @@ func (w *statusWrapper) Report(status SyncWorkerStatus) { } } } + if status.Fraction > p.Fraction || status.Completed > p.Completed || (status.Failure == nil && status.Actual != p.Actual) { + status.LastProgress = time.Now() + } w.w.updateStatus(status) } @@ -471,7 +480,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w } // update each object - err := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error { + errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error { for _, task := range tasks { if contextIsCancelled(ctx) { return cr.CancelError() @@ -495,8 +504,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w } return nil }) - if err != nil { - cr.Error(err) + if len(errs) > 0 { + err := cr.Errors(errs) return err } @@ -518,6 +527,12 @@ func init() { ) } +type errCanceled struct { + err error +} + +func (e errCanceled) Error() string { return e.err.Error() } + // consistentReporter hides the details of calculating the status based on the progress // of the graph runner. 
type consistentReporter struct { @@ -553,14 +568,31 @@ func (r *consistentReporter) Error(err error) { copied := r.status copied.Step = "ApplyResources" copied.Fraction = float32(r.done) / float32(r.total) - copied.Failure = err + if !isCancelledError(err) { + copied.Failure = err + } + r.reporter.Report(copied) +} + +func (r *consistentReporter) Errors(errs []error) error { + err := summarizeTaskGraphErrors(errs) + + r.lock.Lock() + defer r.lock.Unlock() + copied := r.status + copied.Step = "ApplyResources" + copied.Fraction = float32(r.done) / float32(r.total) + if err != nil { + copied.Failure = err + } r.reporter.Report(copied) + return err } func (r *consistentReporter) CancelError() error { r.lock.Lock() defer r.lock.Unlock() - return fmt.Errorf("update was cancelled at %d/%d", r.done, r.total) + return errCanceled{fmt.Errorf("update was cancelled at %d/%d", r.done, r.total)} } func (r *consistentReporter) Complete() { @@ -576,6 +608,136 @@ func (r *consistentReporter) Complete() { r.reporter.Report(copied) } +func isCancelledError(err error) bool { + if err == nil { + return false + } + _, ok := err.(errCanceled) + return ok +} + +// summarizeTaskGraphErrors takes a set of errors returned by the execution of a graph and attempts +// to reduce them to a single cause or message. This is domain specific to the payload and our update +// algorithms. The return value is the summarized error which may be nil if provided conditions are +// not truly an error (cancellation). +// TODO: take into account install vs upgrade +func summarizeTaskGraphErrors(errs []error) error { + // we ignore cancellation errors since they don't provide good feedback to users and are an internal + // detail of the server + err := errors.FilterOut(errors.NewAggregate(errs), isCancelledError) + if err == nil { + glog.V(4).Infof("All errors were cancellation errors: %v", errs) + return nil + } + agg, ok := err.(errors.Aggregate) + if !ok { + errs = []error{err} + } else { + errs = agg.Errors() + } + + // log the errors to assist in debugging future summarization + if glog.V(4) { + glog.Infof("Summarizing %d errors", len(errs)) + for _, err := range errs { + if uErr, ok := err.(*payload.UpdateError); ok { + if uErr.Task != nil { + glog.Infof("Update error %d/%d: %s %s (%T: %v)", uErr.Task.Index, uErr.Task.Total, uErr.Reason, uErr.Message, uErr.Nested, uErr.Nested) + } else { + glog.Infof("Update error: %s %s (%T: %v)", uErr.Reason, uErr.Message, uErr.Nested, uErr.Nested) + } + } else { + glog.Infof("Update error: %T: %v", err, err) + } + } + } + + // collapse into a set of common errors where necessary + if len(errs) == 1 { + return errs[0] + } + if err := newClusterOperatorsNotAvailable(errs); err != nil { + return err + } + return newMultipleError(errs) +} + +// newClusterOperatorsNotAvailable unifies multiple ClusterOperatorNotAvailable errors into +// a single error. It returns nil if the provided errors are not of the same type. 
+func newClusterOperatorsNotAvailable(errs []error) error { + names := make([]string, 0, len(errs)) + for _, err := range errs { + uErr, ok := err.(*payload.UpdateError) + if !ok || uErr.Reason != "ClusterOperatorNotAvailable" { + return nil + } + if len(uErr.Name) > 0 { + names = append(names, uErr.Name) + } + } + if len(names) == 0 { + return nil + } + + nested := make([]error, 0, len(errs)) + for _, err := range errs { + nested = append(nested, err) + } + sort.Strings(names) + name := strings.Join(names, ", ") + return &payload.UpdateError{ + Nested: errors.NewAggregate(errs), + Reason: "ClusterOperatorsNotAvailable", + Message: fmt.Sprintf("Some cluster operators are still updating: %s", name), + Name: name, + } +} + +// uniqueStrings returns an array with all sequential identical items removed. It modifies the contents +// of arr. Sort the input array before calling to remove all duplicates. +func uniqueStrings(arr []string) []string { + var last int + for i := 1; i < len(arr); i++ { + if arr[i] == arr[last] { + continue + } + last++ + if last != i { + arr[last] = arr[i] + } + } + if last < len(arr) { + last++ + } + return arr[:last] +} + +// newMultipleError reports a generic set of errors that block progress. This method expects multiple +// errors but handles singular and empty arrays gracefully. If all errors have the same message, the +// first item is returned. +func newMultipleError(errs []error) error { + if len(errs) == 0 { + return nil + } + if len(errs) == 1 { + return errs[0] + } + messages := make([]string, 0, len(errs)) + for _, err := range errs { + messages = append(messages, err.Error()) + } + sort.Strings(messages) + messages = uniqueStrings(messages) + if len(messages) == 0 { + return errs[0] + } + return &payload.UpdateError{ + Nested: errors.NewAggregate(errs), + Reason: "MultipleErrors", + Message: fmt.Sprintf("Multiple errors are preventing progress:\n* %s", strings.Join(messages, "\n* ")), + } +} + // getOverrideForManifest returns the override and true when override exists for manifest. 
func getOverrideForManifest(overrides []configv1.ComponentOverride, manifest *lib.Manifest) (configv1.ComponentOverride, bool) { for idx, ov := range overrides { diff --git a/pkg/cvo/sync_worker_test.go b/pkg/cvo/sync_worker_test.go index 5e122ddc8..2e9ac1c3f 100644 --- a/pkg/cvo/sync_worker_test.go +++ b/pkg/cvo/sync_worker_test.go @@ -10,10 +10,11 @@ import ( func Test_statusWrapper_Report(t *testing.T) { tests := []struct { - name string - previous SyncWorkerStatus - next SyncWorkerStatus - want bool + name string + previous SyncWorkerStatus + next SyncWorkerStatus + want bool + wantProgress bool }{ { name: "skip updates that clear an error and are at an earlier fraction", @@ -22,9 +23,10 @@ func Test_statusWrapper_Report(t *testing.T) { want: false, }, { - previous: SyncWorkerStatus{Failure: fmt.Errorf("a"), Actual: configv1.Update{Image: "testing"}, Fraction: 0.1}, - next: SyncWorkerStatus{Actual: configv1.Update{Image: "testing2"}}, - want: true, + previous: SyncWorkerStatus{Failure: fmt.Errorf("a"), Actual: configv1.Update{Image: "testing"}, Fraction: 0.1}, + next: SyncWorkerStatus{Actual: configv1.Update{Image: "testing2"}}, + want: true, + wantProgress: true, }, { previous: SyncWorkerStatus{Failure: fmt.Errorf("a"), Actual: configv1.Update{Image: "testing"}}, @@ -42,9 +44,22 @@ func Test_statusWrapper_Report(t *testing.T) { want: true, }, { - previous: SyncWorkerStatus{Failure: fmt.Errorf("a"), Actual: configv1.Update{Image: "testing"}, Fraction: 0.1}, - next: SyncWorkerStatus{Failure: fmt.Errorf("b"), Actual: configv1.Update{Image: "testing"}, Fraction: 0.2}, - want: true, + previous: SyncWorkerStatus{Failure: fmt.Errorf("a"), Actual: configv1.Update{Image: "testing"}, Fraction: 0.1}, + next: SyncWorkerStatus{Failure: fmt.Errorf("b"), Actual: configv1.Update{Image: "testing"}, Fraction: 0.2}, + want: true, + wantProgress: true, + }, + { + previous: SyncWorkerStatus{Actual: configv1.Update{Image: "testing"}, Completed: 1}, + next: SyncWorkerStatus{Actual: configv1.Update{Image: "testing"}, Completed: 2}, + want: true, + wantProgress: true, + }, + { + previous: SyncWorkerStatus{Actual: configv1.Update{Image: "testing-1"}, Completed: 1}, + next: SyncWorkerStatus{Actual: configv1.Update{Image: "testing-2"}, Completed: 1}, + want: true, + wantProgress: true, }, { previous: SyncWorkerStatus{Actual: configv1.Update{Image: "testing"}}, @@ -52,8 +67,9 @@ func Test_statusWrapper_Report(t *testing.T) { want: true, }, { - next: SyncWorkerStatus{Actual: configv1.Update{Image: "testing"}}, - want: true, + next: SyncWorkerStatus{Actual: configv1.Update{Image: "testing"}}, + want: true, + wantProgress: true, }, } for _, tt := range tests { @@ -70,6 +86,10 @@ func Test_statusWrapper_Report(t *testing.T) { if !ok { t.Fatalf("no event") } + if tt.wantProgress != (!evt.LastProgress.IsZero()) { + t.Errorf("unexpected progress timestamp: %#v", evt) + } + evt.LastProgress = time.Time{} if evt != tt.next { t.Fatalf("unexpected: %#v", evt) } diff --git a/pkg/cvo/testdata/paralleltest/manifests/.gitkeep b/pkg/cvo/testdata/paralleltest/manifests/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/cvo/testdata/paralleltest/release-manifests/0000_10_a_file.yaml b/pkg/cvo/testdata/paralleltest/release-manifests/0000_10_a_file.yaml new file mode 100644 index 000000000..503fdbe0f --- /dev/null +++ b/pkg/cvo/testdata/paralleltest/release-manifests/0000_10_a_file.yaml @@ -0,0 +1,4 @@ +kind: Test +apiVersion: v1 +metadata: + name: 10-a-file \ No newline at end of file diff --git 
a/pkg/cvo/testdata/paralleltest/release-manifests/0000_20_a_file.yaml b/pkg/cvo/testdata/paralleltest/release-manifests/0000_20_a_file.yaml new file mode 100644 index 000000000..8c6135213 --- /dev/null +++ b/pkg/cvo/testdata/paralleltest/release-manifests/0000_20_a_file.yaml @@ -0,0 +1,4 @@ +kind: Test +apiVersion: v1 +metadata: + name: 20-a-file \ No newline at end of file diff --git a/pkg/cvo/testdata/paralleltest/release-manifests/0000_20_b_file.yaml b/pkg/cvo/testdata/paralleltest/release-manifests/0000_20_b_file.yaml new file mode 100644 index 000000000..59b6e9c91 --- /dev/null +++ b/pkg/cvo/testdata/paralleltest/release-manifests/0000_20_b_file.yaml @@ -0,0 +1,4 @@ +kind: Test +apiVersion: v1 +metadata: + name: 20-b-file \ No newline at end of file diff --git a/pkg/cvo/testdata/paralleltest/release-manifests/image-references b/pkg/cvo/testdata/paralleltest/release-manifests/image-references new file mode 100644 index 000000000..781a25f19 --- /dev/null +++ b/pkg/cvo/testdata/paralleltest/release-manifests/image-references @@ -0,0 +1,4 @@ +kind: ImageStream +apiVersion: image.openshift.io/v1 +metadata: + name: 1.0.0-abc \ No newline at end of file diff --git a/pkg/cvo/testdata/paralleltest/release-manifests/release-metadata b/pkg/cvo/testdata/paralleltest/release-manifests/release-metadata new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/payload/task.go b/pkg/payload/task.go index ae38983c7..eb1d137ae 100644 --- a/pkg/payload/task.go +++ b/pkg/payload/task.go @@ -43,6 +43,15 @@ type Task struct { Backoff wait.Backoff } +func (st *Task) Copy() *Task { + return &Task{ + Index: st.Index, + Total: st.Total, + Manifest: st.Manifest, + Requeued: st.Requeued, + } +} + func (st *Task) String() string { ns := st.Manifest.Object().GetNamespace() if len(ns) == 0 { @@ -81,6 +90,7 @@ func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder continue case <-ctx.Done(): if uerr, ok := lastErr.(*UpdateError); ok { + uerr.Task = st.Copy() return uerr } reason, cause := reasonForPayloadSyncError(lastErr) @@ -91,6 +101,8 @@ func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder Nested: lastErr, Reason: reason, Message: fmt.Sprintf("Could not update %s%s", st, cause), + + Task: st.Copy(), } } } @@ -102,6 +114,8 @@ type UpdateError struct { Reason string Message string Name string + + Task *Task } func (e *UpdateError) Error() string { @@ -181,6 +195,8 @@ func SummaryForReason(reason, name string) string { return fmt.Sprintf("the cluster operator %s has not yet successfully rolled out", name) } return "a cluster operator has not yet rolled out" + case "ClusterOperatorsNotAvailable": + return "some cluster operators have not yet rolled out" } if strings.HasPrefix(reason, "UpdatePayload") { diff --git a/pkg/payload/task_graph.go b/pkg/payload/task_graph.go index eb39592c1..6449ffecf 100644 --- a/pkg/payload/task_graph.go +++ b/pkg/payload/task_graph.go @@ -403,7 +403,7 @@ type taskStatus struct { success bool } -func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func(ctx context.Context, tasks []*Task) error) error { +func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func(ctx context.Context, tasks []*Task) error) []error { nestedCtx, cancelFn := context.WithCancel(ctx) defer cancelFn() @@ -565,7 +565,7 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func } glog.V(4).Infof("Result of work: %v", errs) if len(errs) > 0 { - return errs[0] + return errs } return nil } 
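Reviewer note: with RunGraph now returning []error, the sync worker summarizes the collected task errors (summarizeTaskGraphErrors / newClusterOperatorsNotAvailable above) instead of surfacing only the first one. The following standalone sketch mirrors that collapse step for the common case where every error is a ClusterOperatorNotAvailable; the updateError struct and collapseNotAvailable helper here are simplified stand-ins for payload.UpdateError and the new helper, not the package's real API.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// updateError is a simplified stand-in for payload.UpdateError.
type updateError struct {
	Reason string
	Name   string
}

// collapseNotAvailable mirrors the idea behind newClusterOperatorsNotAvailable:
// if every error is a ClusterOperatorNotAvailable, merge the operator names into
// one sorted, comma-separated list and return a single summary message.
func collapseNotAvailable(errs []updateError) (string, bool) {
	names := make([]string, 0, len(errs))
	for _, e := range errs {
		if e.Reason != "ClusterOperatorNotAvailable" {
			// mixed reasons fall through to the generic MultipleErrors path
			return "", false
		}
		if len(e.Name) > 0 {
			names = append(names, e.Name)
		}
	}
	if len(names) == 0 {
		return "", false
	}
	sort.Strings(names)
	return fmt.Sprintf("Some cluster operators are still updating: %s", strings.Join(names, ", ")), true
}

func main() {
	msg, ok := collapseNotAvailable([]updateError{
		{Reason: "ClusterOperatorNotAvailable", Name: "operator-2"},
		{Reason: "ClusterOperatorNotAvailable", Name: "operator-1"},
	})
	// Prints: true Some cluster operators are still updating: operator-1, operator-2
	fmt.Println(ok, msg)
}

This is also the message TestCVO_ParallelError asserts on, so the sorted, comma-joined name list is what ends up in the Progressing condition.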
diff --git a/pkg/payload/task_graph_test.go b/pkg/payload/task_graph_test.go index 9ef698c42..04991b724 100644 --- a/pkg/payload/task_graph_test.go +++ b/pkg/payload/task_graph_test.go @@ -606,7 +606,7 @@ func TestRunGraph(t *testing.T) { order []string want []string invariants func(t *testing.T, got []string) - wantErr string + wantErrs []string }{ { nodes: []*TaskNode{ @@ -658,8 +658,8 @@ func TestRunGraph(t *testing.T) { } return nil }, - want: []string{"a", "b", "c"}, - wantErr: "error A", + want: []string{"a", "b", "c"}, + wantErrs: []string{"error A"}, invariants: func(t *testing.T, got []string) { for _, s := range got { if s == "e" { @@ -691,8 +691,8 @@ func TestRunGraph(t *testing.T) { } return nil }, - want: []string{"a", "b", "c"}, - wantErr: "cancelled", + want: []string{"a", "b", "c"}, + wantErrs: []string{"cancelled"}, invariants: func(t *testing.T, got []string) { for _, s := range got { if s == "e" { @@ -724,8 +724,8 @@ func TestRunGraph(t *testing.T) { } return nil }, - want: []string{"a", "b", "d1", "d2", "d3"}, - wantErr: "error -", + want: []string{"a", "b", "d1", "d2", "d3"}, + wantErrs: []string{"error - c1", "error - f"}, }, } for _, tt := range tests { @@ -736,7 +736,7 @@ func TestRunGraph(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() var order safeSlice - err := RunGraph(ctx, g, tt.parallel, func(ctx context.Context, tasks []*Task) error { + errs := RunGraph(ctx, g, tt.parallel, func(ctx context.Context, tasks []*Task) error { for _, task := range tasks { time.Sleep(tt.sleep * time.Duration(rand.Intn(4))) if tt.errorOn != nil { @@ -764,14 +764,18 @@ func TestRunGraph(t *testing.T) { } } - if (err != nil) != (tt.wantErr != "") { - t.Fatalf("unexpected error: %v", err) + var messages []string + for _, err := range errs { + messages = append(messages, err.Error()) } - if err != nil { - if !strings.Contains(err.Error(), tt.wantErr) { - t.Fatalf("unexpected error: %v", err) + sort.Strings(messages) + if len(messages) != len(tt.wantErrs) { + t.Fatalf("unexpected error: %v", messages) + } + for i, want := range tt.wantErrs { + if !strings.Contains(errs[i].Error(), want) { + t.Errorf("error %d %q doesn't contain %q", i, errs[i].Error(), want) } - return } }) }
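Reviewer note: the behavioral core of the pkg/cvo/status.go change is that convertErrorToProgressing reports an operator-not-available failure as Progressing rather than Failing, but only inside a bounded window. The sketch below isolates that time-window rule in standalone form; it deliberately omits the history and Reconciling checks from the real function, and the helper name and parameters are illustrative only.

package main

import (
	"fmt"
	"time"
)

// shouldTreatAsProgressing sketches the suppression window used by convertErrorToProgressing:
// a ClusterOperatorNotAvailable-style failure is reported as Progressing only while the sync
// worker has made progress within the last 10 minutes AND the current update started less than
// an hour ago. Outside that window the failure surfaces on the Failing condition as before.
func shouldTreatAsProgressing(now, lastProgress, updateStarted time.Time, reason string) bool {
	if lastProgress.IsZero() {
		return false
	}
	if now.Sub(lastProgress) > 10*time.Minute || now.Sub(updateStarted) > time.Hour {
		return false
	}
	return reason == "ClusterOperatorNotAvailable" || reason == "ClusterOperatorsNotAvailable"
}

func main() {
	now := time.Now()
	// Recent progress during a young update: suppressed, reported as Progressing.
	fmt.Println(shouldTreatAsProgressing(now, now.Add(-2*time.Minute), now.Add(-30*time.Minute), "ClusterOperatorsNotAvailable")) // true
	// No progress for 15 minutes: the failure is no longer masked.
	fmt.Println(shouldTreatAsProgressing(now, now.Add(-15*time.Minute), now.Add(-30*time.Minute), "ClusterOperatorsNotAvailable")) // false
}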