Skip to content

feat: enable configuring a handler to listen to informers stopping #1493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 30, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,7 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
return Optional.empty();
}

default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class ConfigurationServiceOverrider {
private ExecutorService executorService;
private ExecutorService workflowExecutorService;
private LeaderElectionConfiguration leaderElectionConfiguration;
private InformerStoppedHandler informerStoppedHandler;

ConfigurationServiceOverrider(ConfigurationService original) {
this.original = original;
Expand Down Expand Up @@ -93,6 +94,11 @@ public ConfigurationServiceOverrider withLeaderElectionConfiguration(
return this;
}

public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedHandler handler) {
this.informerStoppedHandler = handler;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
@Override
Expand Down Expand Up @@ -159,6 +165,12 @@ public Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
return leaderElectionConfiguration != null ? Optional.of(leaderElectionConfiguration)
: original.getLeaderElectionConfiguration();
}

@Override
public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
: original.getInformerStoppedHandler();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.javaoperatorsdk.operator.api.config;

import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

public interface InformerStoppedHandler {
Copy link
Collaborator

@csviri csviri Sep 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rather rename it to InformerStopHandler ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current name is better because it reflects the fact that the handler is called after the informer is stopped.


@SuppressWarnings("rawtypes")
void onStop(SharedIndexInformer informer, Throwable ex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
Expand All @@ -25,6 +26,18 @@ class InformerWrapper<T extends HasMetadata>

public InformerWrapper(SharedIndexInformer<T> informer) {
this.informer = informer;

// register
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
.ifPresent(ish -> {
final var stopped = informer.stopped();
if (stopped != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in which condition stopped is going to be null ?
With the current implementation in the client it doesn't seem to be possible.

I would leave this check out to fail early if this erroneous situation appears.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. This happens during tests, though, where it would require quite a bit of setup just to mock the future…

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather prefer more setup in the tests as opposed to the risk of this to happen and be swallowed in a real operator.
The risk here is to end up with a non-working callback without noticing.

If you are really, really against doing it, I should, at least, ask for a loud warning when this situation occurs .

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's now explicit so there's no way to silently fail anymore.

stopped.handle((res, ex) -> {
ish.onStop(informer, ex);
return null;
});
}
});
this.cache = (Cache<T>) informer.getStore();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.javaoperatorsdk.operator;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.GenericKubernetesClient;
Expand All @@ -14,12 +17,18 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MockKubernetesClient {

public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
return client(clazz, null);
}

public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
Consumer<Void> informerRunBehavior) {
final var client = mock(GenericKubernetesClient.class);
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> resources =
mock(MixedOperation.class);
Expand All @@ -34,6 +43,19 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
CompletableFuture<Void> stopped = new CompletableFuture<>();
when(informer.stopped()).thenReturn(stopped);
if (informerRunBehavior != null) {
doAnswer(invocation -> {
try {
informerRunBehavior.accept(null);
} catch (Exception e) {
stopped.completeExceptionally(e);
}
return null;
}).when(informer).run();
}
doAnswer(invocation -> null).when(informer).stop();
Indexer mockIndexer = mock(Indexer.class);
when(informer.getIndexer()).thenReturn(mockIndexer);
when(filterable.runnableInformer(anyLong())).thenReturn(informer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public <R extends HasMetadata> R clone(R object) {
.withTerminationTimeoutSeconds(100)
.withMetrics(new Metrics() {})
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
.withInformerStoppedHandler((informer, ex) -> {
})
.build();

assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
Expand All @@ -128,5 +130,7 @@ public <R extends HasMetadata> R clone(R object) {
assertNotEquals(config.getObjectMapper(), overridden.getObjectMapper());
assertNotEquals(config.getLeaderElectionConfiguration(),
overridden.getLeaderElectionConfiguration());
assertNotEquals(config.getInformerStoppedHandler(),
overridden.getLeaderElectionConfiguration());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
import io.javaoperatorsdk.operator.MockKubernetesClient;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
Expand All @@ -22,6 +20,8 @@

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -36,28 +36,15 @@ class InformerEventSourceTest {
private static final String NEXT_RESOURCE_VERSION = "2";

private InformerEventSource<Deployment, TestCustomResource> informerEventSource;
private final KubernetesClient clientMock = mock(KubernetesClient.class);
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
private final TemporaryResourceCache<Deployment> temporaryResourceCacheMock =
mock(TemporaryResourceCache.class);
private final EventHandler eventHandlerMock = mock(EventHandler.class);
private final MixedOperation crClientMock = mock(MixedOperation.class);
private final FilterWatchListMultiDeletable specificResourceClientMock =
mock(FilterWatchListMultiDeletable.class);
private final FilterWatchListDeletable labeledResourceClientMock =
mock(FilterWatchListDeletable.class);
private final SharedIndexInformer informer = mock(SharedIndexInformer.class);
private final InformerConfiguration<Deployment> informerConfiguration =
mock(InformerConfiguration.class);

@BeforeEach
void setup() {
when(clientMock.resources(any())).thenReturn(crClientMock);
when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock);
when(specificResourceClientMock.withLabelSelector((String) null))
.thenReturn(labeledResourceClientMock);
when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer);
when(informer.getIndexer()).thenReturn(mock(Indexer.class));

when(informerConfiguration.getEffectiveNamespaces())
.thenReturn(DEFAULT_NAMESPACES_SET);
when(informerConfiguration.getSecondaryToPrimaryMapper())
Expand Down Expand Up @@ -256,6 +243,20 @@ void filtersOnDeleteEvents() {
verify(eventHandlerMock, never()).handleEvent(any());
}

@Test
void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
final var exception = new RuntimeException("Informer stopped exceptionally!");
final var informerStoppedHandler = mock(InformerStoppedHandler.class);
ConfigurationServiceProvider
.overrideCurrent(overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler));
informerEventSource = new InformerEventSource<>(informerConfiguration,
MockKubernetesClient.client(Deployment.class, unused -> {
throw exception;
}));
informerEventSource.start();
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
}

Deployment testDeployment() {
Deployment deployment = new Deployment();
deployment.setMetadata(new ObjectMeta());
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<sonar.host.url>https://sonarcloud.io</sonar.host.url>

<junit.version>5.9.0</junit.version>
<fabric8-client.version>5.12.3</fabric8-client.version>
<fabric8-client.version>5.12-SNAPSHOT</fabric8-client.version>
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>2.18.0</log4j.version>
<mokito.version>4.8.0</mokito.version>
Expand Down