From 05de665cc03d1a5375c902153ec7c3049b73785c Mon Sep 17 00:00:00 2001 From: Nick Hale Date: Wed, 6 Oct 2021 05:56:09 -0400 Subject: [PATCH] WIP: refactor resolution sync Signed-off-by: Nick Hale --- pkg/controller/operators/catalog/operator.go | 235 +++++++++---------- pkg/lib/time/shared.go | 7 + 2 files changed, 114 insertions(+), 128 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index a66712d878..8c80089f9b 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -888,22 +888,20 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { o.gcInstallPlans(logger, namespace) - // get the set of sources that should be used for resolution and best-effort get their connections working - logger.Debug("resolving sources") - - querier := NewNamespaceSourceQuerier(o.sources.AsClients(o.namespace, namespace)) - logger.Debug("checking if subscriptions need update") - subs, err := o.listSubscriptions(namespace) if err != nil { - logger.WithError(err).Debug("couldn't list subscriptions") return err } - // TODO: parallel - maxGeneration := 0 - subscriptionUpdated := false + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + var ( + maxGeneration int + skipResolution bool + errs []error + lastResolution *metav1.Time + ) for i, sub := range subs { logger := logger.WithFields(logrus.Fields{ "sub": sub.GetName(), @@ -916,39 +914,54 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { maxGeneration = sub.Status.InstallPlanGeneration } - // ensure the installplan reference is correct - sub, changedIP, err := o.ensureSubscriptionInstallPlanState(logger, sub) - if err != nil { - logger.Debugf("error ensuring installplan state: %v", err) - return err + out := sub.DeepCopy() + if err := o.setIPState(ctx, out); err != nil { + errs = append(errs, err) + continue } - subscriptionUpdated = subscriptionUpdated || changedIP - // record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet. - sub, changedCSV, err := o.ensureSubscriptionCSVState(logger, sub, querier) - if err != nil { - logger.Debugf("error recording current state of CSV in status: %v", err) - return err + if err := o.setCSVState(ctx, out); err != nil { + errs = append(errs, err) + continue + } + + lastUpdated := sub.Status.LastUpdated + if out.Status.InstallPlanRef != nil && out.Status.State == v1alpha1.SubscriptionStateUpgradePending { + logger.Debug("pending upgrade") + skipResolution = true + } + + if lastResolution == nil || lastUpdated.Before(lastResolution) { + // lastResolution is min(subs[].Status.LastUpdated) + lastResolution = &lastUpdated + } + + if reflect.DeepEqual(out.Status, sub.Status) { + continue + } + + out.Status.LastUpdated = o.now() + if out, err = o.client.OperatorsV1alpha1().Subscriptions(out.GetNamespace()).UpdateStatus(ctx, out, metav1.UpdateOptions{}); err != nil { + errs = append(errs, err) + continue } - subscriptionUpdated = subscriptionUpdated || changedCSV - subs[i] = sub + subs[i] = out } - if subscriptionUpdated { - logger.Debug("subscriptions were updated, wait for a new resolution") - return nil + + if len(errs) > 0 { + return utilerrors.NewAggregate(errs) } - shouldUpdate := false - for _, sub := range subs { - shouldUpdate = shouldUpdate || !o.nothingToUpdate(logger, sub) + if lastResolution != nil && o.sourcesLastUpdate.Before(lastResolution.Time) { + logger.Debugf("last resolution (%s) occurred after catalog update (%s)", lastResolution, o.sourcesLastUpdate.Current()) + skipResolution = true } - if !shouldUpdate { - logger.Debug("all subscriptions up to date") + if skipResolution { + logger.Debug("skipping resolution") return nil } - - logger.Debug("resolving subscriptions in namespace") + logger.Debug("resolving subscriptions") // resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace) @@ -987,45 +1000,45 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { } }() - // create installplan if anything updated - if len(updatedSubs) > 0 { - logger.Debug("resolution caused subscription changes, creating installplan") - // Finish calculating max generation by checking the existing installplans - installPlans, err := o.listInstallPlans(namespace) - if err != nil { - return err - } - for _, ip := range installPlans { - if gen := ip.Spec.Generation; gen > maxGeneration { - maxGeneration = gen - } - } + if len(updatedSubs) < 1 { + logger.Debugf("no subscriptions were updated") + return nil + } + logger.Debug("resolution caused subscription changes, creating installplan") - // any subscription in the namespace with manual approval will force generated installplans to be manual - // TODO: this is an odd artifact of the older resolver, and will probably confuse users. approval mode could be on the operatorgroup? - installPlanApproval := v1alpha1.ApprovalAutomatic - for _, sub := range subs { - if sub.Spec.InstallPlanApproval == v1alpha1.ApprovalManual { - installPlanApproval = v1alpha1.ApprovalManual - break - } + // Finish calculating max generation by checking the existing installplans + installPlans, err := o.listInstallPlans(namespace) + if err != nil { + return err + } + for _, ip := range installPlans { + if gen := ip.Spec.Generation; gen > maxGeneration { + maxGeneration = gen } + } - installPlanReference, err := o.ensureInstallPlan(logger, namespace, maxGeneration+1, subs, installPlanApproval, steps, bundleLookups) - if err != nil { - logger.WithError(err).Debug("error ensuring installplan") - return err + // Any subscription in the namespace with manual approval will force generated installplans to be manual + // TODO: this is an odd artifact of the older resolver, and will probably confuse users. approval mode could be on the operatorgroup? + installPlanApproval := v1alpha1.ApprovalAutomatic + for _, sub := range subs { + if sub.Spec.InstallPlanApproval == v1alpha1.ApprovalManual { + installPlanApproval = v1alpha1.ApprovalManual + break } - updatedSubs = o.setIPReference(updatedSubs, maxGeneration+1, installPlanReference) - for _, updatedSub := range updatedSubs { - for i, sub := range subs { - if sub.Name == updatedSub.Name && sub.Namespace == updatedSub.Namespace { - subs[i] = updatedSub - } + } + + installPlanReference, err := o.ensureInstallPlan(logger, namespace, maxGeneration+1, subs, installPlanApproval, steps, bundleLookups) + if err != nil { + logger.WithError(err).Debug("error ensuring installplan") + return err + } + updatedSubs = o.setIPReference(updatedSubs, maxGeneration+1, installPlanReference) + for _, updatedSub := range updatedSubs { + for i, sub := range subs { + if sub.Name == updatedSub.Name && sub.Namespace == updatedSub.Namespace { + subs[i] = updatedSub } } - } else { - logger.Debugf("no subscriptions were updated") } return nil @@ -1043,89 +1056,55 @@ func (o *Operator) syncSubscriptions(obj interface{}) error { return nil } -func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscription) bool { - if sub.Status.InstallPlanRef != nil && sub.Status.State == v1alpha1.SubscriptionStateUpgradePending { - logger.Debugf("skipping update: installplan already created") - return true - } - return false -} - -func (o *Operator) ensureSubscriptionInstallPlanState(logger *logrus.Entry, sub *v1alpha1.Subscription) (*v1alpha1.Subscription, bool, error) { +func (o *Operator) setIPState(ctx context.Context, sub *v1alpha1.Subscription) error { if sub.Status.InstallPlanRef != nil || sub.Status.Install != nil { - return sub, false, nil + return nil } - logger.Debug("checking for existing installplan") - - // check if there's an installplan that created this subscription (only if it doesn't have a reference yet) - // this indicates it was newly resolved by another operator, and we should reference that installplan in the status ipName, ok := sub.GetAnnotations()[generatedByKey] if !ok { - return sub, false, nil + // Created by a user, resolution will set the ref + return nil } - ip, err := o.client.OperatorsV1alpha1().InstallPlans(sub.GetNamespace()).Get(context.TODO(), ipName, metav1.GetOptions{}) + // This Subscription was created by an InstallPlan, find it and set the ref + ip, err := o.client.OperatorsV1alpha1().InstallPlans(sub.GetNamespace()).Get(ctx, ipName, metav1.GetOptions{}) if err != nil { - logger.WithField("installplan", ipName).Warn("unable to get installplan from cache") - return nil, false, err + return err } - logger.WithField("installplan", ipName).Debug("found installplan that generated subscription") - out := sub.DeepCopy() ref, err := reference.GetReference(ip) if err != nil { - logger.WithError(err).Warn("unable to generate installplan reference") - return nil, false, err - } - out.Status.InstallPlanRef = ref - out.Status.Install = v1alpha1.NewInstallPlanReference(ref) - out.Status.State = v1alpha1.SubscriptionStateUpgradePending - out.Status.CurrentCSV = out.Spec.StartingCSV - out.Status.LastUpdated = o.now() - - updated, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{}) - if err != nil { - return nil, false, err + return err } + sub.Status.InstallPlanRef = ref + sub.Status.Install = v1alpha1.NewInstallPlanReference(ref) + sub.Status.State = v1alpha1.SubscriptionStateUpgradePending + sub.Status.CurrentCSV = sub.Spec.StartingCSV // TODO: Does this always make sense to stomp? Shouldn't this reference a CSV in the InstallPlan? + // out.Status.LastUpdated = o.now() - return updated, true, nil + return nil } -func (o *Operator) ensureSubscriptionCSVState(logger *logrus.Entry, sub *v1alpha1.Subscription, querier SourceQuerier) (*v1alpha1.Subscription, bool, error) { +func (o *Operator) setCSVState(ctx context.Context, sub *v1alpha1.Subscription) error { if sub.Status.CurrentCSV == "" { - return sub, false, nil - } - - _, err := o.client.OperatorsV1alpha1().ClusterServiceVersions(sub.GetNamespace()).Get(context.TODO(), sub.Status.CurrentCSV, metav1.GetOptions{}) - out := sub.DeepCopy() - if err != nil { - logger.WithError(err).WithField("currentCSV", sub.Status.CurrentCSV).Debug("error fetching csv listed in subscription status") - out.Status.State = v1alpha1.SubscriptionStateUpgradePending - } else { - // Check if an update is available for the current csv - if err := querier.Queryable(); err != nil { - return nil, false, err - } - out.Status.State = v1alpha1.SubscriptionStateAtLatest - out.Status.InstalledCSV = sub.Status.CurrentCSV - } - - if sub.Status.State == out.Status.State { - // The subscription status represents the cluster state - return sub, false, nil + // Target CSV hasn't been determined yet, nothing to do + return nil } - out.Status.LastUpdated = o.now() - // Update Subscription with status of transition. Log errors if we can't write them to the status. - updatedSub, err := o.client.OperatorsV1alpha1().Subscriptions(out.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{}) - if err != nil { - logger.WithError(err).Info("error updating subscription status") - return nil, false, fmt.Errorf("error updating Subscription status: " + err.Error()) + _, err := o.client.OperatorsV1alpha1().ClusterServiceVersions(sub.GetNamespace()).Get(ctx, sub.Status.CurrentCSV, metav1.GetOptions{}) + switch { + case err == nil: + // Target CSV found, bump + // TODO: What does AtLatest mean? If status.currentCSV is just the next target version in a chain of upgrades, then AtLatest can't mean we are done upgrading for now. Should this always be the decision of resolution? Does this cause flapping with resolution; e.g. AtLatest -> UpgradePending -> AtLatest -> UpgradePending -> ... (until resolution has no more changes)? + sub.Status.State = v1alpha1.SubscriptionStateAtLatest + sub.Status.InstalledCSV = sub.Status.CurrentCSV + case k8serrors.IsNotFound(err): + sub.Status.State = v1alpha1.SubscriptionStateUpgradePending + err = nil // Clear error } - // subscription status represents cluster state - return updatedSub, true, nil + return err } func (o *Operator) setIPReference(subs []*v1alpha1.Subscription, gen int, installPlanRef *corev1.ObjectReference) []*v1alpha1.Subscription { diff --git a/pkg/lib/time/shared.go b/pkg/lib/time/shared.go index 5ebb93d5c5..dcb45e205b 100644 --- a/pkg/lib/time/shared.go +++ b/pkg/lib/time/shared.go @@ -30,3 +30,10 @@ func (s *SharedTime) Set(current time.Time) { s.time = current } + +func (s *SharedTime) Current() time.Time { + s.RLock() + defer s.RUnlock() + + return s.time +}