diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index b857ddb710..15e39a4fcd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -9,10 +9,13 @@ import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import io.javaoperatorsdk.operator.processing.EventDispatcher; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; +import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +25,7 @@ public class Operator { private static final Logger log = LoggerFactory.getLogger(Operator.class); private final KubernetesClient k8sClient; private final ConfigurationService configurationService; + private List eventSourceManagers = new ArrayList<>(); public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) { this.k8sClient = k8sClient; @@ -66,6 +70,10 @@ public void registerController( registerController(controller, false, null, targetNamespaces); } + public void close() { + eventSourceManagers.stream().forEach(EventSourceManager::close); + } + @SuppressWarnings("rawtypes") private void registerController( ResourceController controller, @@ -102,6 +110,8 @@ private void registerController( finalizer); eventSourceManager.registerCustomResourceEventSource(customResourceEventSource); + eventSourceManagers.add(eventSourceManager); + log.info( "Registered Controller: '{}' for CRD: '{}' for namespaces: {}", controller.getClass().getSimpleName(), diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 24e0fe2d4f..3f54141933 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -82,6 +82,11 @@ public Map getRegisteredEventSources() { return Collections.unmodifiableMap(eventSources); } + @Override + public void close() { + customResourceEventSource.close(); + } + public void cleanup(String customResourceUid) { getRegisteredEventSources() .keySet() diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java index bd992b71de..72a8844a78 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java @@ -5,4 +5,6 @@ public interface EventSource { void setEventHandler(EventHandler eventHandler); void eventSourceDeRegisteredForResource(String customResourceUid); + + default void close() {}; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index cc8e5660a8..d50c70ff6b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -11,4 +11,6 @@ Optional deRegisterCustomResourceFromEventSource( String name, String customResourceUid); Map getRegisteredEventSources(); + + default void close() {}; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index c8d13a4d4b..2470188df0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -5,6 +5,7 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.markedForDeletion; import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.MixedOperation; @@ -30,6 +31,7 @@ public class CustomResourceEventSource extends AbstractEventSource private final boolean generationAware; private final String resourceFinalizer; private final Map lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>(); + private Watch watch; public static CustomResourceEventSource customResourceEventSourceForAllNamespaces( CustomResourceCache customResourceCache, @@ -63,6 +65,10 @@ private CustomResourceEventSource( this.resourceFinalizer = resourceFinalizer; } + public Watch getWatch() { + return watch; + } + private boolean isWatchAllNamespaces() { return targetNamespaces == null; } @@ -74,12 +80,12 @@ public void addedToEventManager() { private void registerWatch() { CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client; if (isWatchAllNamespaces()) { - crClient.inAnyNamespace().watch(this); + this.watch = crClient.inAnyNamespace().watch(this); } else if (targetNamespaces.length == 0) { - client.watch(this); + this.watch = client.watch(this); } else { for (String targetNamespace : targetNamespaces) { - crClient.inNamespace(targetNamespace).watch(this); + this.watch = crClient.inNamespace(targetNamespace).watch(this); log.debug("Registered controller for namespace: {}", targetNamespace); } } @@ -149,6 +155,11 @@ public void eventSourceDeRegisteredForResource(String customResourceUid) { lastGenerationProcessedSuccessfully.remove(customResourceUid); } + @Override + public void close() { + watch.close(); + } + @Override public void onClose(WatcherException e) { if (e == null) { diff --git a/operator-framework-quarkus-extension/runtime/src/main/java/io/javaoperatorsdk/quarkus/extension/OperatorProducer.java b/operator-framework-quarkus-extension/runtime/src/main/java/io/javaoperatorsdk/quarkus/extension/OperatorProducer.java index 7ad1cbac56..bcd03bfc2e 100644 --- a/operator-framework-quarkus-extension/runtime/src/main/java/io/javaoperatorsdk/quarkus/extension/OperatorProducer.java +++ b/operator-framework-quarkus-extension/runtime/src/main/java/io/javaoperatorsdk/quarkus/extension/OperatorProducer.java @@ -6,6 +6,8 @@ import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.quarkus.arc.DefaultBean; +import io.quarkus.runtime.ShutdownEvent; +import javax.enterprise.event.Observes; import javax.enterprise.inject.Instance; import javax.enterprise.inject.Produces; import javax.inject.Inject; @@ -24,4 +26,9 @@ Operator operator(KubernetesClient client, ConfigurationService configuration) { controllers.stream().forEach(operator::register); return operator; } + + void onStop(@Observes ShutdownEvent ev, Operator operator) { + operator.close(); + } + }