Skip to content

WIP: refactor resolution sync #2400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 107 additions & 128 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,22 +888,20 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {

o.gcInstallPlans(logger, namespace)

// get the set of sources that should be used for resolution and best-effort get their connections working
logger.Debug("resolving sources")

querier := NewNamespaceSourceQuerier(o.sources.AsClients(o.namespace, namespace))

logger.Debug("checking if subscriptions need update")

subs, err := o.listSubscriptions(namespace)
if err != nil {
logger.WithError(err).Debug("couldn't list subscriptions")
return err
}

// TODO: parallel
maxGeneration := 0
subscriptionUpdated := false
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var (
maxGeneration int
skipResolution bool
errs []error
lastResolution *metav1.Time
)
for i, sub := range subs {
logger := logger.WithFields(logrus.Fields{
"sub": sub.GetName(),
Expand All @@ -916,39 +914,54 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
maxGeneration = sub.Status.InstallPlanGeneration
}

// ensure the installplan reference is correct
sub, changedIP, err := o.ensureSubscriptionInstallPlanState(logger, sub)
if err != nil {
logger.Debugf("error ensuring installplan state: %v", err)
return err
out := sub.DeepCopy()
if err := o.setIPState(ctx, out); err != nil {
errs = append(errs, err)
continue
}
subscriptionUpdated = subscriptionUpdated || changedIP

// record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet.
sub, changedCSV, err := o.ensureSubscriptionCSVState(logger, sub, querier)
if err != nil {
logger.Debugf("error recording current state of CSV in status: %v", err)
return err
if err := o.setCSVState(ctx, out); err != nil {
errs = append(errs, err)
continue
}

lastUpdated := sub.Status.LastUpdated
if out.Status.InstallPlanRef != nil && out.Status.State == v1alpha1.SubscriptionStateUpgradePending {
logger.Debug("pending upgrade")
skipResolution = true
}

if lastResolution == nil || lastUpdated.Before(lastResolution) {
// lastResolution is min(subs[].Status.LastUpdated)
lastResolution = &lastUpdated
}

if reflect.DeepEqual(out.Status, sub.Status) {
continue
}

out.Status.LastUpdated = o.now()
if out, err = o.client.OperatorsV1alpha1().Subscriptions(out.GetNamespace()).UpdateStatus(ctx, out, metav1.UpdateOptions{}); err != nil {
errs = append(errs, err)
continue
}

subscriptionUpdated = subscriptionUpdated || changedCSV
subs[i] = sub
subs[i] = out
}
if subscriptionUpdated {
logger.Debug("subscriptions were updated, wait for a new resolution")
return nil

if len(errs) > 0 {
return utilerrors.NewAggregate(errs)
}

shouldUpdate := false
for _, sub := range subs {
shouldUpdate = shouldUpdate || !o.nothingToUpdate(logger, sub)
if lastResolution != nil && o.sourcesLastUpdate.Before(lastResolution.Time) {
logger.Debugf("last resolution (%s) occurred after catalog update (%s)", lastResolution, o.sourcesLastUpdate.Current())
skipResolution = true
}
if !shouldUpdate {
logger.Debug("all subscriptions up to date")
if skipResolution {
logger.Debug("skipping resolution")
return nil
}

logger.Debug("resolving subscriptions in namespace")
logger.Debug("resolving subscriptions")

// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace)
Expand Down Expand Up @@ -987,45 +1000,45 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
}
}()

// create installplan if anything updated
if len(updatedSubs) > 0 {
logger.Debug("resolution caused subscription changes, creating installplan")
// Finish calculating max generation by checking the existing installplans
installPlans, err := o.listInstallPlans(namespace)
if err != nil {
return err
}
for _, ip := range installPlans {
if gen := ip.Spec.Generation; gen > maxGeneration {
maxGeneration = gen
}
}
if len(updatedSubs) < 1 {
logger.Debugf("no subscriptions were updated")
return nil
}
logger.Debug("resolution caused subscription changes, creating installplan")

// any subscription in the namespace with manual approval will force generated installplans to be manual
// TODO: this is an odd artifact of the older resolver, and will probably confuse users. approval mode could be on the operatorgroup?
installPlanApproval := v1alpha1.ApprovalAutomatic
for _, sub := range subs {
if sub.Spec.InstallPlanApproval == v1alpha1.ApprovalManual {
installPlanApproval = v1alpha1.ApprovalManual
break
}
// Finish calculating max generation by checking the existing installplans
installPlans, err := o.listInstallPlans(namespace)
if err != nil {
return err
}
for _, ip := range installPlans {
if gen := ip.Spec.Generation; gen > maxGeneration {
maxGeneration = gen
}
}

installPlanReference, err := o.ensureInstallPlan(logger, namespace, maxGeneration+1, subs, installPlanApproval, steps, bundleLookups)
if err != nil {
logger.WithError(err).Debug("error ensuring installplan")
return err
// Any subscription in the namespace with manual approval will force generated installplans to be manual
// TODO: this is an odd artifact of the older resolver, and will probably confuse users. approval mode could be on the operatorgroup?
installPlanApproval := v1alpha1.ApprovalAutomatic
for _, sub := range subs {
if sub.Spec.InstallPlanApproval == v1alpha1.ApprovalManual {
installPlanApproval = v1alpha1.ApprovalManual
break
}
updatedSubs = o.setIPReference(updatedSubs, maxGeneration+1, installPlanReference)
for _, updatedSub := range updatedSubs {
for i, sub := range subs {
if sub.Name == updatedSub.Name && sub.Namespace == updatedSub.Namespace {
subs[i] = updatedSub
}
}

installPlanReference, err := o.ensureInstallPlan(logger, namespace, maxGeneration+1, subs, installPlanApproval, steps, bundleLookups)
if err != nil {
logger.WithError(err).Debug("error ensuring installplan")
return err
}
updatedSubs = o.setIPReference(updatedSubs, maxGeneration+1, installPlanReference)
for _, updatedSub := range updatedSubs {
for i, sub := range subs {
if sub.Name == updatedSub.Name && sub.Namespace == updatedSub.Namespace {
subs[i] = updatedSub
}
}
} else {
logger.Debugf("no subscriptions were updated")
}

return nil
Expand All @@ -1043,89 +1056,55 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
return nil
}

func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscription) bool {
if sub.Status.InstallPlanRef != nil && sub.Status.State == v1alpha1.SubscriptionStateUpgradePending {
logger.Debugf("skipping update: installplan already created")
return true
}
return false
}

func (o *Operator) ensureSubscriptionInstallPlanState(logger *logrus.Entry, sub *v1alpha1.Subscription) (*v1alpha1.Subscription, bool, error) {
func (o *Operator) setIPState(ctx context.Context, sub *v1alpha1.Subscription) error {
if sub.Status.InstallPlanRef != nil || sub.Status.Install != nil {
return sub, false, nil
return nil
}

logger.Debug("checking for existing installplan")

// check if there's an installplan that created this subscription (only if it doesn't have a reference yet)
// this indicates it was newly resolved by another operator, and we should reference that installplan in the status
ipName, ok := sub.GetAnnotations()[generatedByKey]
if !ok {
return sub, false, nil
// Created by a user, resolution will set the ref
return nil
}

ip, err := o.client.OperatorsV1alpha1().InstallPlans(sub.GetNamespace()).Get(context.TODO(), ipName, metav1.GetOptions{})
// This Subscription was created by an InstallPlan, find it and set the ref
ip, err := o.client.OperatorsV1alpha1().InstallPlans(sub.GetNamespace()).Get(ctx, ipName, metav1.GetOptions{})
if err != nil {
logger.WithField("installplan", ipName).Warn("unable to get installplan from cache")
return nil, false, err
return err
}
logger.WithField("installplan", ipName).Debug("found installplan that generated subscription")

out := sub.DeepCopy()
ref, err := reference.GetReference(ip)
if err != nil {
logger.WithError(err).Warn("unable to generate installplan reference")
return nil, false, err
}
out.Status.InstallPlanRef = ref
out.Status.Install = v1alpha1.NewInstallPlanReference(ref)
out.Status.State = v1alpha1.SubscriptionStateUpgradePending
out.Status.CurrentCSV = out.Spec.StartingCSV
out.Status.LastUpdated = o.now()

updated, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{})
if err != nil {
return nil, false, err
return err
}
sub.Status.InstallPlanRef = ref
sub.Status.Install = v1alpha1.NewInstallPlanReference(ref)
sub.Status.State = v1alpha1.SubscriptionStateUpgradePending
sub.Status.CurrentCSV = sub.Spec.StartingCSV // TODO: Does this always make sense to stomp? Shouldn't this reference a CSV in the InstallPlan?
// out.Status.LastUpdated = o.now()

return updated, true, nil
return nil
}

func (o *Operator) ensureSubscriptionCSVState(logger *logrus.Entry, sub *v1alpha1.Subscription, querier SourceQuerier) (*v1alpha1.Subscription, bool, error) {
func (o *Operator) setCSVState(ctx context.Context, sub *v1alpha1.Subscription) error {
if sub.Status.CurrentCSV == "" {
return sub, false, nil
}

_, err := o.client.OperatorsV1alpha1().ClusterServiceVersions(sub.GetNamespace()).Get(context.TODO(), sub.Status.CurrentCSV, metav1.GetOptions{})
out := sub.DeepCopy()
if err != nil {
logger.WithError(err).WithField("currentCSV", sub.Status.CurrentCSV).Debug("error fetching csv listed in subscription status")
out.Status.State = v1alpha1.SubscriptionStateUpgradePending
} else {
// Check if an update is available for the current csv
if err := querier.Queryable(); err != nil {
return nil, false, err
}
out.Status.State = v1alpha1.SubscriptionStateAtLatest
out.Status.InstalledCSV = sub.Status.CurrentCSV
}

if sub.Status.State == out.Status.State {
// The subscription status represents the cluster state
return sub, false, nil
// Target CSV hasn't been determined yet, nothing to do
return nil
}
out.Status.LastUpdated = o.now()

// Update Subscription with status of transition. Log errors if we can't write them to the status.
updatedSub, err := o.client.OperatorsV1alpha1().Subscriptions(out.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{})
if err != nil {
logger.WithError(err).Info("error updating subscription status")
return nil, false, fmt.Errorf("error updating Subscription status: " + err.Error())
_, err := o.client.OperatorsV1alpha1().ClusterServiceVersions(sub.GetNamespace()).Get(ctx, sub.Status.CurrentCSV, metav1.GetOptions{})
switch {
case err == nil:
// Target CSV found, bump
// TODO: What does AtLatest mean? If status.currentCSV is just the next target version in a chain of upgrades, then AtLatest can't mean we are done upgrading for now. Should this always be the decision of resolution? Does this cause flapping with resolution; e.g. AtLatest -> UpgradePending -> AtLatest -> UpgradePending -> ... (until resolution has no more changes)?
sub.Status.State = v1alpha1.SubscriptionStateAtLatest
sub.Status.InstalledCSV = sub.Status.CurrentCSV
case k8serrors.IsNotFound(err):
sub.Status.State = v1alpha1.SubscriptionStateUpgradePending
err = nil // Clear error
}

// subscription status represents cluster state
return updatedSub, true, nil
return err
}

func (o *Operator) setIPReference(subs []*v1alpha1.Subscription, gen int, installPlanRef *corev1.ObjectReference) []*v1alpha1.Subscription {
Expand Down
7 changes: 7 additions & 0 deletions pkg/lib/time/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@ func (s *SharedTime) Set(current time.Time) {

s.time = current
}

func (s *SharedTime) Current() time.Time {
s.RLock()
defer s.RUnlock()

return s.time
}