Skip to content

Commit d3b317a

Browse files
authored
improve: caching executor for startup tasks and remove parallel() (#1755)
1 parent 2ce7703 commit d3b317a

File tree

6 files changed

+65
-23
lines changed

6 files changed

+65
-23
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
1010

11+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1112
import io.javaoperatorsdk.operator.processing.Controller;
1213

1314
/**
@@ -32,20 +33,27 @@ public synchronized void shouldStart() {
3233
}
3334

3435
public synchronized void start(boolean startEventProcessor) {
35-
controllers().parallelStream().forEach(c -> c.start(startEventProcessor));
36+
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
37+
c.start(startEventProcessor);
38+
return null;
39+
}, c -> "Controller Starter for: " + c.getConfiguration().getName());
3640
started = true;
3741
}
3842

3943
public synchronized void stop() {
40-
controllers().parallelStream().forEach(closeable -> {
41-
log.debug("closing {}", closeable);
42-
closeable.stop();
43-
});
44+
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
45+
log.debug("closing {}", c);
46+
c.stop();
47+
return null;
48+
}, c -> "Controller Stopper for: " + c.getConfiguration().getName());
4449
started = false;
4550
}
4651

4752
public synchronized void startEventProcessing() {
48-
controllers().parallelStream().forEach(Controller::startEventProcessing);
53+
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
54+
c.startEventProcessing();
55+
return null;
56+
}, c -> "Event processor starter for: " + c.getConfiguration().getName());
4957
}
5058

5159
@SuppressWarnings({"unchecked", "rawtypes"})

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public synchronized void start() {
104104
if (started) {
105105
return;
106106
}
107+
ExecutorServiceManager.init();
107108
controllerManager.shouldStart();
108109
final var version = ConfigurationServiceProvider.instance().getVersion();
109110
log.info(
@@ -114,7 +115,6 @@ public synchronized void start() {
114115

115116
final var clientVersion = Version.clientVersion();
116117
log.info("Client version: {}", clientVersion);
117-
ExecutorServiceManager.init();
118118
// first start the controller manager before leader election,
119119
// the leader election would start subsequently the processor if on
120120
controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled());

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ public class ExecutorServiceManager {
2323
private static ExecutorServiceManager instance;
2424
private final ExecutorService executor;
2525
private final ExecutorService workflowExecutor;
26+
private final ExecutorService cachingExecutorService;
2627
private final int terminationTimeoutSeconds;
2728

2829
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
2930
int terminationTimeoutSeconds) {
31+
this.cachingExecutorService = Executors.newCachedThreadPool();
3032
this.executor = new InstrumentedExecutorService(executor);
3133
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
3234
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
@@ -49,7 +51,7 @@ public static void init() {
4951
}
5052
}
5153

52-
public synchronized static void stop() {
54+
public static synchronized void stop() {
5355
if (instance != null) {
5456
instance.doStop();
5557
}
@@ -66,13 +68,26 @@ public synchronized static ExecutorServiceManager instance() {
6668
return instance;
6769
}
6870

69-
public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
71+
/**
72+
* Uses cachingExecutorService from this manager. Use this only for tasks, that don't have dynamic
73+
* nature, in sense that won't grow with the number of inputs (thus kubernetes resources)
74+
*
75+
* @param stream of elements
76+
* @param task to call on stream elements
77+
* @param threadNamer for naming thread
78+
* @param <T> type
79+
*/
80+
public static <T> void boundedExecuteAndWaitForAllToComplete(Stream<T> stream,
7081
Function<T, Void> task, Function<T, String> threadNamer) {
71-
final var instrumented = new InstrumentedExecutorService(
72-
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
82+
executeAndWaitForAllToComplete(stream, task, threadNamer, instance().cachingExecutorService());
83+
}
7384

85+
public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
86+
Function<T, Void> task, Function<T, String> threadNamer,
87+
ExecutorService executorService) {
88+
final var instrumented = new InstrumentedExecutorService(executorService);
7489
try {
75-
instrumented.invokeAll(stream.parallel().map(item -> (Callable<Void>) () -> {
90+
instrumented.invokeAll(stream.map(item -> (Callable<Void>) () -> {
7691
// change thread name for easier debugging
7792
final var thread = Thread.currentThread();
7893
final var name = thread.getName();
@@ -91,11 +106,12 @@ public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
91106
} catch (ExecutionException e) {
92107
throw new OperatorException(e.getCause());
93108
} catch (InterruptedException e) {
109+
log.warn("Interrupted.", e);
94110
Thread.currentThread().interrupt();
95111
}
96112
});
97-
shutdown(instrumented);
98113
} catch (InterruptedException e) {
114+
log.warn("Interrupted.", e);
99115
Thread.currentThread().interrupt();
100116
}
101117
}
@@ -108,11 +124,16 @@ public ExecutorService workflowExecutorService() {
108124
return workflowExecutor;
109125
}
110126

127+
public ExecutorService cachingExecutorService() {
128+
return cachingExecutorService;
129+
}
130+
111131
private void doStop() {
112132
try {
113133
log.debug("Closing executor");
114134
shutdown(executor);
115135
shutdown(workflowExecutor);
136+
shutdown(cachingExecutorService);
116137
} catch (InterruptedException e) {
117138
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
118139
Thread.currentThread().interrupt();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
6565
public synchronized void start() {
6666
startEventSource(eventSources.namedControllerResourceEventSource());
6767

68-
ExecutorServiceManager.executeAndWaitForAllToComplete(
68+
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(
6969
eventSources.additionalNamedEventSources()
7070
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
7171
this::startEventSource,
7272
getThreadNamer("start"));
7373

74-
ExecutorServiceManager.executeAndWaitForAllToComplete(
74+
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(
7575
eventSources.additionalNamedEventSources()
7676
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
7777
this::startEventSource,
@@ -86,10 +86,14 @@ private static Function<NamedEventSource, String> getThreadNamer(String stage) {
8686
};
8787
}
8888

89+
private static Function<NamespaceChangeable, String> getEventSourceThreadNamer(String stage) {
90+
return es -> stage + " -> " + es;
91+
}
92+
8993
@Override
9094
public synchronized void stop() {
9195
stopEventSource(eventSources.namedControllerResourceEventSource());
92-
ExecutorServiceManager.executeAndWaitForAllToComplete(
96+
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(
9397
eventSources.additionalNamedEventSources(),
9498
this::stopEventSource,
9599
getThreadNamer("stop"));
@@ -181,13 +185,15 @@ public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldRes
181185
public void changeNamespaces(Set<String> namespaces) {
182186
eventSources.controllerResourceEventSource()
183187
.changeNamespaces(namespaces);
184-
eventSources
188+
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(eventSources
185189
.additionalEventSources()
186190
.filter(NamespaceChangeable.class::isInstance)
187191
.map(NamespaceChangeable.class::cast)
188-
.filter(NamespaceChangeable::allowsNamespaceChanges)
189-
.parallel()
190-
.forEach(ies -> ies.changeNamespaces(namespaces));
192+
.filter(NamespaceChangeable::allowsNamespaceChanges), e -> {
193+
e.changeNamespaces(namespaces);
194+
return null;
195+
},
196+
getEventSourceThreadNamer("changeNamespace"));
191197
}
192198

193199
public Set<EventSource> getRegisteredEventSources() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public InformerManager(MixedOperation<T, KubernetesResourceList<T>, Resource<T>>
5959
public void start() throws OperatorException {
6060
initSources();
6161
// make sure informers are all started before proceeding further
62-
ExecutorServiceManager.executeAndWaitForAllToComplete(sources.values().stream(),
62+
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(sources.values().stream(),
6363
iw -> {
6464
iw.start();
6565
return null;

operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.fasterxml.jackson.core.JsonProcessingException;
2424

2525
import static com.google.common.truth.Truth.assertThat;
26+
import static io.javaoperatorsdk.operator.api.config.ConfigurationService.log;
2627
import static org.awaitility.Awaitility.await;
2728

2829
class MultiVersionCRDIT {
@@ -72,9 +73,15 @@ public void onStop(SharedIndexInformer informer, Throwable ex) {
7273
acceptOnlyIfUnsetOrEqualToAlreadySet(errorMessage, watcherEx.getCause().getMessage());
7374
}
7475
final var apiTypeClass = informer.getApiTypeClass();
76+
77+
log.debug("Current resourceClassName: " + resourceClassName);
78+
7579
resourceClassName =
7680
acceptOnlyIfUnsetOrEqualToAlreadySet(resourceClassName, apiTypeClass.getName());
77-
System.out.println("Informer for " + HasMetadata.getFullResourceName(apiTypeClass)
81+
82+
log.debug("API Type Class: " + apiTypeClass.getName()
83+
+ " - resource class name: " + resourceClassName);
84+
log.info("Informer for " + HasMetadata.getFullResourceName(apiTypeClass)
7885
+ " stopped due to: " + ex.getMessage());
7986
}
8087

@@ -132,7 +139,7 @@ void invalidEventsShouldStopInformerAndCallInformerStoppedHandler() {
132139
operator.create(v1res);
133140

134141
await()
135-
.atMost(Duration.ofSeconds(1))
142+
.atMost(Duration.ofSeconds(10))
136143
.pollInterval(Duration.ofMillis(50))
137144
.untilAsserted(() -> {
138145
// v1 is the stored version so trying to create a v2 version should fail because we cannot

0 commit comments

Comments
 (0)