OCPBUGS-17157: *: detect when all objects are labelled, restart #3028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
@@ -185,7 +185,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		return nil, err
 	}

-	canFilter, err := labeller.Validate(ctx, logger, metadataClient)
+	canFilter, err := labeller.Validate(ctx, logger, metadataClient, crClient)
 	if err != nil {
 		return nil, err
 	}
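The extra crClient argument lets the pre-flight validation consider OLM's own resources too before deciding that informers may filter. As intuition for what such a validation does, here is a minimal sketch of a label-completeness check over the metadata client; the function, the tracked-GVR slice, and the "olm.managed" label key are assumptions for illustration, not the labeller package's actual API.

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/metadata"
)

// allLabelled reports whether every object of each tracked GVR already carries
// the management label; only then is it safe for informers to filter on it.
func allLabelled(ctx context.Context, client metadata.Interface, gvrs []schema.GroupVersionResource) (bool, error) {
	for _, gvr := range gvrs {
		list, err := client.Resource(gvr).Namespace(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		for _, item := range list.Items {
			// "olm.managed" is an assumed label key for this sketch
			if item.GetLabels()["olm.managed"] != "true" {
				return false, nil
			}
		}
	}
	return true, nil
}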
@@ -207,10 +207,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		ogQueueSet:                queueinformer.NewEmptyResourceQueueSet(),
 		catalogSubscriberIndexer:  map[string]cache.Indexer{},
 		serviceAccountQuerier:     scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
-		clientAttenuator:          scoped.NewClientAttenuator(logger, config, opClient),
+		clientAttenuator:          scoped.NewClientAttenuator(logger, validatingConfig, opClient),
 		installPlanTimeout:        installPlanTimeout,
 		bundleUnpackTimeout:       bundleUnpackTimeout,
-		clientFactory:             clients.NewFactory(config),
+		clientFactory:             clients.NewFactory(validatingConfig),
 	}
 	op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
 	op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
@@ -380,10 +380,25 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
 	sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())

-	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
+	complete := map[schema.GroupVersionResource][]bool{}
+	completeLock := &sync.Mutex{}
+
+	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync func(done func() bool) queueinformer.LegacySyncHandler) error {
 		if canFilter {
 			return nil
 		}
+
+		// for each GVR, we may have more than one labelling controller active, each of which detects
+		// when it is done; we allocate a slot in complete[gvr][idx] to hold that outcome and track it
+		var idx int
+		if _, exists := complete[gvr]; exists {
+			idx = len(complete[gvr])
+			complete[gvr] = append(complete[gvr], false)
+		} else {
+			idx = 0
+			complete[gvr] = []bool{false}
+		}
+
 		queue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
 			Name: gvr.String(),
 		})
@@ -392,7 +407,23 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 			queueinformer.WithQueue(queue),
 			queueinformer.WithLogger(op.logger),
 			queueinformer.WithInformer(informer),
-			queueinformer.WithSyncer(sync.ToSyncer()),
+			queueinformer.WithSyncer(sync(func() bool {
+				// this function is called by the processor when it detects that its work is done - so, for that
+				// particular labelling action on that particular GVR, all objects are in the correct state. when
+				// that action is done, we need to further know if that was the last action to be completed, as
+				// when every action we know about has been completed, we re-start the process to allow the future
+				// invocation of this process to filter informers (canFilter = true) and elide all this logic
+				completeLock.Lock()
+				complete[gvr][idx] = true
+				allDone := true
+				for _, items := range complete {
+					for _, done := range items {
+						allDone = allDone && done
+					}
+				}
+				completeLock.Unlock()
+				return allDone
+			}).ToSyncer()),
 		)
 		if err != nil {
 			return err

Review thread on the allDone loop (marked resolved):

    Reviewer: break early if false?

    Author: Yes, but the computation done here is trivial enough that breaking early won't make any meaningful difference in performance and will increase the code length/control flow.
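Taken together, the slot allocation in the earlier hunk and this callback form a small completion registry. Here is the same pattern pulled out as a standalone type for clarity; completionTracker and register are illustrative names, not code from this PR. This version does break out of the scan early, as the reviewer suggested, since a helper type can afford the extra control flow.

package main

import (
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// completionTracker hands out one boolean slot per labelling controller and
// reports when every slot across every GVR has been marked done.
type completionTracker struct {
	mu       sync.Mutex
	complete map[schema.GroupVersionResource][]bool
}

// register allocates a slot for one controller working on gvr and returns the
// done callback: it marks the slot and reports whether all slots are now done.
func (t *completionTracker) register(gvr schema.GroupVersionResource) func() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.complete == nil {
		t.complete = map[schema.GroupVersionResource][]bool{}
	}
	idx := len(t.complete[gvr])
	t.complete[gvr] = append(t.complete[gvr], false)
	return func() bool {
		t.mu.Lock()
		defer t.mu.Unlock()
		t.complete[gvr][idx] = true
		for _, slots := range t.complete {
			for _, done := range slots {
				if !done {
					return false // break early: something is still in flight
				}
			}
		}
		return true
	}
}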
@@ -408,6 +439,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
 	if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
 		ctx, op.logger, labeller.Filter(rolesgvk),
+		roleInformer.Lister().List,
 		rbacv1applyconfigurations.Role,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
 			return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
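Each labeller call now also receives its informer lister's List function (here and in the hunks below). Presumably this is what lets a labelling controller notice that its own work is finished: after a sync it can scan the cache and fire the done callback once nothing unlabelled remains. A hedged sketch of that check follows; signalIfDone, labelled, and restart are illustrative stand-ins, not the actual ObjectLabeler internals.

package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// signalIfDone scans the informer cache after a sync; if every object already
// carries its label, it reports completion and, when this was the last
// outstanding labelling action anywhere, triggers the restart.
func signalIfDone[T metav1.Object](
	list func(labels.Selector) ([]T, error), // the lister's List, as wired above
	labelled func(T) bool, // does this object already carry the right label?
	done func() bool, // the callback handed in by labelObjects
	restart func(), // hypothetical hook for re-executing the process
) error {
	objects, err := list(labels.Everything())
	if err != nil {
		return err
	}
	for _, obj := range objects {
		if !labelled(obj) {
			return nil // this GVR still has unlabelled objects
		}
	}
	if done() {
		restart() // every labelling action we know about has completed
	}
	return nil
}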
@@ -420,6 +452,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		func(role *rbacv1.Role) (string, error) {
 			return resolver.PolicyRuleHashLabelValue(role.Rules)
 		},
+		roleInformer.Lister().List,
 		rbacv1applyconfigurations.Role,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
 			return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
@@ -436,6 +469,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
 	if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
 		ctx, op.logger, labeller.Filter(rolebindingsgvk),
+		roleBindingInformer.Lister().List,
 		rbacv1applyconfigurations.RoleBinding,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
 			return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
@@ -448,6 +482,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		func(roleBinding *rbacv1.RoleBinding) (string, error) {
 			return resolver.RoleReferenceAndSubjectHashLabelValue(roleBinding.RoleRef, roleBinding.Subjects)
 		},
+		roleBindingInformer.Lister().List,
 		rbacv1applyconfigurations.RoleBinding,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
 			return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
@@ -463,7 +498,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

 	serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
 	if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
-		ctx, op.logger, labeller.Filter(serviceaccountsgvk),
+		ctx, op.logger, labeller.ServiceAccountFilter(func(namespace, name string) bool {
+			operatorGroups, err := operatorGroupInformer.Lister().OperatorGroups(namespace).List(labels.Everything())
+			if err != nil {
+				return false
+			}
+			for _, operatorGroup := range operatorGroups {
+				if operatorGroup.Spec.ServiceAccountName == name {
+					return true
+				}
+			}
+			return false
+		}),
+		serviceAccountInformer.Lister().List,
 		corev1applyconfigurations.ServiceAccount,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
 			return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
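Service accounts are the one resource here that gets a bespoke filter: only service accounts that an OperatorGroup names in its spec are OLM-managed, so plain GVR filtering would overreach. The closure's contract, restated in isolation (referencedByOperatorGroup is an illustrative helper; the OperatorGroup type is the real operators.coreos.com/v1 API):

package main

import (
	operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
)

// referencedByOperatorGroup mirrors the closure above: a service account is in
// scope only if some OperatorGroup in its namespace points at it by name.
func referencedByOperatorGroup(groups []*operatorsv1.OperatorGroup, name string) bool {
	for _, og := range groups {
		if og.Spec.ServiceAccountName == name {
			return true
		}
	}
	return false
}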
@@ -480,6 +527,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
 	if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
 		ctx, op.logger, labeller.Filter(servicesgvk),
+		serviceInformer.Lister().List,
 		corev1applyconfigurations.Service,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
 			return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
@@ -505,6 +553,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
 	if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
 		ctx, op.logger, labeller.Filter(podsgvk),
+		csPodInformer.Lister().List,
 		corev1applyconfigurations.Pod,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
 			return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
@@ -542,6 +591,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
 			return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
 		}),
+		jobInformer.Lister().List,
 		batchv1applyconfigurations.Job,
 		func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
 			return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
@@ -617,6 +667,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
 	if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
 		ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
+		crdLister.List,
 		op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
 	)); err != nil {
 		return nil, err
@@ -1988,13 +2039,15 @@ func transitionInstallPlanState(log logrus.FieldLogger, transitioner installPlan
 		}
 		log.Debug("attempting to install")
 		if err := transitioner.ExecutePlan(out); err != nil {
-			if now.Sub(out.Status.StartTime.Time) >= timeout {
+			if apierrors.IsForbidden(err) || now.Sub(out.Status.StartTime.Time) < timeout {
+				// forbidden problems are never terminal since we don't know when a user might provide
+				// the service account they specified with more permissions
+				out.Status.Message = fmt.Sprintf("retrying execution due to error: %s", err.Error())
+			} else {
 				out.Status.SetCondition(v1alpha1.ConditionFailed(v1alpha1.InstallPlanInstalled,
 					v1alpha1.InstallPlanReasonComponentFailed, err.Error(), &now))
 				out.Status.Phase = v1alpha1.InstallPlanPhaseFailed
 				out.Status.Message = err.Error()
-			} else {
-				out.Status.Message = fmt.Sprintf("retrying execution due to error: %s", err.Error())
 			}
 			return out, err
 		} else if !out.Status.NeedsRequeue() {
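The reordered branches read most naturally as a predicate: retry while the error is forbidden or the timeout has not yet elapsed, and only otherwise mark the plan failed. A minimal sketch of that decision (shouldRetry is an illustrative name, not a function in this PR):

package main

import (
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// shouldRetry reports whether plan execution should be retried rather than the
// plan being marked failed; elapsed is the time since the plan's StartTime.
func shouldRetry(err error, elapsed, timeout time.Duration) bool {
	// forbidden errors are never terminal: the user may grant the scoped
	// service account more permissions at any time
	return apierrors.IsForbidden(err) || elapsed < timeout
}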