Skip to content

Commit 5a01ff7

Browse files
committed
refactor: encapsulate executor service access via ExecutorServiceManager
The goal is to make sure the thread pool is properly started and shut down at appropriate time, while getting it ready for instrumentation. This should also address the tests issue where the executor was shut down at the end of one test and never re-started at the beginning of the next one thus leading to tasks being rejected.
1 parent 5514f18 commit 5a01ff7

File tree

5 files changed

+151
-60
lines changed

5 files changed

+151
-60
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import java.util.Collections;
77
import java.util.LinkedList;
88
import java.util.List;
9-
import java.util.concurrent.TimeUnit;
109

1110
import org.slf4j.Logger;
1211
import org.slf4j.LoggerFactory;
@@ -17,6 +16,7 @@
1716
import io.javaoperatorsdk.operator.api.ResourceController;
1817
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
1918
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
19+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
2020
import io.javaoperatorsdk.operator.processing.ConfiguredController;
2121
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
2222
import io.javaoperatorsdk.operator.processing.DefaultEventHandler.EventMonitor;
@@ -94,6 +94,7 @@ public void start() {
9494
throw new OperatorException(error, e);
9595
}
9696

97+
ExecutorServiceManager.start(configurationService);
9798
controllers.start();
9899
}
99100

@@ -103,20 +104,9 @@ public void close() {
103104
log.info(
104105
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
105106

106-
try {
107-
log.debug("Closing executor");
108-
final var executor = configurationService.getExecutorService();
109-
executor.shutdown();
110-
if (!executor.awaitTermination(configurationService.getTerminationTimeoutSeconds(),
111-
TimeUnit.SECONDS)) {
112-
executor.shutdownNow(); // if we timed out, waiting, cancel everything
113-
}
114-
} catch (InterruptedException e) {
115-
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
116-
}
117-
118107
controllers.close();
119108

109+
ExecutorServiceManager.instance().close();
120110
k8sClient.close();
121111
}
122112

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Set;
44
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.Executors;
56

67
import io.fabric8.kubernetes.client.Config;
78
import io.fabric8.kubernetes.client.CustomResource;
@@ -105,6 +106,6 @@ default Metrics getMetrics() {
105106
}
106107

107108
default ExecutorService getExecutorService() {
108-
return ExecutorServiceProducer.getExecutor(concurrentReconciliationThreads());
109+
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
109110
}
110111
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package io.javaoperatorsdk.operator.api.config;
2+
3+
import java.io.Closeable;
4+
import java.util.Collection;
5+
import java.util.List;
6+
import java.util.concurrent.Callable;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Future;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.concurrent.TimeoutException;
12+
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
public class ExecutorServiceManager implements Closeable {
17+
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
18+
private static ExecutorServiceManager instance;
19+
20+
private final ExecutorService executor;
21+
private final int terminationTimeoutSeconds;
22+
23+
private ExecutorServiceManager(ExecutorService executor, int terminationTimeoutSeconds) {
24+
this.executor = executor;
25+
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
26+
}
27+
28+
public static void start(ConfigurationService configuration) {
29+
if (instance == null) {
30+
instance = new ExecutorServiceManager(
31+
new InstrumentedExecutorService(configuration.getExecutorService()),
32+
configuration.getTerminationTimeoutSeconds());
33+
} else {
34+
log.debug("Already started, reusing already setup instance!");
35+
}
36+
}
37+
38+
public static ExecutorServiceManager instance() {
39+
if (instance == null) {
40+
throw new IllegalStateException(
41+
"ExecutorServiceManager hasn't been started. Call start method before using!");
42+
}
43+
return instance;
44+
}
45+
46+
public ExecutorService executorService() {
47+
return executor;
48+
}
49+
50+
@Override
51+
public void close() {
52+
try {
53+
log.debug("Closing executor");
54+
executor.shutdown();
55+
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
56+
executor.shutdownNow(); // if we timed out, waiting, cancel everything
57+
}
58+
} catch (InterruptedException e) {
59+
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
60+
}
61+
}
62+
63+
private static class InstrumentedExecutorService implements ExecutorService {
64+
private final boolean debug;
65+
private final ExecutorService executor;
66+
67+
private InstrumentedExecutorService(ExecutorService executor) {
68+
this.executor = executor;
69+
debug = Utils.debugThreadPool();
70+
}
71+
72+
@Override
73+
public void shutdown() {
74+
if (debug) {
75+
Thread.dumpStack();
76+
}
77+
executor.shutdown();
78+
}
79+
80+
@Override
81+
public List<Runnable> shutdownNow() {
82+
return executor.shutdownNow();
83+
}
84+
85+
@Override
86+
public boolean isShutdown() {
87+
return executor.isShutdown();
88+
}
89+
90+
@Override
91+
public boolean isTerminated() {
92+
return executor.isTerminated();
93+
}
94+
95+
@Override
96+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
97+
return executor.awaitTermination(timeout, unit);
98+
}
99+
100+
@Override
101+
public <T> Future<T> submit(Callable<T> task) {
102+
return executor.submit(task);
103+
}
104+
105+
@Override
106+
public <T> Future<T> submit(Runnable task, T result) {
107+
return executor.submit(task, result);
108+
}
109+
110+
@Override
111+
public Future<?> submit(Runnable task) {
112+
return executor.submit(task);
113+
}
114+
115+
@Override
116+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
117+
throws InterruptedException {
118+
return executor.invokeAll(tasks);
119+
}
120+
121+
@Override
122+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
123+
TimeUnit unit) throws InterruptedException {
124+
return executor.invokeAll(tasks, timeout, unit);
125+
}
126+
127+
@Override
128+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
129+
throws InterruptedException, ExecutionException {
130+
return executor.invokeAny(tasks);
131+
}
132+
133+
@Override
134+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
135+
throws InterruptedException, ExecutionException, TimeoutException {
136+
return executor.invokeAny(tasks, timeout, unit);
137+
}
138+
139+
@Override
140+
public void execute(Runnable command) {
141+
executor.execute(command);
142+
}
143+
}
144+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.fabric8.kubernetes.client.CustomResource;
1717
import io.javaoperatorsdk.operator.api.RetryInfo;
1818
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
19+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1920
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
2021
import io.javaoperatorsdk.operator.processing.event.Event;
2122
import io.javaoperatorsdk.operator.processing.event.EventHandler;
@@ -53,7 +54,7 @@ public void failedEvent(String uid, Event event) {}
5354
private DefaultEventSourceManager<R> eventSourceManager;
5455

5556
public DefaultEventHandler(ConfiguredController<R> controller) {
56-
this(controller.getConfiguration().getConfigurationService().getExecutorService(),
57+
this(ExecutorServiceManager.instance().executorService(),
5758
controller.getConfiguration().getName(),
5859
new EventDispatcher<>(controller),
5960
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()));

0 commit comments

Comments
 (0)