Skip to content

Commit 570370d

Browse files
committed
fix(watchers): gracefully handle watch disconnection
1 parent b01073d commit 570370d

File tree

6 files changed

+41
-3
lines changed

6 files changed

+41
-3
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@
99
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
1010
import io.javaoperatorsdk.operator.processing.EventDispatcher;
1111
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
12+
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
1213
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
1314
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
1415
import io.javaoperatorsdk.operator.processing.retry.Retry;
16+
import java.util.ArrayList;
1517
import java.util.Arrays;
18+
import java.util.List;
1619
import org.slf4j.Logger;
1720
import org.slf4j.LoggerFactory;
1821

@@ -22,6 +25,7 @@ public class Operator {
2225
private static final Logger log = LoggerFactory.getLogger(Operator.class);
2326
private final KubernetesClient k8sClient;
2427
private final ConfigurationService configurationService;
28+
private List<EventSourceManager> eventSourceManagers = new ArrayList<>();
2529

2630
public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) {
2731
this.k8sClient = k8sClient;
@@ -66,6 +70,10 @@ public <R extends CustomResource> void registerController(
6670
registerController(controller, false, null, targetNamespaces);
6771
}
6872

73+
public void close() {
74+
eventSourceManagers.stream().forEach(EventSourceManager::close);
75+
}
76+
6977
@SuppressWarnings("rawtypes")
7078
private <R extends CustomResource> void registerController(
7179
ResourceController<R> controller,
@@ -102,6 +110,8 @@ private <R extends CustomResource> void registerController(
102110
finalizer);
103111
eventSourceManager.registerCustomResourceEventSource(customResourceEventSource);
104112

113+
eventSourceManagers.add(eventSourceManager);
114+
105115
log.info(
106116
"Registered Controller: '{}' for CRD: '{}' for namespaces: {}",
107117
controller.getClass().getSimpleName(),

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ public Map<String, EventSource> getRegisteredEventSources() {
8282
return Collections.unmodifiableMap(eventSources);
8383
}
8484

85+
@Override
86+
public void close() {
87+
customResourceEventSource.close();
88+
}
89+
8590
public void cleanup(String customResourceUid) {
8691
getRegisteredEventSources()
8792
.keySet()

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,7 @@ public interface EventSource {
55
void setEventHandler(EventHandler eventHandler);
66

77
void eventSourceDeRegisteredForResource(String customResourceUid);
8+
9+
default void close() {}
10+
;
811
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ Optional<EventSource> deRegisterCustomResourceFromEventSource(
1111
String name, String customResourceUid);
1212

1313
Map<String, EventSource> getRegisteredEventSources();
14+
15+
default void close() {}
16+
;
1417
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.markedForDeletion;
66

77
import io.fabric8.kubernetes.client.CustomResource;
8+
import io.fabric8.kubernetes.client.Watch;
89
import io.fabric8.kubernetes.client.Watcher;
910
import io.fabric8.kubernetes.client.WatcherException;
1011
import io.fabric8.kubernetes.client.dsl.MixedOperation;
@@ -30,6 +31,7 @@ public class CustomResourceEventSource extends AbstractEventSource
3031
private final boolean generationAware;
3132
private final String resourceFinalizer;
3233
private final Map<String, Long> lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>();
34+
private Watch watch;
3335

3436
public static CustomResourceEventSource customResourceEventSourceForAllNamespaces(
3537
CustomResourceCache customResourceCache,
@@ -63,6 +65,10 @@ private CustomResourceEventSource(
6365
this.resourceFinalizer = resourceFinalizer;
6466
}
6567

68+
public Watch getWatch() {
69+
return watch;
70+
}
71+
6672
private boolean isWatchAllNamespaces() {
6773
return targetNamespaces == null;
6874
}
@@ -74,12 +80,12 @@ public void addedToEventManager() {
7480
private void registerWatch() {
7581
CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client;
7682
if (isWatchAllNamespaces()) {
77-
crClient.inAnyNamespace().watch(this);
83+
this.watch = crClient.inAnyNamespace().watch(this);
7884
} else if (targetNamespaces.length == 0) {
79-
client.watch(this);
85+
this.watch = client.watch(this);
8086
} else {
8187
for (String targetNamespace : targetNamespaces) {
82-
crClient.inNamespace(targetNamespace).watch(this);
88+
this.watch = crClient.inNamespace(targetNamespace).watch(this);
8389
log.debug("Registered controller for namespace: {}", targetNamespace);
8490
}
8591
}
@@ -149,6 +155,11 @@ public void eventSourceDeRegisteredForResource(String customResourceUid) {
149155
lastGenerationProcessedSuccessfully.remove(customResourceUid);
150156
}
151157

158+
@Override
159+
public void close() {
160+
watch.close();
161+
}
162+
152163
@Override
153164
public void onClose(WatcherException e) {
154165
if (e == null) {

operator-framework-quarkus-extension/runtime/src/main/java/io/javaoperatorsdk/quarkus/extension/OperatorProducer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import io.javaoperatorsdk.operator.api.ResourceController;
77
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
88
import io.quarkus.arc.DefaultBean;
9+
import io.quarkus.runtime.ShutdownEvent;
10+
import javax.enterprise.event.Observes;
911
import javax.enterprise.inject.Instance;
1012
import javax.enterprise.inject.Produces;
1113
import javax.inject.Inject;
@@ -24,4 +26,8 @@ Operator operator(KubernetesClient client, ConfigurationService configuration) {
2426
controllers.stream().forEach(operator::register);
2527
return operator;
2628
}
29+
30+
void onStop(@Observes ShutdownEvent ev, Operator operator) {
31+
operator.close();
32+
}
2733
}

0 commit comments

Comments
 (0)