From 78c78433bc39346c0afcf03898211d6ff07f5abc Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 20 Apr 2021 18:10:30 +0200 Subject: [PATCH 1/2] feat: make reconciliation thread number configurable Fixes #399. --- .../io/javaoperatorsdk/operator/Operator.java | 7 ++++++- .../api/config/ConfigurationService.java | 11 ++++++++++ .../processing/DefaultEventHandler.java | 20 +++++++++++++++++-- .../starter/OperatorAutoConfiguration.java | 5 +++++ .../OperatorConfigurationProperties.java | 10 ++++++++++ 5 files changed, 50 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 157f44050b..aa48359501 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -141,7 +141,12 @@ public void register( CustomResourceCache customResourceCache = new CustomResourceCache(objectMapper); DefaultEventHandler defaultEventHandler = - new DefaultEventHandler(customResourceCache, dispatcher, controllerName, retry); + new DefaultEventHandler( + customResourceCache, + dispatcher, + controllerName, + retry, + configurationService.concurrentReconciliationThreads()); DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager(defaultEventHandler, retry != null); defaultEventHandler.setEventSourceManager(eventSourceManager); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 1fc5ec610d..bd6bd88ffd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -55,4 +55,15 @@ default Config getClientConfiguration() { default boolean checkCRDAndValidateLocalModel() { return true; } + + int DEFAULT_RECONCILIATION_THREADS_NUMBER = 5; + /** + * Retrieves the maximum number of threads the operator can spin out to dispatch reconciliation + * requests to controllers + * + * @return the maximum number of concurrent reconciliation threads + */ + default int concurrentReconciliationThreads() { + return DEFAULT_RECONCILIATION_THREADS_NUMBER; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 92fa88a375..a638816969 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -6,6 +6,7 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.api.RetryInfo; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; @@ -44,14 +45,29 @@ public DefaultEventHandler( CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName, - Retry retry) { + Retry retry, + int concurrentReconciliationThreads) { this.customResourceCache = customResourceCache; this.eventDispatcher = eventDispatcher; this.retry = retry; eventBuffer = new EventBuffer(); executor = new ScheduledThreadPoolExecutor( - 5, runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName)); + concurrentReconciliationThreads, + runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName)); + } + + public DefaultEventHandler( + CustomResourceCache customResourceCache, + EventDispatcher eventDispatcher, + String relatedControllerName, + Retry retry) { + this( + customResourceCache, + eventDispatcher, + relatedControllerName, + retry, + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); } public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { diff --git a/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorAutoConfiguration.java b/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorAutoConfiguration.java index 9005d2bd79..946f332e99 100644 --- a/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorAutoConfiguration.java +++ b/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorAutoConfiguration.java @@ -134,4 +134,9 @@ public RetryConfiguration getRetryConfiguration() { .orElse(RetryConfiguration.DEFAULT); } } + + @Override + public int concurrentReconciliationThreads() { + return configuration.getConcurrentReconciliationThreads(); + } } diff --git a/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorConfigurationProperties.java b/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorConfigurationProperties.java index 4edb501777..a524e641fb 100644 --- a/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorConfigurationProperties.java +++ b/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorConfigurationProperties.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.springboot.starter; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import java.util.Collections; import java.util.Map; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -10,6 +11,7 @@ public class OperatorConfigurationProperties { private KubernetesClientProperties client = new KubernetesClientProperties(); private Map controllers = Collections.emptyMap(); private boolean checkCrdAndValidateLocalModel = true; + private int concurrentReconciliationThreads = ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER; public KubernetesClientProperties getClient() { return client; @@ -34,4 +36,12 @@ public boolean getCheckCrdAndValidateLocalModel() { public void setCheckCrdAndValidateLocalModel(boolean checkCrdAndValidateLocalModel) { this.checkCrdAndValidateLocalModel = checkCrdAndValidateLocalModel; } + + public int getConcurrentReconciliationThreads() { + return concurrentReconciliationThreads; + } + + public void setConcurrentReconciliationThreads(int concurrentReconciliationThreads) { + this.concurrentReconciliationThreads = concurrentReconciliationThreads; + } } From 6e76ab0f6db44bf2d1e78b15c0129019e72a5f1b Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 20 Apr 2021 18:35:38 +0200 Subject: [PATCH 2/2] fix: format --- .../springboot/starter/OperatorConfigurationProperties.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorConfigurationProperties.java b/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorConfigurationProperties.java index a524e641fb..3591b433c4 100644 --- a/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorConfigurationProperties.java +++ b/operator-framework-spring-boot-starter/src/main/java/io/javaoperatorsdk/operator/springboot/starter/OperatorConfigurationProperties.java @@ -11,7 +11,8 @@ public class OperatorConfigurationProperties { private KubernetesClientProperties client = new KubernetesClientProperties(); private Map controllers = Collections.emptyMap(); private boolean checkCrdAndValidateLocalModel = true; - private int concurrentReconciliationThreads = ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER; + private int concurrentReconciliationThreads = + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER; public KubernetesClientProperties getClient() { return client;