Skip to content

eventing refinements mentioned on #2012 #2034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,31 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
return Set.of(ConfigMap.class, Secret.class);
}

/**
* If an annotation should be used so that the operator sdk can detect events from its own updates
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an annotation is an implementation detail towards achieving filtering of own updates. If we want to explain that an annotation is used for this purpose, either we need to document in greater details how the annotation is used (so that users can actually use that information) or not mention it at all, imo.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than specifying what the annotation name is, there isn't much greater detail that is needed here - the user is not expected to do anything with these annotations.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it would make sense to document the annotation name so that people can look it up in case people notice it in their resources and wonder what that is.

* of dependent resources and then filter them.
* <p>
* Disable this if you want to react to your own dependent resource updates
*
* @since 4.5.0
*/
default boolean previousAnnotationForDependentResources() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather see a more explicit method name that conveys better the intent, i.e. something like:

Suggested change
default boolean previousAnnotationForDependentResources() {
default boolean processOwnDependentResourceUpdates() {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there have been multiple mechanism for this the thinking is to use specific rather than general terminology, so that if it changes again the user can be explicit about which mechanism(s) are in play.

If you strongly prefer the general feature name, that's fine too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be too generic.
previousAnnotationForDependentResourcesEventFiltering or somethink would make more sense to me.

return true;
}

/**
* If the event logic should parse the resourceVersion to determine the ordering of events. This
* is typically not needed.
* <p>
* Disabled by default as Kubernetes does not support, and discourages, this interpretation of
* resourceVersions. Enable only if your api server event processing seems to lag the operator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a more concrete example of when setting this to true would help. As is, it's not obvious when people might want to use this and I would rather not expose this at all if this isn't sufficiently / broadly applicable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's basically for feature parity with the mutable cache that is in go client - I'm not aware of user request for the feature, so if you feel strongly about it, then it could be hidden or removed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@metacosm @csviri what do you think - would it be better to pull out or hide the resourceVersion parsing logic?

* logic and you want to further minimize the the amount of work done / updates issued by the
* operator.
*
* @since 4.5.0
*/
default boolean parseResourceVersions() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, a more explicit method name would be better, imo.

Suggested change
default boolean parseResourceVersions() {
default boolean useResourceVersionsForEventOrdering() {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's decide if you even want this before making a name change.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I'm not against this. Also would stick with the original name, thus parseResourceVersions since this can have later effect on other parts.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parseResourceVersions doesn't mean anything to me as a user… Parse how? For what purpose? Why would I care? Sure, this can be mitigated by documentation but as we know documentation is not always up to date and I'd rather have explicit method names for user-facing options.

return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ConfigurationServiceOverrider {
private ResourceClassResolver resourceClassResolver;
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
private Boolean previousAnnotationForDependentResources;
private Boolean parseResourceVersions;

ConfigurationServiceOverrider(ConfigurationService original) {
this.original = original;
Expand Down Expand Up @@ -158,6 +160,18 @@ public ConfigurationServiceOverrider withDefaultNonSSAResource(
return this;
}

public ConfigurationServiceOverrider withPreviousAnnotationForDependentResources(
boolean value) {
this.previousAnnotationForDependentResources = value;
return this;
}

public ConfigurationServiceOverrider wihtParseResourceVersions(
boolean value) {
this.parseResourceVersions = value;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, client) {
@Override
Expand Down Expand Up @@ -270,6 +284,20 @@ public Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
return defaultNonSSAResource != null ? defaultNonSSAResource
: super.defaultNonSSAResource();
}

@Override
public boolean previousAnnotationForDependentResources() {
return previousAnnotationForDependentResources != null
? previousAnnotationForDependentResources
: super.previousAnnotationForDependentResources();
}

@Override
public boolean parseResourceVersions() {
return parseResourceVersions != null
? parseResourceVersions
: super.parseResourceVersions();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
return secondaryToPrimaryMapper;
}

@Override
public Optional<OnDeleteFilter<? super R>> onDeleteFilter() {
return Optional.ofNullable(onDeleteFilter);
}
Expand Down Expand Up @@ -95,12 +96,15 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
*/
SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();

@Override
Optional<OnAddFilter<? super R>> onAddFilter();

@Override
Optional<OnUpdateFilter<? super R>> onUpdateFilter();

Optional<OnDeleteFilter<? super R>> onDeleteFilter();

@Override
Optional<GenericFilter<? super R>> genericFilter();

<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public R create(R desired, P primary, Context<P> context) {
desired.getMetadata().setResourceVersion("1");
}
}
addMetadata(false, null, desired, primary);
addMetadata(false, null, desired, primary, context);
sanitizeDesired(desired, null, primary, context);
final var resource = prepare(desired, primary, "Creating");
return useSSA(context)
Expand All @@ -130,7 +130,7 @@ public R update(R actual, R desired, P primary, Context<P> context) {
actual.getMetadata().getResourceVersion());
}
R updatedResource;
addMetadata(false, actual, desired, primary);
addMetadata(false, actual, desired, primary, context);
sanitizeDesired(desired, actual, primary, context);
if (useSSA(context)) {
updatedResource = prepare(desired, primary, "Updating")
Expand Down Expand Up @@ -163,7 +163,7 @@ public Result<R> match(R actualResource, R desired, P primary, Context<P> contex
public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMatcher<R> matcher,
Context<P> context) {
final boolean matches;
addMetadata(true, actualResource, desired, primary);
addMetadata(true, actualResource, desired, primary, context);
if (useSSA(context)) {
matches = SSABasedGenericKubernetesResourceMatcher.getInstance()
.matches(actualResource, desired, context);
Expand All @@ -173,8 +173,9 @@ public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMa
return Result.computed(matches, desired);
}

protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary) {
if (forMatch) { // keep the current
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary,
Context<P> context) {
if (forMatch) { // keep the current previous annotation
String actual = actualResource.getMetadata().getAnnotations()
.get(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
Map<String, String> annotations = target.getMetadata().getAnnotations();
Expand All @@ -183,7 +184,7 @@ protected void addMetadata(boolean forMatch, R actualResource, final R target, P
} else {
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
}
} else { // set a new one
} else if (usePreviousAnnotation(context)) { // set a new one
eventSource().orElseThrow().addPreviousAnnotation(
Optional.ofNullable(actualResource).map(r -> r.getMetadata().getResourceVersion())
.orElse(null),
Expand All @@ -208,6 +209,11 @@ protected boolean useSSA(Context<P> context) {
.ssaBasedCreateUpdateMatchForDependentResources());
}

private boolean usePreviousAnnotation(Context<P> context) {
return context.getControllerConfiguration().getConfigurationService()
.previousAnnotationForDependentResources();
}

@Override
protected void handleDelete(P primary, R secondary, Context<P> context) {
if (secondary != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public class ControllerResourceEventSource<T extends HasMetadata>

@SuppressWarnings({"unchecked", "rawtypes"})
public ControllerResourceEventSource(Controller<T> controller) {
super(controller.getCRClient(), controller.getConfiguration());
super(controller.getCRClient(), controller.getConfiguration(), false);
this.controller = controller;

final var config = controller.getConfiguration();
OnUpdateFilter internalOnUpdateFilter =
(OnUpdateFilter<T>) onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
config.getFinalizerName())
.or(onUpdateGenerationAware(config.isGenerationAware()))
.or(onUpdateMarkedForDeletion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
extends ManagedInformerEventSource<R, P, InformerConfiguration<R>>
implements ResourceEventHandler<R> {

private static final int MAX_RESOURCE_VERSIONS = 256;

public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";

private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
Expand All @@ -78,14 +80,31 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
private Map<String, Function<R, List<String>>> indexerBuffer = new HashMap<>();
private final String id = UUID.randomUUID().toString();
private final Set<String> knownResourceVersions;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document what are the situations this is solving


public InformerEventSource(
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
this(configuration, context.getClient());
this(configuration, context.getClient(),
context.getControllerConfiguration().getConfigurationService().parseResourceVersions());
}

public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
super(client.resources(configuration.getResourceClass()), configuration);
this(configuration, client, false);
}

public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client,
boolean parseResourceVersions) {
super(client.resources(configuration.getResourceClass()), configuration, parseResourceVersions);
if (parseResourceVersions) {
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
@Override
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
return size() >= MAX_RESOURCE_VERSIONS;
}
});
} else {
knownResourceVersions = null;
}

// If there is a primary to secondary mapper there is no need for primary to secondary index.
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
Expand Down Expand Up @@ -169,6 +188,10 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
}

private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
if (knownResourceVersions != null
&& knownResourceVersions.contains(newObject.getMetadata().getResourceVersion())) {
return true;
}
var res = temporaryResourceCache.getResourceFromCache(resourceID);
if (res.isEmpty()) {
return isEventKnownFromAnnotation(newObject, oldObject);
Expand Down Expand Up @@ -262,6 +285,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res

private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
primaryToSecondaryIndex.onAddOrUpdate(newResource);
if (knownResourceVersions != null) {
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
}
temporaryResourceCache.putResource(newResource, Optional.ofNullable(oldResource)
.map(r -> r.getMetadata().getResourceVersion()).orElse(null));
}
Expand All @@ -275,7 +301,6 @@ public boolean allowsNamespaceChanges() {
return configuration().followControllerNamespaceChanges();
}


private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
if (genericFilter != null && !genericFilter.accept(newObject)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,27 @@ public abstract class ManagedInformerEventSource<R extends HasMetadata, P extend
protected MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client;

protected ManagedInformerEventSource(
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration,
boolean parseResourceVersions) {
super(configuration.getResourceClass());
this.client = client;
temporaryResourceCache = new TemporaryResourceCache<>(this);
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
this.cache = new InformerManager<>(client, configuration, this);
}

@Override
public void onAdd(R resource) {
temporaryResourceCache.removeResourceFromCache(resource);
temporaryResourceCache.onEvent(resource, false);
}

@Override
public void onUpdate(R oldObj, R newObj) {
temporaryResourceCache.removeResourceFromCache(newObj);
temporaryResourceCache.onEvent(newObj, false);
}

@Override
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
temporaryResourceCache.removeResourceFromCache(obj);
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
}

protected InformerManager<R, C> manager() {
Expand Down Expand Up @@ -127,6 +128,7 @@ void setTemporalResourceCache(TemporaryResourceCache<R> temporaryResourceCache)
this.temporaryResourceCache = temporaryResourceCache;
}

@Override
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
cache.addIndexers(indexers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

Expand Down Expand Up @@ -36,13 +37,18 @@ public class TemporaryResourceCache<T extends HasMetadata> {

private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
private final boolean parseResourceVersions;

public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
boolean parseResourceVersions) {
this.managedInformerEventSource = managedInformerEventSource;
this.parseResourceVersions = parseResourceVersions;
}

public synchronized Optional<T> removeResourceFromCache(T resource) {
return Optional.ofNullable(cache.remove(ResourceID.fromResource(resource)));
public synchronized void onEvent(T resource, boolean unknownState) {
cache.computeIfPresent(ResourceID.fromResource(resource),
(id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null
: cached);
}

public synchronized void putAddedResource(T newResource) {
Expand All @@ -61,18 +67,37 @@ public synchronized void putResource(T newResource, String previousResourceVersi
.orElse(managedInformerEventSource.get(resourceId).orElse(null));

if ((previousResourceVersion == null && cachedResource == null)
|| (cachedResource != null && previousResourceVersion != null
&& cachedResource.getMetadata().getResourceVersion()
.equals(previousResourceVersion))) {
|| (cachedResource != null
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
log.debug(
"Temporarily moving ahead to target version {} for resource id: {}",
newResource.getMetadata().getResourceVersion(), resourceId);
putToCache(newResource, resourceId);
} else {
if (cache.remove(resourceId) != null) {
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
} else if (cache.remove(resourceId) != null) {
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
}
}

/**
* @return true if {@link InformerConfiguration#parseResourceVersions()} is enabled and the
* resourceVersion of newResource is numerically greater than cachedResource, otherwise
* false
*/
private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
try {
if (parseResourceVersions
&& Long.compare(Long.parseLong(newResource.getMetadata().getResourceVersion()),
Long.parseLong(cachedResource.getMetadata().getResourceVersion())) > 0) {
return true;
}
} catch (NumberFormatException e) {
log.debug(
"Could not compare resourceVersions {} and {} for {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be this a warning? even if it could pollute cache? (no strong opinion)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't pollute the cache, it would exhibit the same behavior as it does now. It can of course be at whatever level you want.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean pollute logs :)

newResource.getMetadata().getResourceVersion(),
cachedResource.getMetadata().getResourceVersion(), resourceId);
}
return false;
}

private void putToCache(T resource, ResourceID resourceID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
informerEventSource.onUpdate(cachedDeployment, testDeployment());

verify(eventHandlerMock, times(1)).handleEvent(any());
verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any());
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
}

@Test
Expand Down
Loading