From 8eb61710ea05e76dcafd880c4caa6fb58bbfc834 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 17 Sep 2021 19:01:15 +0200 Subject: [PATCH 1/8] feat: use only one, configurable ExecutorService Fixes #540 --- .../io/javaoperatorsdk/operator/Operator.java | 13 ++++ .../api/config/ConfigurationService.java | 5 ++ .../api/config/ExecutorServiceProducer.java | 26 ++++++++ .../processing/DefaultEventHandler.java | 63 +++++++------------ .../processing/ExecutionConsumer.java | 25 -------- .../event/DefaultEventSourceManager.java | 4 -- 6 files changed, 67 insertions(+), 69 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 63df0a79c3..dc6c2220e5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -6,6 +6,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +103,18 @@ public void close() { log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); + try { + log.debug("Closing executor"); + final var executor = configurationService.getExecutorService(); + executor.shutdown(); + if (!executor.awaitTermination(configurationService.getTerminationTimeoutSeconds(), + TimeUnit.SECONDS)) { + executor.shutdownNow(); // if we timed out, waiting, cancel everything + } + } catch (InterruptedException e) { + log.debug("Exception closing executor: {}", e.getLocalizedMessage()); + } + 
controllers.close(); k8sClient.close(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index d6075e75e5..c034f6bf03 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -8,6 +8,7 @@ import io.javaoperatorsdk.operator.api.ResourceController; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.concurrent.ExecutorService; /** An interface from which to retrieve configuration information. */ public interface ConfigurationService { @@ -102,4 +103,8 @@ default int getTerminationTimeoutSeconds() { default Metrics getMetrics() { return Metrics.NOOP; } + + default ExecutorService getExecutorService() { + return ExecutorServiceProducer.getExecutor(concurrentReconciliationThreads()); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java new file mode 100644 index 0000000000..c3edfb2931 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java @@ -0,0 +1,26 @@ +package io.javaoperatorsdk.operator.api.config; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicReference; + +class ExecutorServiceProducer { + + private final static AtomicReference executor = + new AtomicReference<>(); + + static ExecutorService getExecutor(int threadPoolSize) { + final var gotSet = + executor.compareAndSet(null, new ScheduledThreadPoolExecutor(threadPoolSize)); + final var result = executor.get(); + 
if (!gotSet) { + // check that we didn't try to change the pool size + if (result.getCorePoolSize() != threadPoolSize) { + throw new IllegalArgumentException( + "Cannot change the ExecutorService's thread pool size once set! Was " + + result.getCorePoolSize() + ", attempted to retrieve " + threadPoolSize); + } + } + return result; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index d2ef3d3416..2527ce294a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -44,61 +45,37 @@ public void failedEvent(String uid, Event event) {} private final EventBuffer eventBuffer; private final Set underProcessing = new HashSet<>(); - private final ScheduledThreadPoolExecutor executor; private final EventDispatcher eventDispatcher; private final Retry retry; private final Map retryState = new HashMap<>(); + private final ExecutorService executor; private final String controllerName; - private final int terminationTimeout; private final ReentrantLock lock = new ReentrantLock(); private DefaultEventSourceManager eventSourceManager; public DefaultEventHandler(ConfiguredController controller) { - this( - new EventDispatcher<>(controller), + this(controller.getConfiguration().getConfigurationService().getExecutorService(), controller.getConfiguration().getName(), - GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()), - 
controller.getConfiguration().getConfigurationService().concurrentReconciliationThreads(), - controller.getConfiguration().getConfigurationService().getTerminationTimeoutSeconds()); + new EventDispatcher<>(controller), + GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration())); } - DefaultEventHandler(EventDispatcher dispatcher, String relatedControllerName, Retry retry) { - this( - dispatcher, - relatedControllerName, - retry, - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER, - ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS); + DefaultEventHandler(EventDispatcher eventDispatcher, String relatedControllerName, + Retry retry) { + this(null, relatedControllerName, eventDispatcher, retry); } - private DefaultEventHandler( - EventDispatcher eventDispatcher, - String relatedControllerName, - Retry retry, - int concurrentReconciliationThreads, - int terminationTimeout) { + private DefaultEventHandler(ExecutorService executor, String relatedControllerName, + EventDispatcher eventDispatcher, Retry retry) { + this.executor = + executor == null + ? 
new ScheduledThreadPoolExecutor( + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER) + : executor; + this.controllerName = relatedControllerName; this.eventDispatcher = eventDispatcher; this.retry = retry; - this.controllerName = relatedControllerName; eventBuffer = new EventBuffer(); - this.terminationTimeout = terminationTimeout; - executor = - new ScheduledThreadPoolExecutor( - concurrentReconciliationThreads, - runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName)); - } - - @Override - public void close() { - try { - log.debug("Closing handler for {}", controllerName); - executor.shutdown(); - if (!executor.awaitTermination(terminationTimeout, TimeUnit.SECONDS)) { - executor.shutdownNow(); // if we timed out, waiting, cancel everything - } - } catch (InterruptedException e) { - log.debug("Exception closing handler for {}: {}", controllerName, e.getLocalizedMessage()); - } } public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { @@ -146,7 +123,13 @@ private void executeBufferedEvents(String customResourceUid) { latestCustomResource.get(), retryInfo(customResourceUid)); log.debug("Executing events for custom resource. Scope: {}", executionScope); - executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this)); + executor.execute(() -> { + // change thread name for easier debugging + Thread.currentThread().setName("EventHandler-" + controllerName); + PostExecutionControl postExecutionControl = + eventDispatcher.handleExecution(executionScope); + eventProcessingFinished(executionScope, postExecutionControl); + }); } else { log.debug( "Skipping executing controller for resource id: {}. Events in queue: {}." 
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java deleted file mode 100644 index f648b82955..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java +++ /dev/null @@ -1,25 +0,0 @@ -package io.javaoperatorsdk.operator.processing; - -import io.fabric8.kubernetes.client.CustomResource; - -class ExecutionConsumer> implements Runnable { - - private final ExecutionScope executionScope; - private final EventDispatcher eventDispatcher; - private final DefaultEventHandler defaultEventHandler; - - ExecutionConsumer( - ExecutionScope executionScope, - EventDispatcher eventDispatcher, - DefaultEventHandler defaultEventHandler) { - this.executionScope = executionScope; - this.eventDispatcher = eventDispatcher; - this.defaultEventHandler = defaultEventHandler; - } - - @Override - public void run() { - PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope); - defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index f71a4feb47..ff7ba37b15 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -51,10 +51,6 @@ public DefaultEventSourceManager(ConfiguredController controller) { new CustomResourceEventSource<>(controller)); } - public DefaultEventHandler getEventHandler() { - return defaultEventHandler; - } - @Override public void close() { try { From 
ba3edb92212438ded1e7c3d3829a53debdfc7799 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Sat, 18 Sep 2021 11:07:09 +0200 Subject: [PATCH 2/8] feat: introduce inner ControllerExecution to override toString This would allow easier diagnosis when the thread is rejected. --- .../api/config/ConfigurationService.java | 2 +- .../processing/DefaultEventHandler.java | 31 ++++++++++++++----- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index c034f6bf03..9ae50156c3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.api.config; import java.util.Set; +import java.util.concurrent.ExecutorService; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.CustomResource; @@ -8,7 +9,6 @@ import io.javaoperatorsdk.operator.api.ResourceController; import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.concurrent.ExecutorService; /** An interface from which to retrieve configuration information. 
*/ public interface ConfigurationService { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 2527ce294a..00b4a2d972 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -7,7 +7,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -123,13 +122,7 @@ private void executeBufferedEvents(String customResourceUid) { latestCustomResource.get(), retryInfo(customResourceUid)); log.debug("Executing events for custom resource. Scope: {}", executionScope); - executor.execute(() -> { - // change thread name for easier debugging - Thread.currentThread().setName("EventHandler-" + controllerName); - PostExecutionControl postExecutionControl = - eventDispatcher.handleExecution(executionScope); - eventProcessingFinished(executionScope, postExecutionControl); - }); + executor.execute(new ControllerExecution(executionScope)); } else { log.debug( "Skipping executing controller for resource id: {}. Events in queue: {}." 
@@ -276,4 +269,26 @@ private void setUnderExecutionProcessing(String customResourceUid) { private void unsetUnderExecution(String customResourceUid) { underProcessing.remove(customResourceUid); } + + private class ControllerExecution implements Runnable { + private final ExecutionScope executionScope; + + private ControllerExecution(ExecutionScope executionScope) { + this.executionScope = executionScope; + } + + @Override + public void run() { + // change thread name for easier debugging + Thread.currentThread().setName("EventHandler-" + controllerName); + PostExecutionControl postExecutionControl = + eventDispatcher.handleExecution(executionScope); + eventProcessingFinished(executionScope, postExecutionControl); + } + + @Override + public String toString() { + return controllerName + " -> " + executionScope; + } + } } From 9a5b3950ac3d8bd5c4cdff8edaf40666f9accfe8 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 21 Sep 2021 18:15:48 +0200 Subject: [PATCH 3/8] feat: attempt to figure out why thread pool is getting terminated --- .github/workflows/pr.yml | 9 ++-- .../api/config/ExecutorServiceProducer.java | 45 +++++++++++++++++-- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 44fffbdeba..015484678f 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -15,9 +15,12 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - java: [11, 16] - distribution: [ temurin, adopt-openj9 ] - kubernetes: [ 'v1.17.13','v1.18.20','v1.19.14','v1.20.10','v1.21.4', 'v1.22.1' ] + # java: [11, 16] + java: [ 11 ] + distribution: [ temurin ] + kubernetes: [ 'v1.22.1' ] + # distribution: [ temurin, adopt-openj9 ] + # kubernetes: [ 'v1.17.13','v1.18.20','v1.19.14','v1.20.10','v1.21.4', 'v1.22.1' ] steps: - uses: actions/checkout@v2 - name: Set up Java and Maven diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java 
b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java index c3edfb2931..5852e964c8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java @@ -1,17 +1,23 @@ package io.javaoperatorsdk.operator.api.config; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; class ExecutorServiceProducer { - private final static AtomicReference executor = + private final static AtomicReference executor = new AtomicReference<>(); static ExecutorService getExecutor(int threadPoolSize) { final var gotSet = - executor.compareAndSet(null, new ScheduledThreadPoolExecutor(threadPoolSize)); + executor.compareAndSet(null, new DebugThreadPoolExecutor(threadPoolSize, threadPoolSize, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())); final var result = executor.get(); if (!gotSet) { // check that we didn't try to change the pool size @@ -23,4 +29,37 @@ static ExecutorService getExecutor(int threadPoolSize) { } return result; } + + private static class DebugThreadPoolExecutor extends ThreadPoolExecutor { + + public DebugThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + public DebugThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue workQueue, + ThreadFactory threadFactory) { + 
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public DebugThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue workQueue, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public DebugThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue workQueue, + ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + @Override + public void shutdown() { + Thread.dumpStack(); + super.shutdown(); + } + } } From 5514f181ff08a9731feee0833c8edf8aebda12c4 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 21 Sep 2021 21:04:40 +0200 Subject: [PATCH 4/8] feat: make debugging thread pool configurable --- .../api/config/ExecutorServiceProducer.java | 38 +++++-------------- .../operator/api/config/Utils.java | 11 +++++- 2 files changed, 19 insertions(+), 30 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java index 5852e964c8..636bb34251 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java @@ -1,10 +1,7 @@ package io.javaoperatorsdk.operator.api.config; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicReference; @@ -16,8 +13,7 @@ class ExecutorServiceProducer { static ExecutorService getExecutor(int threadPoolSize) { final var gotSet = - executor.compareAndSet(null, new DebugThreadPoolExecutor(threadPoolSize, threadPoolSize, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())); + executor.compareAndSet(null, new InstrumentedExecutorService(threadPoolSize)); final var result = executor.get(); if (!gotSet) { // check that we didn't try to change the pool size @@ -30,35 +26,19 @@ static ExecutorService getExecutor(int threadPoolSize) { return result; } - private static class DebugThreadPoolExecutor extends ThreadPoolExecutor { + private static class InstrumentedExecutorService extends ThreadPoolExecutor { + private final boolean debug; - public DebugThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); - } - - public DebugThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, BlockingQueue workQueue, - ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); - } - - public DebugThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, BlockingQueue workQueue, - RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); - } - - public DebugThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, BlockingQueue workQueue, - ThreadFactory threadFactory, RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + public InstrumentedExecutorService(int corePoolSize) { + super(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + debug = 
Utils.debugThreadPool(); } @Override public void shutdown() { - Thread.dumpStack(); + if (debug) { + Thread.dumpStack(); + } super.shutdown(); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java index ec1b6e3d72..1ea7285f89 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java @@ -13,6 +13,7 @@ public class Utils { private static final Logger log = LoggerFactory.getLogger(Utils.class); public static final String CHECK_CRD_ENV_KEY = "JAVA_OPERATOR_SDK_CHECK_CRD"; + public static final String DEBUG_THREAD_POOL_ENV_KEY = "JAVA_OPERATOR_SDK_DEBUG_THREAD_POOL"; /** * Attempts to load version information from a properties file produced at build time, currently @@ -60,7 +61,15 @@ public static boolean isValidateCustomResourcesEnvVarSet() { } public static boolean shouldCheckCRDAndValidateLocalModel() { - final var value = System.getProperty(CHECK_CRD_ENV_KEY); + return getBooleanEnvProperty(CHECK_CRD_ENV_KEY); + } + + private static boolean getBooleanEnvProperty(String envKey) { + final var value = System.getProperty(envKey); return value == null || Boolean.getBoolean(value); } + + public static boolean debugThreadPool() { + return getBooleanEnvProperty(DEBUG_THREAD_POOL_ENV_KEY); + } } From 5a01ff71d8729d040ff8962c783fbb1a310a74c8 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 21 Sep 2021 22:02:59 +0200 Subject: [PATCH 5/8] refactor: encapsulate executor service access via ExecutorServiceManager The goal is to make sure the thread pool is properly started and shut down at appropriate time, while getting it ready for instrumentation. 
This should also address the tests issue where the executor was shut down at the end of one test and never re-started at the beginning of the next one thus leading to tasks being rejected. --- .../io/javaoperatorsdk/operator/Operator.java | 16 +- .../api/config/ConfigurationService.java | 3 +- .../api/config/ExecutorServiceManager.java | 144 ++++++++++++++++++ .../api/config/ExecutorServiceProducer.java | 45 ------ .../processing/DefaultEventHandler.java | 3 +- 5 files changed, 151 insertions(+), 60 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index dc6c2220e5..a04f59c371 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -6,7 +6,6 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,6 +16,7 @@ import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.processing.ConfiguredController; import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import io.javaoperatorsdk.operator.processing.DefaultEventHandler.EventMonitor; @@ -94,6 +94,7 @@ public void start() { throw new OperatorException(error, e); } + ExecutorServiceManager.start(configurationService); 
controllers.start(); } @@ -103,20 +104,9 @@ public void close() { log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); - try { - log.debug("Closing executor"); - final var executor = configurationService.getExecutorService(); - executor.shutdown(); - if (!executor.awaitTermination(configurationService.getTerminationTimeoutSeconds(), - TimeUnit.SECONDS)) { - executor.shutdownNow(); // if we timed out, waiting, cancel everything - } - } catch (InterruptedException e) { - log.debug("Exception closing executor: {}", e.getLocalizedMessage()); - } - controllers.close(); + ExecutorServiceManager.instance().close(); k8sClient.close(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 9ae50156c3..cb16793f43 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -2,6 +2,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.CustomResource; @@ -105,6 +106,6 @@ default Metrics getMetrics() { } default ExecutorService getExecutorService() { - return ExecutorServiceProducer.getExecutor(concurrentReconciliationThreads()); + return Executors.newFixedThreadPool(concurrentReconciliationThreads()); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java new file mode 100644 index 0000000000..ed8ee92cb4 --- /dev/null +++ 
b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -0,0 +1,144 @@ +package io.javaoperatorsdk.operator.api.config; + +import java.io.Closeable; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExecutorServiceManager implements Closeable { + private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); + private static ExecutorServiceManager instance; + + private final ExecutorService executor; + private final int terminationTimeoutSeconds; + + private ExecutorServiceManager(ExecutorService executor, int terminationTimeoutSeconds) { + this.executor = executor; + this.terminationTimeoutSeconds = terminationTimeoutSeconds; + } + + public static void start(ConfigurationService configuration) { + if (instance == null) { + instance = new ExecutorServiceManager( + new InstrumentedExecutorService(configuration.getExecutorService()), + configuration.getTerminationTimeoutSeconds()); + } else { + log.debug("Already started, reusing already setup instance!"); + } + } + + public static ExecutorServiceManager instance() { + if (instance == null) { + throw new IllegalStateException( + "ExecutorServiceManager hasn't been started. 
Call start method before using!"); + } + return instance; + } + + public ExecutorService executorService() { + return executor; + } + + @Override + public void close() { + try { + log.debug("Closing executor"); + executor.shutdown(); + if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + executor.shutdownNow(); // if we timed out, waiting, cancel everything + } + } catch (InterruptedException e) { + log.debug("Exception closing executor: {}", e.getLocalizedMessage()); + } + } + + private static class InstrumentedExecutorService implements ExecutorService { + private final boolean debug; + private final ExecutorService executor; + + private InstrumentedExecutorService(ExecutorService executor) { + this.executor = executor; + debug = Utils.debugThreadPool(); + } + + @Override + public void shutdown() { + if (debug) { + Thread.dumpStack(); + } + executor.shutdown(); + } + + @Override + public List shutdownNow() { + return executor.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return executor.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return executor.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return executor.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return executor.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + return executor.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, 
ExecutionException { + return executor.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return executor.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + executor.execute(command); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java deleted file mode 100644 index 636bb34251..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceProducer.java +++ /dev/null @@ -1,45 +0,0 @@ -package io.javaoperatorsdk.operator.api.config; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -class ExecutorServiceProducer { - - private final static AtomicReference executor = - new AtomicReference<>(); - - static ExecutorService getExecutor(int threadPoolSize) { - final var gotSet = - executor.compareAndSet(null, new InstrumentedExecutorService(threadPoolSize)); - final var result = executor.get(); - if (!gotSet) { - // check that we didn't try to change the pool size - if (result.getCorePoolSize() != threadPoolSize) { - throw new IllegalArgumentException( - "Cannot change the ExecutorService's thread pool size once set! 
Was " - + result.getCorePoolSize() + ", attempted to retrieve " + threadPoolSize); - } - } - return result; - } - - private static class InstrumentedExecutorService extends ThreadPoolExecutor { - private final boolean debug; - - public InstrumentedExecutorService(int corePoolSize) { - super(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); - debug = Utils.debugThreadPool(); - } - - @Override - public void shutdown() { - if (debug) { - Thread.dumpStack(); - } - super.shutdown(); - } - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 00b4a2d972..525293eee9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -16,6 +16,7 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.api.RetryInfo; import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; @@ -53,7 +54,7 @@ public void failedEvent(String uid, Event event) {} private DefaultEventSourceManager eventSourceManager; public DefaultEventHandler(ConfiguredController controller) { - this(controller.getConfiguration().getConfigurationService().getExecutorService(), + this(ExecutorServiceManager.instance().executorService(), controller.getConfiguration().getName(), new EventDispatcher<>(controller), GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration())); From 
8a205dc17eb0038e5487c26c319ff39d74033bb1 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 21 Sep 2021 23:11:19 +0200 Subject: [PATCH 6/8] fix: thread pool needs to be re-created if we restart after a close call This is done by moving the close method to static and nulling the singleton so that it can be re-created on next Operator start if needed. This is needed for proper testing support. --- .../io/javaoperatorsdk/operator/Operator.java | 4 ++-- .../api/config/ExecutorServiceManager.java | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index a04f59c371..834c516c68 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -94,7 +94,7 @@ public void start() { throw new OperatorException(error, e); } - ExecutorServiceManager.start(configurationService); + ExecutorServiceManager.init(configurationService); controllers.start(); } @@ -106,7 +106,7 @@ public void close() { controllers.close(); - ExecutorServiceManager.instance().close(); + ExecutorServiceManager.close(); k8sClient.close(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index ed8ee92cb4..682b004c3d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.api.config; -import java.io.Closeable; import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; @@ 
-13,7 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ExecutorServiceManager implements Closeable { +public class ExecutorServiceManager { private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); private static ExecutorServiceManager instance; @@ -25,7 +24,7 @@ private ExecutorServiceManager(ExecutorService executor, int terminationTimeoutS this.terminationTimeoutSeconds = terminationTimeoutSeconds; } - public static void start(ConfigurationService configuration) { + public static void init(ConfigurationService configuration) { if (instance == null) { instance = new ExecutorServiceManager( new InstrumentedExecutorService(configuration.getExecutorService()), @@ -35,6 +34,15 @@ public static void start(ConfigurationService configuration) { } } + public static void close() { + if (instance != null) { + instance.stop(); + } + // make sure that we remove the singleton so that the thread pool is re-created on next call to + // start + instance = null; + } + public static ExecutorServiceManager instance() { if (instance == null) { throw new IllegalStateException( @@ -47,8 +55,7 @@ public ExecutorService executorService() { return executor; } - @Override - public void close() { + private void stop() { try { log.debug("Closing executor"); executor.shutdown(); From da9f5e639356c6702662fae4ac6fb1ef0a436547 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 21 Sep 2021 23:26:56 +0200 Subject: [PATCH 7/8] feat: thread pool debug mode should be off by default --- .../io/javaoperatorsdk/operator/api/config/Utils.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java index 1ea7285f89..b36c0468cd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java +++ 
b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java @@ -61,15 +61,10 @@ public static boolean isValidateCustomResourcesEnvVarSet() { } public static boolean shouldCheckCRDAndValidateLocalModel() { - return getBooleanEnvProperty(CHECK_CRD_ENV_KEY); - } - - private static boolean getBooleanEnvProperty(String envKey) { - final var value = System.getProperty(envKey); - return value == null || Boolean.getBoolean(value); + return Boolean.parseBoolean(System.getProperty(CHECK_CRD_ENV_KEY, "true")); } public static boolean debugThreadPool() { - return getBooleanEnvProperty(DEBUG_THREAD_POOL_ENV_KEY); + return Boolean.parseBoolean(System.getProperty(DEBUG_THREAD_POOL_ENV_KEY, "false")); } } From d05087b1d44923a6a8e95cfee4f92d350c1e9e55 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 21 Sep 2021 23:29:17 +0200 Subject: [PATCH 8/8] fix: restore proper testing matrices (minus Java 16) --- .github/workflows/pr.yml | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 015484678f..2f031c070e 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -15,12 +15,9 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - # java: [11, 16] java: [ 11 ] - distribution: [ temurin ] - kubernetes: [ 'v1.22.1' ] - # distribution: [ temurin, adopt-openj9 ] - # kubernetes: [ 'v1.17.13','v1.18.20','v1.19.14','v1.20.10','v1.21.4', 'v1.22.1' ] + distribution: [ temurin, adopt-openj9 ] + kubernetes: [ 'v1.17.13','v1.18.20','v1.19.14','v1.20.10','v1.21.4', 'v1.22.1' ] steps: - uses: actions/checkout@v2 - name: Set up Java and Maven