diff --git a/docs/documentation/features.md b/docs/documentation/features.md index 7a604dc14e..91778ee275 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -324,6 +324,41 @@ intersections: will still happen, but won't reset the retry, will be still marked as the last attempt in the retry info. The point (1) still holds, but in case of an error, no retry will happen. +## Rate Limiting + +It is possible to rate limit reconciliation on a per-resource basis. The rate limit also takes +precedence over retry/re-schedule configurations: for example, even if a retry was scheduled for +the next second but this request would make the resource go over its rate limit, the next +reconciliation will be postponed according to the rate limiting rules. Note that the +reconciliation is never cancelled, it will just be executed as early as possible based on rate +limitations. + +Rate limiting is by default turned **off**, since correct configuration depends on the reconciler +implementation, in particular, on how long a typical reconciliation takes. +(The parallelism of reconciliation itself can be +limited [`ConfigurationService`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L120-L120) +by configuring the `ExecutorService` appropriately.) + +A default rate limiter implementation is provided, see: +[`PeriodRateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java#L14-L14) +. +Users can override it by implementing their own +[`RateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java) +. + +To configure the default rate limiter use `@ControllerConfiguration` annotation. The following +configuration limits +each resource to reconcile at most twice within a 3 second interval: + +`@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2,refreshPeriod = 3,refreshPeriodTimeUnit = TimeUnit.SECONDS))` +. + +Thus, if a given resource was reconciled twice in one second, no further reconciliation for this +resource will happen before two seconds have elapsed. Note that, since rate is limited on a +per-resource basis, other resources can still be reconciled at the same time, as long, of course, +that they stay within their own rate limits. + + ## Handling Related Events with Event Sources See also this [blog post](https://csviri.medium.com/java-operator-sdk-introduction-to-event-sources-a1aab5af4b7b). diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java index 3cee86802c..560a79e5be 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java @@ -27,6 +27,8 @@ import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig; import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters; import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter; @@ -150,6 +152,17 @@ public Optional reconciliationMaxInterval() { } } + @Override + public RateLimiter getRateLimiter() { + if (annotation.rateLimit() != null) { + return new PeriodRateLimiter(Duration.of(annotation.rateLimit().refreshPeriod(), + annotation.rateLimit().refreshPeriodTimeUnit().toChronoUnit()), + annotation.rateLimit().limitForPeriod()); + } else { + return io.javaoperatorsdk.operator.api.config.ControllerConfiguration.super.getRateLimiter(); + } + } + @Override @SuppressWarnings("unchecked") public Optional> onAddFilter() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index 60d47d2b47..339e06f894 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -8,6 +8,8 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; @@ -15,6 +17,8 @@ public interface ControllerConfiguration extends ResourceConfiguration { + RateLimiter DEFAULT_RATE_LIMITER = new PeriodRateLimiter(); + default String getName() { return ReconcilerUtils.getDefaultReconcilerName(getAssociatedReconcilerClassName()); } @@ -43,6 +47,10 @@ default RetryConfiguration getRetryConfiguration() { return RetryConfiguration.DEFAULT; } + default RateLimiter getRateLimiter() { + return DEFAULT_RATE_LIMITER; + } + /** * Allow controllers to filter events before they are passed to the * {@link io.javaoperatorsdk.operator.processing.event.EventHandler}. diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index dc579bb67f..440a076428 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -13,6 +13,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; @@ -35,6 +36,7 @@ public class ControllerConfigurationOverrider { private Predicate onAddFilter; private BiPredicate onUpdateFilter; private Predicate genericFilter; + private RateLimiter rateLimiter; private ControllerConfigurationOverrider(ControllerConfiguration original) { finalizer = original.getFinalizerName(); @@ -52,6 +54,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration original) { this.genericFilter = original.genericFilter().orElse(null); dependentResources.forEach(drs -> namedDependentResourceSpecs.put(drs.getName(), drs)); this.original = original; + this.rateLimiter = original.getRateLimiter(); } public ControllerConfigurationOverrider withFinalizer(String finalizer) { @@ -114,6 +117,11 @@ public ControllerConfigurationOverrider withRetry(RetryConfiguration retry) { return this; } + public ControllerConfigurationOverrider withRateLimiter(RateLimiter rateLimiter) { + this.rateLimiter = rateLimiter; + return this; + } + public ControllerConfigurationOverrider withLabelSelector(String labelSelector) { this.labelSelector = labelSelector; return this; @@ -196,6 +204,7 @@ public ControllerConfiguration build() { onAddFilter, onUpdateFilter, genericFilter, + rateLimiter, newDependentSpecs); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java index b68c5e8f4f..9cc6fd1608 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java @@ -10,6 +10,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.retry.Retry; @@ -27,6 +28,7 @@ public class DefaultControllerConfiguration private final ResourceEventFilter resourceEventFilter; private final List dependents; private final Duration reconciliationMaxInterval; + private final RateLimiter rateLimiter; // NOSONAR constructor is meant to provide all information public DefaultControllerConfiguration( @@ -44,6 +46,7 @@ public DefaultControllerConfiguration( Predicate onAddFilter, BiPredicate onUpdateFilter, Predicate genericFilter, + RateLimiter rateLimiter, List dependents) { super(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces); this.associatedControllerClassName = associatedControllerClassName; @@ -57,7 +60,7 @@ public DefaultControllerConfiguration( ? ControllerConfiguration.super.getRetry() : retry; this.resourceEventFilter = resourceEventFilter; - + this.rateLimiter = rateLimiter; this.dependents = dependents != null ? dependents : Collections.emptyList(); } @@ -105,4 +108,9 @@ public List getDependentResources() { public Optional reconciliationMaxInterval() { return Optional.ofNullable(reconciliationMaxInterval); } + + @Override + public RateLimiter getRateLimiter() { + return rateLimiter; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index 4c6621bb57..3d86c74779 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -91,6 +91,9 @@ ReconciliationMaxInterval reconciliationMaxInterval() default @ReconciliationMaxInterval( interval = 10); + + RateLimit rateLimit() default @RateLimit; + /** * Optional list of {@link Dependent} configurations which associate a resource type to a * {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} implementation diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java new file mode 100644 index 0000000000..6c4feb2b2a --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java @@ -0,0 +1,23 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.concurrent.TimeUnit; + +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface RateLimit { + + int limitForPeriod() default PeriodRateLimiter.NO_LIMIT_PERIOD; + + int refreshPeriod() default PeriodRateLimiter.DEFAULT_REFRESH_PERIOD_SECONDS; + + /** + * @return time unit for max delay between reconciliations + */ + TimeUnit refreshPeriodTimeUnit() default TimeUnit.SECONDS; +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index b87f43f43d..2580a1318e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event; +import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -20,6 +21,7 @@ import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.MDCUtils; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -32,7 +34,9 @@ class EventProcessor implements EventHandler, LifecycleAware { private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50; + private volatile boolean running; private final Set underProcessing = new HashSet<>(); private final ReconciliationDispatcher reconciliationDispatcher; private final Retry retry; @@ -40,10 +44,10 @@ class EventProcessor implements EventHandler, LifecycleAw private final ExecutorService executor; private final String controllerName; private final Metrics metrics; - private volatile boolean running; private final Cache cache; private final EventSourceManager eventSourceManager; private final EventMarker eventMarker = new EventMarker(); + private final RateLimiter rateLimiter; EventProcessor(EventSourceManager eventSourceManager) { this( @@ -53,6 +57,7 @@ class EventProcessor implements EventHandler, LifecycleAw new ReconciliationDispatcher<>(eventSourceManager.getController()), eventSourceManager.getController().getConfiguration().getRetry(), ConfigurationServiceProvider.instance().getMetrics(), + eventSourceManager.getController().getConfiguration().getRateLimiter(), eventSourceManager); } @@ -61,6 +66,7 @@ class EventProcessor implements EventHandler, LifecycleAw EventSourceManager eventSourceManager, String relatedControllerName, Retry retry, + RateLimiter rateLimiter, Metrics metrics) { this( eventSourceManager.getControllerResourceEventSource(), @@ -69,6 +75,7 @@ class EventProcessor implements EventHandler, LifecycleAw reconciliationDispatcher, retry, metrics, + rateLimiter, eventSourceManager); } @@ -79,6 +86,7 @@ private EventProcessor( ReconciliationDispatcher reconciliationDispatcher, Retry retry, Metrics metrics, + RateLimiter rateLimiter, EventSourceManager eventSourceManager) { this.running = false; this.executor = @@ -92,6 +100,7 @@ private EventProcessor( this.cache = cache; this.metrics = metrics != null ? metrics : Metrics.NOOP; this.eventSourceManager = eventSourceManager; + this.rateLimiter = rateLimiter; } @Override @@ -128,6 +137,11 @@ private void submitReconciliationExecution(ResourceID resourceID) { Optional latest = cache.get(resourceID); latest.ifPresent(MDCUtils::addResourceInfo); if (!controllerUnderExecution && latest.isPresent()) { + var rateLimiterPermission = rateLimiter.acquirePermission(resourceID); + if (rateLimiterPermission.isPresent()) { + handleRateLimitedSubmission(resourceID, rateLimiterPermission.get()); + return; + } setUnderExecutionProcessing(resourceID); final var retryInfo = retryInfo(resourceID); ExecutionScope executionScope = new ExecutionScope<>(latest.get(), retryInfo); @@ -193,6 +207,14 @@ private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) { return resourceEvent.getResource().map(HasMetadata::isMarkedForDeletion).orElse(false); } + private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimalDuration) { + var minimalDurationMillis = minimalDuration.toMillis(); + log.debug("Rate limited resource: {}, rescheduled in {} millis", resourceID, + minimalDurationMillis); + retryEventSource().scheduleOnce(resourceID, + Math.max(minimalDurationMillis, MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION)); + } + private RetryInfo retryInfo(ResourceID resourceID) { return retryState.get(resourceID); } @@ -251,11 +273,10 @@ private void reScheduleExecutionIfInstructed( postExecutionControl .getReScheduleDelay() .ifPresent(delay -> { - if (log.isDebugEnabled()) { - log.debug("ReScheduling event for resource: {} with delay: {}", - ResourceID.fromResource(customResource), delay); - } - retryEventSource().scheduleOnce(customResource, delay); + var resourceID = ResourceID.fromResource(customResource); + log.debug("ReScheduling event for resource: {} with delay: {}", + resourceID, delay); + retryEventSource().scheduleOnce(resourceID, delay); }); } @@ -289,7 +310,7 @@ private void handleRetryOnException( delay, resourceID); metrics.failedReconciliation(resourceID, exception); - retryEventSource().scheduleOnce(executionScope.getResource(), delay); + retryEventSource().scheduleOnce(resourceID, delay); }, () -> log.error("Exhausted retries for {}", executionScope)); } @@ -315,6 +336,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) private void cleanupForDeletedEvent(ResourceID resourceID) { log.debug("Cleaning up for delete event for: {}", resourceID); eventMarker.cleanup(resourceID); + rateLimiter.clear(resourceID); metrics.cleanupDoneFor(resourceID); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java new file mode 100644 index 0000000000..caa6075ee7 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java @@ -0,0 +1,61 @@ +package io.javaoperatorsdk.operator.processing.event.rate; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +/** + * A Simple rate limiter that limits the number of permission for a time interval. + */ +public class PeriodRateLimiter implements RateLimiter { + + public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 10; + public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; + public static final Duration DEFAULT_REFRESH_PERIOD = + Duration.ofSeconds(DEFAULT_REFRESH_PERIOD_SECONDS); + + /** To turn off rate limiting set limit fod period to a non-positive number */ + public static final int NO_LIMIT_PERIOD = -1; + + private Duration refreshPeriod; + private int limitForPeriod; + + private Map limitData = new HashMap<>(); + + public PeriodRateLimiter() { + this(DEFAULT_REFRESH_PERIOD, DEFAULT_LIMIT_FOR_PERIOD); + } + + public PeriodRateLimiter(Duration refreshPeriod, int limitForPeriod) { + this.refreshPeriod = refreshPeriod; + this.limitForPeriod = limitForPeriod; + } + + @Override + public Optional acquirePermission(ResourceID resourceID) { + if (limitForPeriod <= 0) { + return Optional.empty(); + } + var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState()); + if (actualState.getCount() < limitForPeriod) { + actualState.increaseCount(); + return Optional.empty(); + } else if (actualState.getLastRefreshTime() + .isBefore(LocalDateTime.now().minus(refreshPeriod))) { + actualState.reset(); + actualState.increaseCount(); + return Optional.empty(); + } else { + return Optional.of(Duration.between(actualState.getLastRefreshTime(), LocalDateTime.now())); + } + } + + @Override + public void clear(ResourceID resourceID) { + limitData.remove(resourceID); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java new file mode 100644 index 0000000000..7281315e98 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java @@ -0,0 +1,22 @@ +package io.javaoperatorsdk.operator.processing.event.rate; + +import java.time.Duration; +import java.util.Optional; + +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public interface RateLimiter { + + /** + * @param resourceID id of the resource + * @return empty if permission acquired or minimal duration until a permission could be acquired + * again + */ + Optional acquirePermission(ResourceID resourceID); + + /** + * Cleanup state. Called when resource is deleted. + */ + void clear(ResourceID resourceID); + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java new file mode 100644 index 0000000000..f1302b077d --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java @@ -0,0 +1,35 @@ +package io.javaoperatorsdk.operator.processing.event.rate; + +import java.time.LocalDateTime; + +class RateState { + + private LocalDateTime lastRefreshTime; + private int count; + + public static RateState initialState() { + return new RateState(LocalDateTime.now(), 0); + } + + RateState(LocalDateTime lastRefreshTime, int count) { + this.lastRefreshTime = lastRefreshTime; + this.count = count; + } + + public void increaseCount() { + count = count + 1; + } + + public void reset() { + lastRefreshTime = LocalDateTime.now(); + count = 0; + } + + public LocalDateTime getLastRefreshTime() { + return lastRefreshTime; + } + + public int getCount() { + return count; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java index 28acfdc3ac..f22400453a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java @@ -24,17 +24,20 @@ public class TimerEventSource private final AtomicBoolean running = new AtomicBoolean(); private final Map onceTasks = new ConcurrentHashMap<>(); - public void scheduleOnce(R resource, long delay) { + scheduleOnce(ResourceID.fromResource(resource), delay); + } + + public void scheduleOnce(ResourceID resourceID, long delay) { if (!running.get()) { throw new IllegalStateException("The TimerEventSource is not running"); } - ResourceID resourceUid = ResourceID.fromResource(resource); - if (onceTasks.containsKey(resourceUid)) { - cancelOnceSchedule(resourceUid); + + if (onceTasks.containsKey(resourceID)) { + cancelOnceSchedule(resourceID); } - EventProducerTimeTask task = new EventProducerTimeTask(resourceUid); - onceTasks.put(resourceUid, task); + EventProducerTimeTask task = new EventProducerTimeTask(resourceID); + onceTasks.put(resourceID, task); timer.schedule(task, delay); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java index 6c7358d86e..b328ace132 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java @@ -63,7 +63,7 @@ private static class TestControllerConfiguration public TestControllerConfiguration(Reconciler controller, Class crClass) { super(null, getControllerName(controller), CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass, - null, null, null, null, null); + null, null, null, null, null, null); this.controller = controller; } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index dc1c32aeda..788325ebe8 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -17,6 +17,8 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.RetryConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -30,8 +32,19 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.*; - +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings({"rawtypes", "unchecked"}) class EventProcessorTest { private static final Logger log = LoggerFactory.getLogger(EventProcessorTest.class); @@ -40,15 +53,16 @@ class EventProcessorTest { public static final int SEPARATE_EXECUTION_TIMEOUT = 450; public static final String TEST_NAMESPACE = "default-event-handler-test"; - private ReconciliationDispatcher reconciliationDispatcherMock = + private final ReconciliationDispatcher reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); - private EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class); - private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); - private ControllerResourceEventSource controllerResourceEventSourceMock = + private final EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class); + private final TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); + private final ControllerResourceEventSource controllerResourceEventSourceMock = mock(ControllerResourceEventSource.class); - private Metrics metricsMock = mock(Metrics.class); + private final Metrics metricsMock = mock(Metrics.class); private EventProcessor eventProcessor; private EventProcessor eventProcessorWithRetry; + private final RateLimiter rateLimiterMock = mock(RateLimiter.class); @BeforeEach void setup() { @@ -56,14 +70,15 @@ void setup() { .thenReturn(controllerResourceEventSourceMock); eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, - null)); + rateLimiterMock, null)); eventProcessor.start(); eventProcessorWithRetry = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", - GenericRetry.defaultLimitedExponentialRetry(), null)); + GenericRetry.defaultLimitedExponentialRetry(), rateLimiterMock, null)); eventProcessorWithRetry.start(); when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); + when(rateLimiterMock.acquirePermission(any())).thenReturn(Optional.empty()); } @Test @@ -85,7 +100,7 @@ void skipProcessingIfLatestCustomResourceNotInCache() { } @Test - void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedException { + void ifExecutionInProgressWaitsUntilItsFinished() { ResourceID resourceUid = eventAlreadyUnderProcessing(); eventProcessor.handleEvent(nonCREvent(resourceUid)); @@ -105,7 +120,8 @@ void schedulesAnEventRetryOnException() { eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); verify(retryTimerEventSourceMock, times(1)) - .scheduleOnce(eq(customResource), eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL)); + .scheduleOnce(eq(ResourceID.fromResource(customResource)), + eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL)); } @Test @@ -136,7 +152,8 @@ void executesTheControllerInstantlyAfterErrorIfNewEventsReceived() { List allValues = executionScopeArgumentCaptor.getAllValues(); assertThat(allValues).hasSize(2); verify(retryTimerEventSourceMock, never()) - .scheduleOnce(eq(customResource), eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL)); + .scheduleOnce(eq(ResourceID.fromResource(customResource)), + eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL)); } @Test @@ -198,7 +215,7 @@ void scheduleTimedEventIfInstructedByPostExecutionControl() { eventProcessor.handleEvent(prepareCREvent()); verify(retryTimerEventSourceMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)) - .scheduleOnce(any(), eq(testDelay)); + .scheduleOnce((ResourceID) any(), eq(testDelay)); } @Test @@ -214,7 +231,7 @@ void reScheduleOnlyIfNotExecutedEventsReceivedMeanwhile() throws InterruptedExce verify(retryTimerEventSourceMock, after((long) (FAKE_CONTROLLER_EXECUTION_DURATION * 1.5)).times(0)) - .scheduleOnce(any(), eq(testDelay)); + .scheduleOnce((ResourceID) any(), eq(testDelay)); } @Test @@ -241,6 +258,7 @@ void startProcessedMarkedEventReceivedBefore() { var crID = new ResourceID("test-cr", TEST_NAMESPACE); eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, + new PeriodRateLimiter(), metricsMock)); when(controllerResourceEventSourceMock.get(eq(crID))) .thenReturn(Optional.of(testCustomResource())); @@ -327,6 +345,25 @@ void newResourceAfterMissedDeleteEvent() { verify(reconciliationDispatcherMock, timeout(50).times(1)).handleExecution(any()); } + @Test + void rateLimitsReconciliationSubmission() { + // the refresh period value does not matter here + var refreshPeriod = Duration.ofMillis(100); + var event = prepareCREvent(); + + when(rateLimiterMock.acquirePermission(event.getRelatedCustomResourceID())) + .thenReturn(Optional.empty()) + .thenReturn(Optional.of(refreshPeriod)); + + eventProcessor.handleEvent(event); + verify(reconciliationDispatcherMock, after(FAKE_CONTROLLER_EXECUTION_DURATION).times(1)) + .handleExecution(any()); + verify(retryTimerEventSourceMock, times(0)).scheduleOnce((ResourceID) any(), anyLong()); + + eventProcessor.handleEvent(event); + verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); + } + private ResourceID eventAlreadyUnderProcessing() { when(reconciliationDispatcherMock.handleExecution(any())) .then( diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiterTest.java new file mode 100644 index 0000000000..90ad8447cf --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiterTest.java @@ -0,0 +1,65 @@ +package io.javaoperatorsdk.operator.processing.event.rate; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; + +import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +import static org.assertj.core.api.Assertions.assertThat; + +class PeriodRateLimiterTest { + + public static final Duration REFRESH_PERIOD = Duration.ofMillis(300); + ResourceID resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); + + @Test + void acquirePermissionForNewResource() { + var rl = new PeriodRateLimiter(REFRESH_PERIOD, 2); + var res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + + res = rl.acquirePermission(resourceID); + assertThat(res).isNotEmpty(); + } + + @Test + void returnsMinimalDurationToAcquirePermission() { + var rl = new PeriodRateLimiter(REFRESH_PERIOD, 1); + var res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + + res = rl.acquirePermission(resourceID); + + assertThat(res).isPresent(); + assertThat(res.get()).isLessThan(REFRESH_PERIOD); + } + + @Test + void resetsPeriodAfterLimit() throws InterruptedException { + var rl = new PeriodRateLimiter(REFRESH_PERIOD, 1); + var res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + res = rl.acquirePermission(resourceID); + assertThat(res).isPresent(); + + // sleep plus some slack + Thread.sleep(REFRESH_PERIOD.toMillis() + REFRESH_PERIOD.toMillis() / 3); + + res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + } + + @Test + void rateLimitCanBeTurnedOff() { + var rl = new PeriodRateLimiter(REFRESH_PERIOD, PeriodRateLimiter.NO_LIMIT_PERIOD); + + var res = rl.acquirePermission(resourceID); + + assertThat(res).isEmpty(); + } + +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java index bee750a324..ed073f950c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java @@ -20,11 +20,8 @@ import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; import io.javaoperatorsdk.operator.api.config.Version; -import io.javaoperatorsdk.operator.api.reconciler.Constants; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.Reconciler; -import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static org.assertj.core.api.Assertions.assertThat; @@ -55,7 +52,10 @@ void setUpResources() { configurationService = spy(ConfigurationService.class); when(configurationService.checkCRDAndValidateLocalModel()).thenReturn(false); when(configurationService.getVersion()).thenReturn(new Version("1", "1", new Date())); + // make sure not the same config instance is used for the controller, so rate limiter is not + // shared when(configurationService.getConfigurationFor(any(MyController.class))) + .thenReturn(new MyConfiguration()) .thenReturn(new MyConfiguration()); } @@ -136,8 +136,8 @@ public static class MyConfiguration extends DefaultControllerConfiguration null, TestCustomResource.class, null, - onAddFilter, onUpdateFilter, genericFilter, null); + onAddFilter, onUpdateFilter, genericFilter, null, null); } } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java index 2dc42c0b16..3fe3a5db58 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java @@ -37,9 +37,9 @@ public void setup() { @Test public void schedulesOnce() { - TestCustomResource customResource = TestUtils.testCustomResource(); + var resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); - source.scheduleOnce(customResource, PERIOD); + source.scheduleOnce(resourceID, PERIOD); untilAsserted(() -> assertThat(eventHandler.events).hasSize(1)); untilAsserted(PERIOD * 2, 0, () -> assertThat(eventHandler.events).hasSize(1)); @@ -47,20 +47,20 @@ public void schedulesOnce() { @Test public void canCancelOnce() { - TestCustomResource customResource = TestUtils.testCustomResource(); + var resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); - source.scheduleOnce(customResource, PERIOD); - source.cancelOnceSchedule(ResourceID.fromResource(customResource)); + source.scheduleOnce(resourceID, PERIOD); + source.cancelOnceSchedule(resourceID); untilAsserted(() -> assertThat(eventHandler.events).isEmpty()); } @Test public void canRescheduleOnceEvent() { - TestCustomResource customResource = TestUtils.testCustomResource(); + var resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); - source.scheduleOnce(customResource, PERIOD); - source.scheduleOnce(customResource, 2 * PERIOD); + source.scheduleOnce(resourceID, PERIOD); + source.scheduleOnce(resourceID, 2 * PERIOD); untilAsserted(PERIOD * 2, PERIOD, () -> assertThat(eventHandler.events).hasSize(1)); } @@ -69,7 +69,7 @@ public void canRescheduleOnceEvent() { public void deRegistersOnceEventSources() { TestCustomResource customResource = TestUtils.testCustomResource(); - source.scheduleOnce(customResource, PERIOD); + source.scheduleOnce(ResourceID.fromResource(customResource), PERIOD); source.onResourceDeleted(customResource); untilAsserted(() -> assertThat(eventHandler.events).isEmpty()); @@ -77,16 +77,16 @@ public void deRegistersOnceEventSources() { @Test public void eventNotRegisteredIfStopped() throws IOException { - TestCustomResource customResource = TestUtils.testCustomResource(); + var resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); source.stop(); assertThatExceptionOfType(IllegalStateException.class).isThrownBy( - () -> source.scheduleOnce(customResource, PERIOD)); + () -> source.scheduleOnce(resourceID, PERIOD)); } @Test public void eventNotFiredIfStopped() throws IOException { - source.scheduleOnce(TestUtils.testCustomResource(), PERIOD); + source.scheduleOnce(ResourceID.fromResource(TestUtils.testCustomResource()), PERIOD); source.stop(); untilAsserted(() -> assertThat(eventHandler.events).isEmpty()); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/RateLimitIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/RateLimitIT.java new file mode 100644 index 0000000000..25509155de --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/RateLimitIT.java @@ -0,0 +1,62 @@ +package io.javaoperatorsdk.operator; + +import java.time.Duration; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.ratelimit.RateLimitCustomResource; +import io.javaoperatorsdk.operator.sample.ratelimit.RateLimitCustomResourceSpec; +import io.javaoperatorsdk.operator.sample.ratelimit.RateLimitReconciler; + +import static io.javaoperatorsdk.operator.sample.ratelimit.RateLimitReconciler.REFRESH_PERIOD; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class RateLimitIT { + + private final static Logger log = LoggerFactory.getLogger(RateLimitIT.class); + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withReconciler(new RateLimitReconciler()) + .build(); + + @Test + void rateLimitsExecution() { + var res = operator.create(RateLimitCustomResource.class, createResource()); + IntStream.rangeClosed(1, 5).forEach(i -> { + log.debug("replacing resource version: {}", i); + var resource = createResource(); + resource.getSpec().setNumber(i); + operator.replace(RateLimitCustomResource.class, resource); + }); + await().pollInterval(Duration.ofMillis(100)) + .pollDelay(Duration.ofMillis(REFRESH_PERIOD / 2)) + .untilAsserted(() -> assertThat( + operator.getReconcilerOfType(RateLimitReconciler.class).getNumberOfExecutions()) + .isEqualTo(1)); + + await().pollDelay(Duration.ofMillis(REFRESH_PERIOD)) + .untilAsserted(() -> assertThat( + operator.getReconcilerOfType(RateLimitReconciler.class).getNumberOfExecutions()) + .isEqualTo(2)); + } + + public RateLimitCustomResource createResource() { + RateLimitCustomResource res = new RateLimitCustomResource(); + res.setMetadata(new ObjectMetaBuilder() + .withName("test") + .build()); + res.setSpec(new RateLimitCustomResourceSpec()); + res.getSpec().setNumber(0); + return res; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResource.java new file mode 100644 index 0000000000..60456e6a4d --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResource.java @@ -0,0 +1,16 @@ +package io.javaoperatorsdk.operator.sample.ratelimit; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("rlc") +public class RateLimitCustomResource + extends CustomResource + implements Namespaced { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceSpec.java new file mode 100644 index 0000000000..7dbee7f75a --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.ratelimit; + +public class RateLimitCustomResourceSpec { + + private int number; + + public int getNumber() { + return number; + } + + public RateLimitCustomResourceSpec setNumber(int number) { + this.number = number; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceStatus.java new file mode 100644 index 0000000000..087408fc16 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceStatus.java @@ -0,0 +1,7 @@ +package io.javaoperatorsdk.operator.sample.ratelimit; + +import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus; + +public class RateLimitCustomResourceStatus extends ObservedGenerationAwareStatus { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java new file mode 100644 index 0000000000..b4a77a8fc0 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java @@ -0,0 +1,30 @@ +package io.javaoperatorsdk.operator.sample.ratelimit; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.javaoperatorsdk.operator.api.reconciler.*; + +@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 1, + refreshPeriod = RateLimitReconciler.REFRESH_PERIOD, + refreshPeriodTimeUnit = TimeUnit.MILLISECONDS)) +public class RateLimitReconciler + implements Reconciler { + + public static final int REFRESH_PERIOD = 3000; + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + RateLimitCustomResource resource, + Context context) { + + numberOfExecutions.addAndGet(1); + return UpdateControl.noUpdate(); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } +} diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java index 19b130da88..2aa1674564 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -18,14 +19,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; -import io.javaoperatorsdk.operator.api.reconciler.Reconciler; -import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.*; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; @@ -33,7 +27,8 @@ import static io.javaoperatorsdk.operator.sample.WebPageManagedDependentsReconciler.SELECTOR; /** Shows how to implement reconciler using the low level api directly. */ -@ControllerConfiguration +@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2, refreshPeriod = 3, + refreshPeriodTimeUnit = TimeUnit.SECONDS)) public class WebPageReconciler implements Reconciler, ErrorStatusHandler, EventSourceInitializer {