324 changes: 262 additions & 62 deletions pkg/cvo/cvo_scenarios_test.go

Large diffs are not rendered by default.

36 changes: 33 additions & 3 deletions pkg/cvo/status.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"time"

"github.com/golang/glog"

@@ -156,6 +157,8 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
now := metav1.Now()
version := versionString(status.Actual)

mergeOperatorHistory(config, status.Actual, now, status.Completed > 0)

// update validation errors
var reason string
if len(validationErrs) > 0 {
@@ -200,7 +203,9 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
})
}

if err := status.Failure; err != nil {
progressReason, progressShortMessage, skipFailure := convertErrorToProgressing(config.Status.History, now.Time, status)

if err := status.Failure; err != nil && !skipFailure {
var reason string
msg := "an error occurred"
if uErr, ok := err.(*payload.UpdateError); ok {
@@ -258,13 +263,19 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
switch {
case len(validationErrs) > 0:
message = fmt.Sprintf("Reconciling %s: the cluster version is invalid", version)
case status.Fraction > 0 && skipFailure:
reason = progressReason
message = fmt.Sprintf("Working towards %s: %.0f%% complete, %s", version, status.Fraction*100, progressShortMessage)
case status.Fraction > 0:
message = fmt.Sprintf("Working towards %s: %.0f%% complete", version, status.Fraction*100)
case status.Step == "RetrievePayload":
if len(reason) == 0 {
reason = "DownloadingUpdate"
}
message = fmt.Sprintf("Working towards %s: downloading update", version)
case skipFailure:
reason = progressReason
message = fmt.Sprintf("Working towards %s: %s", version, progressShortMessage)
default:
message = fmt.Sprintf("Working towards %s", version)
}
@@ -287,8 +298,6 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
})
}

mergeOperatorHistory(config, status.Actual, now, status.Completed > 0)

if glog.V(6) {
glog.Infof("Apply config: %s", diff.ObjectReflectDiff(original, config))
}
@@ -297,6 +306,27 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
return err
}

// convertErrorToProgressing returns true if the provided status indicates that a failure condition can be interpreted
// as the payload still making internal progress. The general error we try to suppress is one or more operators still
// being unavailable AND the overall payload task making progress towards its goal. The failure is only suppressed
// while fewer than 10 minutes have passed since the last observed progress and less than an hour has elapsed since
// the update began; after either window expires, the failure is reported normally.
func convertErrorToProgressing(history []configv1.UpdateHistory, now time.Time, status *SyncWorkerStatus) (reason string, message string, ok bool) {
if len(history) == 0 || status.Failure == nil || status.Reconciling || status.LastProgress.IsZero() {
return "", "", false
}
if now.Sub(status.LastProgress) > 10*time.Minute || now.Sub(history[0].StartedTime.Time) > time.Hour {
return "", "", false
}
uErr, ok := status.Failure.(*payload.UpdateError)
if !ok {
return "", "", false
}
if uErr.Reason == "ClusterOperatorNotAvailable" || uErr.Reason == "ClusterOperatorsNotAvailable" {
return uErr.Reason, fmt.Sprintf("waiting on %s", uErr.Name), true
}
return "", "", false
}
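
A hedged usage sketch (not part of this PR's diff, and assuming the cvo package's imports are in scope): with an update that started recently and an operator still coming up, the failure is converted into a Progressing message. The history entry, timestamps, and the "monitoring" operator name are illustrative assumptions.

now := time.Now()
history := []configv1.UpdateHistory{{StartedTime: metav1.NewTime(now.Add(-30 * time.Minute))}}
status := &SyncWorkerStatus{
	// a hypothetical failure from an operator that has not yet reported Available
	Failure:      &payload.UpdateError{Reason: "ClusterOperatorNotAvailable", Name: "monitoring"},
	LastProgress: now.Add(-5 * time.Minute), // progress was observed within the 10 minute window
}
reason, message, ok := convertErrorToProgressing(history, now, status)
// ok == true, reason == "ClusterOperatorNotAvailable", message == "waiting on monitoring"
_, _, _ = reason, message, ok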

// syncDegradedStatus handles generic errors in the cluster version. It tries to preserve
// all status fields that it can by using the provided config or loading the latest version
// from the cache (instead of clearing the status).
178 changes: 170 additions & 8 deletions pkg/cvo/sync_worker.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"time"

@@ -12,6 +14,7 @@ import (
"golang.org/x/time/rate"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"

Expand Down Expand Up @@ -67,6 +70,8 @@ type SyncWorkerStatus struct {
Initial bool
VersionHash string

LastProgress time.Time

Actual configv1.Update
}

@@ -233,9 +238,10 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) {
var syncTimeout time.Duration
switch work.State {
case payload.InitializingPayload:
// during initialization we expect things to fail due to ordering
// dependencies, so give it extra time
syncTimeout = w.minimumReconcileInterval * 5
// during initialization we want to show what operators are being
// created, so time out syncs more often to show a snapshot of progress
// TODO: allow status outside of sync
syncTimeout = w.minimumReconcileInterval
case payload.UpdatingPayload:
// during updates we want to flag failures on any resources that -
// for cluster operators that are not reporting failing the error
@@ -304,6 +310,9 @@ func (w *statusWrapper) Report(status SyncWorkerStatus) {
}
}
}
if status.Fraction > p.Fraction || status.Completed > p.Completed || (status.Failure == nil && status.Actual != p.Actual) {
status.LastProgress = time.Now()
}
w.w.updateStatus(status)
}

@@ -471,7 +480,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
}

// update each object
err := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if contextIsCancelled(ctx) {
return cr.CancelError()
Expand All @@ -495,8 +504,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
}
return nil
})
if err != nil {
cr.Error(err)
if len(errs) > 0 {
err := cr.Errors(errs)
return err
}

@@ -518,6 +527,12 @@ func init() {
)
}

type errCanceled struct {
err error
}

func (e errCanceled) Error() string { return e.err.Error() }

// consistentReporter hides the details of calculating the status based on the progress
// of the graph runner.
type consistentReporter struct {
@@ -553,14 +568,31 @@ func (r *consistentReporter) Error(err error) {
copied := r.status
copied.Step = "ApplyResources"
copied.Fraction = float32(r.done) / float32(r.total)
copied.Failure = err
if !isCancelledError(err) {
copied.Failure = err
}
r.reporter.Report(copied)
}

func (r *consistentReporter) Errors(errs []error) error {
err := summarizeTaskGraphErrors(errs)

r.lock.Lock()
defer r.lock.Unlock()
copied := r.status
copied.Step = "ApplyResources"
copied.Fraction = float32(r.done) / float32(r.total)
if err != nil {
copied.Failure = err
}
r.reporter.Report(copied)
return err
}

func (r *consistentReporter) CancelError() error {
r.lock.Lock()
defer r.lock.Unlock()
return fmt.Errorf("update was cancelled at %d/%d", r.done, r.total)
return errCanceled{fmt.Errorf("update was cancelled at %d/%d", r.done, r.total)}
}

func (r *consistentReporter) Complete() {
@@ -576,6 +608,136 @@ func (r *consistentReporter) Complete() {
r.reporter.Report(copied)
}

func isCancelledError(err error) bool {
if err == nil {
return false
}
_, ok := err.(errCanceled)
return ok
}

// summarizeTaskGraphErrors takes a set of errors returned by the execution of a graph and attempts
// to reduce them to a single cause or message. This is domain specific to the payload and our update
// algorithms. The return value is the summarized error, which may be nil if the provided errors do not
// represent a real failure (for example, cancellation).
// TODO: take into account install vs upgrade
func summarizeTaskGraphErrors(errs []error) error {
// we ignore cancellation errors since they don't provide good feedback to users and are an internal
// detail of the server
err := errors.FilterOut(errors.NewAggregate(errs), isCancelledError)
if err == nil {
glog.V(4).Infof("All errors were cancellation errors: %v", errs)
return nil
}
agg, ok := err.(errors.Aggregate)
if !ok {
errs = []error{err}
} else {
errs = agg.Errors()
}

// log the errors to assist in debugging future summarization
if glog.V(4) {
glog.Infof("Summarizing %d errors", len(errs))
for _, err := range errs {
if uErr, ok := err.(*payload.UpdateError); ok {
if uErr.Task != nil {
glog.Infof("Update error %d/%d: %s %s (%T: %v)", uErr.Task.Index, uErr.Task.Total, uErr.Reason, uErr.Message, uErr.Nested, uErr.Nested)
} else {
glog.Infof("Update error: %s %s (%T: %v)", uErr.Reason, uErr.Message, uErr.Nested, uErr.Nested)
}
} else {
glog.Infof("Update error: %T: %v", err, err)
}
}
}

// collapse into a set of common errors where necessary
if len(errs) == 1 {
return errs[0]
}
if err := newClusterOperatorsNotAvailable(errs); err != nil {
return err
}
return newMultipleError(errs)
}
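
A minimal sketch (not part of the diff) of the cancellation handling described above: if an interrupted graph run produced only errCanceled values, the summary is nil and no failure is surfaced.

errs := []error{
	errCanceled{fmt.Errorf("update was cancelled at 3/10")},
	errCanceled{fmt.Errorf("update was cancelled at 4/10")},
}
if err := summarizeTaskGraphErrors(errs); err == nil {
	// only internal cancellation errors were present; nothing to report
	glog.V(4).Info("graph run was cancelled, suppressing errors")
}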

// newClusterOperatorsNotAvailable unifies multiple ClusterOperatorNotAvailable errors into
// a single error. It returns nil if the provided errors are not of the same type.
func newClusterOperatorsNotAvailable(errs []error) error {
names := make([]string, 0, len(errs))
for _, err := range errs {
uErr, ok := err.(*payload.UpdateError)
if !ok || uErr.Reason != "ClusterOperatorNotAvailable" {
return nil
}
if len(uErr.Name) > 0 {
names = append(names, uErr.Name)
}
}
if len(names) == 0 {
return nil
}

nested := make([]error, 0, len(errs))
Contributor (inline review comment): ques: why create new nested error list?

for _, err := range errs {
nested = append(nested, err)
}
sort.Strings(names)
name := strings.Join(names, ", ")
return &payload.UpdateError{
Nested: errors.NewAggregate(errs),
Reason: "ClusterOperatorsNotAvailable",
Message: fmt.Sprintf("Some cluster operators are still updating: %s", name),
Name: name,
}
}

// uniqueStrings returns an array with all sequential identical items removed. It modifies the contents
// of arr. Sort the input array before calling to remove all duplicates.
func uniqueStrings(arr []string) []string {
var last int
for i := 1; i < len(arr); i++ {
if arr[i] == arr[last] {
continue
}
last++
if last != i {
arr[last] = arr[i]
}
}
if last < len(arr) {
last++
}
return arr[:last]
}
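
A small usage sketch (not part of the diff) of the sort-then-dedupe pattern this helper expects; the message strings are made up.

messages := []string{"etcd timeout", "node not ready", "etcd timeout"}
sort.Strings(messages)             // identical messages become adjacent
messages = uniqueStrings(messages) // adjacent duplicates are collapsed in place
// messages == []string{"etcd timeout", "node not ready"}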

// newMultipleError reports a generic set of errors that block progress. This method expects multiple
// errors but handles singular and empty arrays gracefully. If all errors have the same message, the
// first item is returned.
func newMultipleError(errs []error) error {
if len(errs) == 0 {
return nil
}
if len(errs) == 1 {
return errs[0]
}
messages := make([]string, 0, len(errs))
for _, err := range errs {
messages = append(messages, err.Error())
}
sort.Strings(messages)
messages = uniqueStrings(messages)
if len(messages) == 0 {
return errs[0]
}
return &payload.UpdateError{
Nested: errors.NewAggregate(errs),
Reason: "MultipleErrors",
Message: fmt.Sprintf("Multiple errors are preventing progress:\n* %s", strings.Join(messages, "\n* ")),
}
}
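
A hedged example (not part of the diff) of how the helpers above collapse errors, assuming two operators named "ingress" and "monitoring" are still updating.

errs := []error{
	&payload.UpdateError{Reason: "ClusterOperatorNotAvailable", Name: "ingress", Message: "Cluster operator ingress is still updating"},
	&payload.UpdateError{Reason: "ClusterOperatorNotAvailable", Name: "monitoring", Message: "Cluster operator monitoring is still updating"},
}
err := summarizeTaskGraphErrors(errs)
// err is a single *payload.UpdateError with Reason "ClusterOperatorsNotAvailable" and
// Message "Some cluster operators are still updating: ingress, monitoring"
_ = err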

// getOverrideForManifest returns the override and true when override exists for manifest.
func getOverrideForManifest(overrides []configv1.ComponentOverride, manifest *lib.Manifest) (configv1.ComponentOverride, bool) {
for idx, ov := range overrides {