Skip to content

Sync olm config #235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions manifests/0000_50_olm_02-olmconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: operators.coreos.com/v1
kind: OLMConfig
metadata:
name: cluster
annotations:
release.openshift.io/create-only: "true"
include.release.openshift.io/ibm-cloud-managed: "true"
include.release.openshift.io/self-managed-high-availability: "true"
include.release.openshift.io/single-node-developer: "true"
3 changes: 3 additions & 0 deletions scripts/cluster-olmconfig.patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
metadata:
annotations:
release.openshift.io/create-only: "true"
1 change: 1 addition & 0 deletions scripts/generate_crds_manifests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ ${YQ} merge --inplace -d'0' manifests/0000_50_olm_00-namespace.yaml scripts/moni
${YQ} write --inplace -s scripts/olm-deployment.patch.yaml manifests/0000_50_olm_07-olm-operator.deployment.yaml
${YQ} write --inplace -s scripts/catalog-deployment.patch.yaml manifests/0000_50_olm_08-catalog-operator.deployment.yaml
${YQ} write --inplace -s scripts/packageserver-deployment.patch.yaml manifests/0000_50_olm_15-packageserver.clusterserviceversion.yaml
${YQ} merge --inplace manifests/0000_50_olm_02-olmconfig.yaml scripts/cluster-olmconfig.patch.yaml
mv manifests/0000_50_olm_15-packageserver.clusterserviceversion.yaml pkg/manifests/csv.yaml
cp scripts/packageserver-pdb.yaml manifests/0000_50_olm_00-packageserver.pdb.yaml

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: operators.coreos.com/v1
kind: OLMConfig
metadata:
name: cluster
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
utilclock "k8s.io/apimachinery/pkg/util/clock"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -68,6 +70,7 @@ type Operator struct {
copiedCSVLister operatorsv1alpha1listers.ClusterServiceVersionLister
ogQueueSet *queueinformer.ResourceQueueSet
csvQueueSet *queueinformer.ResourceQueueSet
olmConfigQueue workqueue.RateLimitingInterface
csvCopyQueueSet *queueinformer.ResourceQueueSet
copiedCSVGCQueueSet *queueinformer.ResourceQueueSet
objGCQueueSet *queueinformer.ResourceQueueSet
Expand Down Expand Up @@ -124,6 +127,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
client: config.externalClient,
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvQueueSet: queueinformer.NewEmptyResourceQueueSet(),
olmConfigQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "olmConfig"),
csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(),
copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
objGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
Expand Down Expand Up @@ -433,6 +437,26 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

// Register QueueInformer for olmConfig
olmConfigInformer := externalversions.NewSharedInformerFactoryWithOptions(
op.client,
config.resyncPeriod(),
).Operators().V1().OLMConfigs().Informer()
olmConfigQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithInformer(olmConfigInformer),
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(op.olmConfigQueue),
queueinformer.WithIndexer(olmConfigInformer.GetIndexer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOLMConfig).ToSyncer()),
)
if err != nil {
return nil, err
}
if err := op.RegisterQueueInformer(olmConfigQueueInformer); err != nil {
return nil, err
}

k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
Expand Down Expand Up @@ -1194,13 +1218,143 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
return
}

func (a *Operator) allNamespaceOperatorGroups() ([]*v1.OperatorGroup, error) {
operatorGroups, err := a.lister.OperatorsV1().OperatorGroupLister().List(labels.Everything())
if err != nil {
return nil, err
}

result := []*v1.OperatorGroup{}
for _, operatorGroup := range operatorGroups {
if NewNamespaceSet(operatorGroup.Status.Namespaces).IsAllNamespaces() {
result = append(result, operatorGroup)
}
}
return result, nil
}

func (a *Operator) syncOLMConfig(obj interface{}) (syncError error) {
a.logger.Info("Processing olmConfig")
olmConfig, ok := obj.(*v1.OLMConfig)
if !ok {
return fmt.Errorf("casting OLMConfig failed")
}

// Generate an array of allNamespace OperatorGroups
allNSOperatorGroups, err := a.allNamespaceOperatorGroups()
if err != nil {
return err
}

nonCopiedCSVRequirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.DoesNotExist, []string{})
if err != nil {
return err
}

csvIsRequeued := false
for _, og := range allNSOperatorGroups {
// Get all copied CSVs owned by this operatorGroup
copiedCSVRequirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.Equals, []string{og.GetNamespace()})
if err != nil {
return err
}

copiedCSVs, err := a.copiedCSVLister.List(labels.NewSelector().Add(*copiedCSVRequirement))
if err != nil {
return err
}

// Filter to unique copies
uniqueCopiedCSVs := map[string]struct{}{}
for _, copiedCSV := range copiedCSVs {
uniqueCopiedCSVs[copiedCSV.GetName()] = struct{}{}
}

csvs, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(og.GetNamespace()).List(labels.NewSelector().Add(*nonCopiedCSVRequirement))
if err != nil {
return err
}

for _, csv := range csvs {
// If the correct number of copied CSVs were found, continue
if _, ok := uniqueCopiedCSVs[csv.GetName()]; ok == olmConfig.CopiedCSVsAreEnabled() {
continue
}

if err := a.csvQueueSet.Requeue(csv.GetNamespace(), csv.GetName()); err != nil {
a.logger.WithError(err).Warn("unable to requeue")
}
csvIsRequeued = true
}
}

// Update the olmConfig status if it has changed.
condition := getCopiedCSVsCondition(!olmConfig.CopiedCSVsAreEnabled(), csvIsRequeued)
if !isStatusConditionPresentAndAreTypeReasonMessageStatusEqual(olmConfig.Status.Conditions, condition) {
meta.SetStatusCondition(&olmConfig.Status.Conditions, condition)
if _, err := a.client.OperatorsV1().OLMConfigs().UpdateStatus(context.TODO(), olmConfig, metav1.UpdateOptions{}); err != nil {
return err
}
}

return nil
}

func isStatusConditionPresentAndAreTypeReasonMessageStatusEqual(conditions []metav1.Condition, condition metav1.Condition) bool {
foundCondition := meta.FindStatusCondition(conditions, condition.Type)
if foundCondition == nil {
return false
}
return foundCondition.Type == condition.Type &&
foundCondition.Reason == condition.Reason &&
foundCondition.Message == condition.Message &&
foundCondition.Status == condition.Status
}

func getCopiedCSVsCondition(isDisabled, csvIsRequeued bool) metav1.Condition {
condition := metav1.Condition{
Type: v1.DisabledCopiedCSVsConditionType,
LastTransitionTime: metav1.Now(),
Status: metav1.ConditionFalse,
}
if !isDisabled {
condition.Reason = "CopiedCSVsEnabled"
condition.Message = "Copied CSVs are enabled and present accross the cluster"
if csvIsRequeued {
condition.Message = "Copied CSVs are enabled and at least one copied CSVs is missing"
}
return condition
}

if csvIsRequeued {
condition.Reason = "CopiedCSVsFound"
condition.Message = "Copied CSVs are disabled and at least one copied CSV was found for an operator installed in AllNamespace mode"
return condition
}

condition.Status = metav1.ConditionTrue
condition.Reason = "NoCopiedCSVsFound"
condition.Message = "Copied CSVs are disabled and none were found for operators installed in AllNamespace mode"

return condition
}

func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
clusterServiceVersion, ok := obj.(*v1alpha1.ClusterServiceVersion)
if !ok {
a.logger.Debugf("wrong type: %#v", obj)
return fmt.Errorf("casting ClusterServiceVersion failed")
}

olmConfig, err := a.client.OperatorsV1().OLMConfigs().Get(context.TODO(), "cluster", metav1.GetOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}

if err == nil {
go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5)
}

logger := a.logger.WithFields(logrus.Fields{
"id": queueinformer.NewLoopID(),
"csv": clusterServiceVersion.GetName(),
Expand All @@ -1222,15 +1376,145 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
"targetNamespaces": strings.Join(operatorGroup.Status.Namespaces, ","),
}).Debug("copying csv to targets")

copiedCSVsAreEnabled, err := a.copiedCSVsAreEnabled()
if err != nil {
return err
}

// Check if we need to do any copying / annotation for the operatorgroup
if err := a.ensureCSVsInNamespaces(clusterServiceVersion, operatorGroup, NewNamespaceSet(operatorGroup.Status.Namespaces)); err != nil {
logger.WithError(err).Info("couldn't copy CSV to target namespaces")
syncError = err
namespaceSet := NewNamespaceSet(operatorGroup.Status.Namespaces)
if copiedCSVsAreEnabled || !namespaceSet.IsAllNamespaces() {
if err := a.ensureCSVsInNamespaces(clusterServiceVersion, operatorGroup, namespaceSet); err != nil {
logger.WithError(err).Info("couldn't copy CSV to target namespaces")
syncError = err
}

// If the CSV was installed in AllNamespace mode, remove any "CSV Copying Disabled" events
// in which the related object's name, namespace, and uid match the given CSV's.
if namespaceSet.IsAllNamespaces() {
if err := a.deleteCSVCopyingDisabledEvent(clusterServiceVersion); err != nil {
return err
}
}
return
}

requirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.Equals, []string{clusterServiceVersion.Namespace})
if err != nil {
return err
}

copiedCSVs, err := a.copiedCSVLister.List(labels.NewSelector().Add(*requirement))
if err != nil {
return err
}

for _, copiedCSV := range copiedCSVs {
err := a.client.OperatorsV1alpha1().ClusterServiceVersions(copiedCSV.Namespace).Delete(context.TODO(), copiedCSV.Name, metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
}

if err := a.createCSVCopyingDisabledEvent(clusterServiceVersion); err != nil {
return err
}

return
}

// copiedCSVsAreEnabled determines if csv copying is enabled for OLM.
//
// This method will first attempt to get the "cluster" olmConfig resource,
// if any error other than "IsNotFound" is encountered, false and the error
// will be returned.
//
// If the "cluster" olmConfig resource is found, the value of
// olmConfig.spec.features.disableCopiedCSVs will be returned along with a
// nil error.
//
// If the "cluster" olmConfig resource is not found, true will be returned
// without an error.
func (a *Operator) copiedCSVsAreEnabled() (bool, error) {
olmConfig, err := a.client.OperatorsV1().OLMConfigs().Get(context.TODO(), "cluster", metav1.GetOptions{})
if err != nil {
// Default to true if olmConfig singleton cannot be found
if k8serrors.IsNotFound(err) {
return true, nil
}
// If there was an error that wasn't an IsNotFound, return the error
return false, err
}

// If there was no error, return value based on olmConfig singleton
return olmConfig.CopiedCSVsAreEnabled(), nil
}

func (a *Operator) getCopiedCSVDisabledEventsForCSV(csv *v1alpha1.ClusterServiceVersion) ([]corev1.Event, error) {
result := []corev1.Event{}
if csv == nil {
return result, nil
}

events, err := a.opClient.KubernetesInterface().CoreV1().Events(csv.GetNamespace()).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}

for _, event := range events.Items {
if event.InvolvedObject.Namespace == csv.GetNamespace() &&
event.InvolvedObject.Name == csv.GetName() &&
event.InvolvedObject.UID == csv.GetUID() &&
event.Reason == v1.DisabledCopiedCSVsConditionType {
result = append(result, event)
}
}

return result, nil
}

func (a *Operator) deleteCSVCopyingDisabledEvent(csv *v1alpha1.ClusterServiceVersion) error {
events, err := a.getCopiedCSVDisabledEventsForCSV(csv)
if err != nil {
return err
}

// Remove existing events.
return a.deleteEvents(events)
}

func (a *Operator) deleteEvents(events []corev1.Event) error {
for _, event := range events {
err := a.opClient.KubernetesInterface().EventsV1().Events(event.GetNamespace()).Delete(context.TODO(), event.GetName(), metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
}
return nil
}

func (a *Operator) createCSVCopyingDisabledEvent(csv *v1alpha1.ClusterServiceVersion) error {
events, err := a.getCopiedCSVDisabledEventsForCSV(csv)
if err != nil {
return err
}

if len(events) == 1 {
return nil
}

// Remove existing events.
if len(events) > 1 {
if err := a.deleteEvents(events); err != nil {
return err
}
}

a.recorder.Eventf(csv, corev1.EventTypeWarning, v1.DisabledCopiedCSVsConditionType, "CSV copying disabled for %s/%s", csv.GetNamespace(), csv.GetName())

return nil
}

func (a *Operator) syncGcCsv(obj interface{}) (syncError error) {
clusterServiceVersion, ok := obj.(*v1alpha1.ClusterServiceVersion)
if !ok {
Expand Down
Loading