Skip to content

feat: use only one, configurable ExecutorService #546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
java: [11, 16]
java: [ 11 ]
distribution: [ temurin, adopt-openj9 ]
kubernetes: [ 'v1.17.13','v1.18.20','v1.19.14','v1.20.10','v1.21.4', 'v1.22.1' ]
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.javaoperatorsdk.operator.api.ResourceController;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.processing.ConfiguredController;
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
import io.javaoperatorsdk.operator.processing.DefaultEventHandler.EventMonitor;
Expand Down Expand Up @@ -93,6 +94,7 @@ public void start() {
throw new OperatorException(error, e);
}

ExecutorServiceManager.init(configurationService);
controllers.start();
}

Expand All @@ -104,6 +106,7 @@ public void close() {

controllers.close();

ExecutorServiceManager.close();
k8sClient.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.javaoperatorsdk.operator.api.config;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.CustomResource;
Expand Down Expand Up @@ -102,4 +104,8 @@ default int getTerminationTimeoutSeconds() {
default Metrics getMetrics() {
return Metrics.NOOP;
}

default ExecutorService getExecutorService() {
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.javaoperatorsdk.operator.api.config;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorServiceManager {
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
private static ExecutorServiceManager instance;

private final ExecutorService executor;
private final int terminationTimeoutSeconds;

private ExecutorServiceManager(ExecutorService executor, int terminationTimeoutSeconds) {
this.executor = executor;
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
}

public static void init(ConfigurationService configuration) {
if (instance == null) {
instance = new ExecutorServiceManager(
new InstrumentedExecutorService(configuration.getExecutorService()),
configuration.getTerminationTimeoutSeconds());
} else {
log.debug("Already started, reusing already setup instance!");
}
}

public static void close() {
if (instance != null) {
instance.stop();
}
// make sure that we remove the singleton so that the thread pool is re-created on next call to
// start
instance = null;
}

public static ExecutorServiceManager instance() {
if (instance == null) {
throw new IllegalStateException(
"ExecutorServiceManager hasn't been started. Call start method before using!");
}
return instance;
}

public ExecutorService executorService() {
return executor;
}

private void stop() {
try {
log.debug("Closing executor");
executor.shutdown();
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
executor.shutdownNow(); // if we timed out, waiting, cancel everything
}
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
}
}

private static class InstrumentedExecutorService implements ExecutorService {
private final boolean debug;
private final ExecutorService executor;

private InstrumentedExecutorService(ExecutorService executor) {
this.executor = executor;
debug = Utils.debugThreadPool();
}

@Override
public void shutdown() {
if (debug) {
Thread.dumpStack();
}
executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
return executor.isShutdown();
}

@Override
public boolean isTerminated() {
return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return executor.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return executor.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return executor.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException {
return executor.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return executor.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return executor.invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
executor.execute(command);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class Utils {

private static final Logger log = LoggerFactory.getLogger(Utils.class);
public static final String CHECK_CRD_ENV_KEY = "JAVA_OPERATOR_SDK_CHECK_CRD";
public static final String DEBUG_THREAD_POOL_ENV_KEY = "JAVA_OPERATOR_SDK_DEBUG_THREAD_POOL";

/**
* Attempts to load version information from a properties file produced at build time, currently
Expand Down Expand Up @@ -60,7 +61,10 @@ public static boolean isValidateCustomResourcesEnvVarSet() {
}

public static boolean shouldCheckCRDAndValidateLocalModel() {
final var value = System.getProperty(CHECK_CRD_ENV_KEY);
return value == null || Boolean.getBoolean(value);
return Boolean.getBoolean(System.getProperty(CHECK_CRD_ENV_KEY, "true"));
}

public static boolean debugThreadPool() {
return Boolean.getBoolean(System.getProperty(DEBUG_THREAD_POOL_ENV_KEY, "false"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

Expand All @@ -16,6 +16,7 @@
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.RetryInfo;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
Expand Down Expand Up @@ -44,61 +45,37 @@ public void failedEvent(String uid, Event event) {}

private final EventBuffer eventBuffer;
private final Set<String> underProcessing = new HashSet<>();
private final ScheduledThreadPoolExecutor executor;
private final EventDispatcher<R> eventDispatcher;
private final Retry retry;
private final Map<String, RetryExecution> retryState = new HashMap<>();
private final ExecutorService executor;
private final String controllerName;
private final int terminationTimeout;
private final ReentrantLock lock = new ReentrantLock();
private DefaultEventSourceManager<R> eventSourceManager;

public DefaultEventHandler(ConfiguredController<R> controller) {
this(
new EventDispatcher<>(controller),
this(ExecutorServiceManager.instance().executorService(),
controller.getConfiguration().getName(),
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()),
controller.getConfiguration().getConfigurationService().concurrentReconciliationThreads(),
controller.getConfiguration().getConfigurationService().getTerminationTimeoutSeconds());
new EventDispatcher<>(controller),
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()));
}

DefaultEventHandler(EventDispatcher<R> dispatcher, String relatedControllerName, Retry retry) {
this(
dispatcher,
relatedControllerName,
retry,
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER,
ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS);
DefaultEventHandler(EventDispatcher<R> eventDispatcher, String relatedControllerName,
Retry retry) {
this(null, relatedControllerName, eventDispatcher, retry);
}

private DefaultEventHandler(
EventDispatcher<R> eventDispatcher,
String relatedControllerName,
Retry retry,
int concurrentReconciliationThreads,
int terminationTimeout) {
private DefaultEventHandler(ExecutorService executor, String relatedControllerName,
EventDispatcher<R> eventDispatcher, Retry retry) {
this.executor =
executor == null
? new ScheduledThreadPoolExecutor(
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER)
: executor;
this.controllerName = relatedControllerName;
this.eventDispatcher = eventDispatcher;
this.retry = retry;
this.controllerName = relatedControllerName;
eventBuffer = new EventBuffer();
this.terminationTimeout = terminationTimeout;
executor =
new ScheduledThreadPoolExecutor(
concurrentReconciliationThreads,
runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName));
}

@Override
public void close() {
try {
log.debug("Closing handler for {}", controllerName);
executor.shutdown();
if (!executor.awaitTermination(terminationTimeout, TimeUnit.SECONDS)) {
executor.shutdownNow(); // if we timed out, waiting, cancel everything
}
} catch (InterruptedException e) {
log.debug("Exception closing handler for {}: {}", controllerName, e.getLocalizedMessage());
}
}

public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) {
Expand Down Expand Up @@ -146,7 +123,7 @@ private void executeBufferedEvents(String customResourceUid) {
latestCustomResource.get(),
retryInfo(customResourceUid));
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this));
executor.execute(new ControllerExecution(executionScope));
} else {
log.debug(
"Skipping executing controller for resource id: {}. Events in queue: {}."
Expand Down Expand Up @@ -293,4 +270,26 @@ private void setUnderExecutionProcessing(String customResourceUid) {
private void unsetUnderExecution(String customResourceUid) {
underProcessing.remove(customResourceUid);
}

private class ControllerExecution implements Runnable {
private final ExecutionScope<R> executionScope;

private ControllerExecution(ExecutionScope<R> executionScope) {
this.executionScope = executionScope;
}

@Override
public void run() {
// change thread name for easier debugging
Thread.currentThread().setName("EventHandler-" + controllerName);
PostExecutionControl<R> postExecutionControl =
eventDispatcher.handleExecution(executionScope);
eventProcessingFinished(executionScope, postExecutionControl);
}

@Override
public String toString() {
return controllerName + " -> " + executionScope;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ public DefaultEventSourceManager(ConfiguredController<R> controller) {
new CustomResourceEventSource<>(controller));
}

public DefaultEventHandler<R> getEventHandler() {
return defaultEventHandler;
}

@Override
public void close() {
try {
Expand Down