Skip to content

Multiple CRD versions support #874

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions operator-framework-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>openshift-client</artifactId>
<version>${fabric8-client.version}</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -98,6 +99,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId>
<version>${fabric8-client.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public static String getResourceTypeName(Class<? extends HasMetadata> resourceCl
// return HasMetadata.getFullResourceName(resourceClass);
final var group = HasMetadata.getGroup(resourceClass);
final var plural = HasMetadata.getPlural(resourceClass);
return (group == null || group.isEmpty()) ? plural : plural + "." + group;
final var version = HasMetadata.getVersion(resourceClass);
return (group == null || group.isEmpty()) ? plural : plural + "." + group + "/" + version;
}

public static String getDefaultFinalizerName(Class<? extends HasMetadata> resourceClass) {
Expand Down Expand Up @@ -83,15 +84,16 @@ public static String getDefaultNameFor(Reconciler reconciler) {
}

public static String getDefaultNameFor(Class<? extends Reconciler> reconcilerClass) {
return getDefaultReconcilerName(reconcilerClass.getSimpleName());
return getDefaultReconcilerName(reconcilerClass.getCanonicalName());
}

public static String getDefaultReconcilerName(String reconcilerClassName) {
// if the name is fully qualified, extract the simple class name
final var lastDot = reconcilerClassName.lastIndexOf('.');
if (lastDot > 0) {
reconcilerClassName = reconcilerClassName.substring(lastDot + 1);
}
// TODO: check why this logic was imlpemented in first place
// // if the name is fully qualified, extract the simple class name
// final var lastDot = reconcilerClassName.lastIndexOf('.');
// if (lastDot > 0) {
// reconcilerClassName = reconcilerClassName.substring(lastDot + 1);
// }
return reconcilerClassName.toLowerCase(Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import java.util.List;
import java.util.Objects;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.GenericKubernetesResourceList;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
import io.fabric8.kubernetes.internal.KubernetesDeserializer;
import io.javaoperatorsdk.operator.CustomResourceUtils;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.OperatorException;
Expand Down Expand Up @@ -152,6 +156,13 @@ public MixedOperation<R, KubernetesResourceList<R>, Resource<R>> getCRClient() {
return kubernetesClient.resources(configuration.getResourceClass());
}

public MixedOperation<GenericKubernetesResource, GenericKubernetesResourceList, Resource<GenericKubernetesResource>> getGenericClient() {
var context = ResourceDefinitionContext.fromResourceType(configuration.getResourceClass());
KubernetesDeserializer.registerCustomKind(context.getVersion(), context.getKind(),
GenericKubernetesResource.class);
return kubernetesClient.genericKubernetesResources(context);
}

/**
* Registers the specified controller with this operator, overriding its default configuration by
* the specified one (usually created via
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ReconciliationDispatcher<R extends HasMetadata> {
}

public ReconciliationDispatcher(Controller<R> controller) {
this(controller, new CustomResourceFacade<>(controller.getCRClient()));
this(controller, new CustomResourceFacade<R>(controller.getCRClient()));
}

public PostExecutionControl<R> handleExecution(ExecutionScope<R> executionScope) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.Cache;
Expand All @@ -17,30 +20,37 @@

public class ControllerResourceCache<T extends HasMetadata> implements ResourceCache<T> {

private final Map<String, SharedIndexInformer<T>> sharedIndexInformers;
private final Map<String, SharedIndexInformer<GenericKubernetesResource>> sharedIndexInformers;
private final Cloner cloner;

public ControllerResourceCache(Map<String, SharedIndexInformer<T>> sharedIndexInformers,
public ControllerResourceCache(
Map<String, SharedIndexInformer<GenericKubernetesResource>> sharedIndexInformers,
Cloner cloner) {
this.sharedIndexInformers = sharedIndexInformers;
this.cloner = cloner;
}

private Function<GenericKubernetesResource, T> CONVERT =
resource -> Serialization.unmarshal(Serialization.asJson(resource));

@Override
public Stream<T> list(Predicate<T> predicate) {
return sharedIndexInformers.values().stream()
.flatMap(i -> i.getStore().list().stream().filter(predicate));
.flatMap(i -> i.getStore().list().stream()
.map(CONVERT).filter(predicate));
}

@Override
public Stream<T> list(String namespace, Predicate<T> predicate) {
if (isWatchingAllNamespaces()) {
final var stream = sharedIndexInformers.get(ANY_NAMESPACE_MAP_KEY).getStore().list().stream()
.filter(r -> r.getMetadata().getNamespace().equals(namespace));
.map(CONVERT).filter(r -> r.getMetadata().getNamespace().equals(namespace));
return predicate != null ? stream.filter(predicate) : stream;
} else {
final var informer = sharedIndexInformers.get(namespace);
return informer != null ? informer.getStore().list().stream().filter(predicate)
return informer != null
? informer.getStore().list().stream()
.map(CONVERT).filter(predicate)
: Stream.empty();
}
}
Expand All @@ -59,7 +69,7 @@ public Optional<T> get(ResourceID resourceID) {
if (resource == null) {
return Optional.empty();
} else {
return Optional.of(cloner.clone(resource));
return Optional.of(cloner.clone(resource)).map(CONVERT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.GenericKubernetesResourceList;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
Expand All @@ -29,27 +31,28 @@

public class ControllerResourceEventSource<T extends HasMetadata>
extends AbstractResourceEventSource<T, T>
implements ResourceEventHandler<T> {
implements ResourceEventHandler<GenericKubernetesResource> {

public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";

private static final Logger log = LoggerFactory.getLogger(ControllerResourceEventSource.class);

private final Controller<T> controller;
private final Map<String, SharedIndexInformer<T>> sharedIndexInformers =
private final Map<String, SharedIndexInformer<GenericKubernetesResource>> sharedIndexInformers =
new ConcurrentHashMap<>();

private final ResourceEventFilter<T> filter;
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;
private final ControllerResourceCache<T> cache;
private final String CRVersion;

public ControllerResourceEventSource(Controller<T> controller) {
super(controller.getConfiguration().getResourceClass());
this.controller = controller;
final var configurationService = controller.getConfiguration().getConfigurationService();
var cloner = configurationService != null ? configurationService.getResourceCloner()
: ConfigurationService.DEFAULT_CLONER;
this.cache = new ControllerResourceCache<>(sharedIndexInformers, cloner);
this.cache = new ControllerResourceCache<T>(sharedIndexInformers, cloner);

var filters = new ResourceEventFilter[] {
ResourceEventFilters.finalizerNeededAndApplied(),
Expand All @@ -69,13 +72,18 @@ public ControllerResourceEventSource(Controller<T> controller) {
} else {
filter = ResourceEventFilters.or(filters);
}

var resourceClass = controller.getConfiguration().getResourceClass();
// TODO: check if we should use: HasMetadata.getFullResourceName(resourceClass);
this.CRVersion =
HasMetadata.getGroup(resourceClass) + "/" + HasMetadata.getVersion(resourceClass);
}

@Override
public void start() {
final var configuration = controller.getConfiguration();
final var targetNamespaces = configuration.getEffectiveNamespaces();
final var client = controller.getCRClient();
final var client = controller.getGenericClient();
final var labelSelector = configuration.getLabelSelector();

try {
Expand All @@ -100,8 +108,9 @@ public void start() {
super.start();
}

private SharedIndexInformer<T> createAndRunInformerFor(
FilterWatchListDeletable<T, KubernetesResourceList<T>> filteredBySelectorClient, String key) {
private SharedIndexInformer<GenericKubernetesResource> createAndRunInformerFor(
FilterWatchListDeletable<GenericKubernetesResource, GenericKubernetesResourceList> filteredBySelectorClient,
String key) {
var informer = filteredBySelectorClient.runnableInformer(0);
informer.addEventHandler(this);
sharedIndexInformers.put(key, informer);
Expand All @@ -111,7 +120,7 @@ private SharedIndexInformer<T> createAndRunInformerFor(

@Override
public void stop() {
for (SharedIndexInformer<T> informer : sharedIndexInformers.values()) {
for (SharedIndexInformer<GenericKubernetesResource> informer : sharedIndexInformers.values()) {
try {
log.info("Stopping informer {} -> {}", controller, informer);
informer.stop();
Expand Down Expand Up @@ -143,19 +152,49 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
}
}

private String extractUnderlyingCRVersion(GenericKubernetesResource resource) {
return resource
.getMetadata()
.getManagedFields()
.get(0)
.getApiVersion();
}

@Override
public void onAdd(T resource) {
eventReceived(ResourceAction.ADDED, resource, null);
public void onAdd(GenericKubernetesResource genericKubernetesResource) {
if (CRVersion.equals(extractUnderlyingCRVersion(genericKubernetesResource))) {
var resource = Serialization.unmarshal(
Serialization.asJson(genericKubernetesResource), this.getResourceClass());
eventReceived(ResourceAction.ADDED, resource, null);
}
}

@Override
public void onUpdate(T oldCustomResource, T newCustomResource) {
eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource);
public void onUpdate(GenericKubernetesResource oldResource,
GenericKubernetesResource newResource) {
if (CRVersion.equals(extractUnderlyingCRVersion(newResource))) {
var newCustomResource = Serialization.unmarshal(
Serialization.asJson(newResource), this.getResourceClass());

// Best effort try to deserialize the old CR with the same deserializer as the new
T oldCustomResource = null;
try {
oldCustomResource = Serialization.unmarshal(
Serialization.asJson(newResource), this.getResourceClass());
} catch (Exception e) {
// ignored
}
eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource);
}
}

@Override
public void onDelete(T resource, boolean b) {
eventReceived(ResourceAction.DELETED, resource, null);
public void onDelete(GenericKubernetesResource genericKubernetesResource, boolean b) {
if (CRVersion.equals(extractUnderlyingCRVersion(genericKubernetesResource))) {
var resource = Serialization.unmarshal(
Serialization.asJson(genericKubernetesResource), this.getResourceClass());
eventReceived(ResourceAction.DELETED, resource, null);
}
}

public Optional<T> get(ResourceID resourceID) {
Expand All @@ -170,11 +209,11 @@ public ControllerResourceCache<T> getResourceCache() {
* @return shared informers by namespace. If custom resource is not namespace scoped use
* CustomResourceEventSource.ANY_NAMESPACE_MAP_KEY
*/
public Map<String, SharedIndexInformer<T>> getInformers() {
public Map<String, SharedIndexInformer<GenericKubernetesResource>> getInformers() {
return Collections.unmodifiableMap(sharedIndexInformers);
}

public SharedIndexInformer<T> getInformer(String namespace) {
public SharedIndexInformer<GenericKubernetesResource> getInformer(String namespace) {
return getInformers().get(Objects.requireNonNullElse(namespace, ANY_NAMESPACE_MAP_KEY));
}

Expand Down Expand Up @@ -206,4 +245,5 @@ private void handleKubernetesClientException(Exception e) {
public Optional<T> getAssociated(T primary) {
return cache.get(ResourceID.fromResource(primary));
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<sonar.host.url>https://sonarcloud.io</sonar.host.url>

<junit.version>5.8.2</junit.version>
<fabric8-client.version>5.11.2</fabric8-client.version>
<fabric8-client.version>6.0-SNAPSHOT</fabric8-client.version>
<slf4j.version>1.7.33</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<mokito.version>4.2.0</mokito.version>
Expand Down