Skip to content

fix: using tombstones to account for rapid deletion #2319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,28 @@ private boolean usePreviousAnnotation(Context<P> context) {
.previousAnnotationForDependentResourcesEventFiltering();
}

@Override
protected R handleCreate(R desired, P primary, Context<P> context) {
var id = ResourceID.fromResource(desired);
try {
eventSource().orElseThrow().prepareForAddOrUpdate(id);
return super.handleCreate(desired, primary, context);
} finally {
eventSource().orElseThrow().finishAddOrUpdate(id);
}
}

@Override
protected R handleUpdate(R actual, R desired, P primary, Context<P> context) {
var id = ResourceID.fromResource(desired);
try {
eventSource().orElseThrow().prepareForAddOrUpdate(id);
return super.handleUpdate(actual, desired, primary, context);
} finally {
eventSource().orElseThrow().finishAddOrUpdate(id);
}
}

@Override
protected void handleDelete(P primary, R secondary, Context<P> context) {
if (secondary != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,14 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
handleRecentCreateOrUpdate(Operation.ADD, resource, null);
}

public void prepareForAddOrUpdate(ResourceID id) {
this.temporaryResourceCache.prepareForAddOrUpdate(id);
}

public void finishAddOrUpdate(ResourceID id) {
this.temporaryResourceCache.finshedAddOrUpdate(id);
}

private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
primaryToSecondaryIndex.onAddOrUpdate(newResource);
temporaryResourceCache.putResource(newResource, Optional.ofNullable(oldResource)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -40,6 +42,9 @@ public class TemporaryResourceCache<T extends HasMetadata> {
private static final int MAX_RESOURCE_VERSIONS = 256;

private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();

private final Map<ResourceID, List<String>> tombstones =
new ConcurrentHashMap<ResourceID, List<String>>();
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
private final boolean parseResourceVersions;
private final Set<String> knownResourceVersions;
Expand All @@ -51,7 +56,7 @@ public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInforme
if (parseResourceVersions) {
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
@Override
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
return size() >= MAX_RESOURCE_VERSIONS;
}
});
Expand All @@ -60,6 +65,22 @@ protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest)
}
}

public void prepareForAddOrUpdate(ResourceID id) {
tombstones.put(id, new ArrayList<>());
}

public void finshedAddOrUpdate(ResourceID id) {
tombstones.remove(id);
}

public synchronized void onDeleteEvent(T resource, boolean unknownState) {
tombstones.computeIfPresent(ResourceID.fromResource(resource), (k, v) -> {
v.add(resource.getMetadata().getUid());
return v;
});
onEvent(resource, unknownState);
}

public synchronized void onEvent(T resource, boolean unknownState) {
cache.computeIfPresent(ResourceID.fromResource(resource),
(id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null
Expand All @@ -84,14 +105,27 @@ public synchronized void putResource(T newResource, String previousResourceVersi
var cachedResource = getResourceFromCache(resourceId)
.orElse(managedInformerEventSource.get(resourceId).orElse(null));

if ((previousResourceVersion == null && cachedResource == null)
boolean moveAhead = false;
if (previousResourceVersion == null && cachedResource == null) {
if (Optional.ofNullable(tombstones.get(resourceId))
.filter(list -> list.contains(newResource.getMetadata().getUid())).isPresent()) {
log.debug(
"Won't resurrect uid {} for resource id: {}",
newResource.getMetadata().getUid(), resourceId);
return;
}
// we can skip further checks as this is a simple add and there's no previous entry to consider
moveAhead = true;
}

if (moveAhead
|| (cachedResource != null
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
log.debug(
"Temporarily moving ahead to target version {} for resource id: {}",
newResource.getMetadata().getResourceVersion(), resourceId);
putToCache(newResource, resourceId);
cache.put(resourceId, newResource);
} else if (cache.remove(resourceId) != null) {
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
}
Expand Down Expand Up @@ -123,10 +157,6 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c
return false;
}

private void putToCache(T resource, ResourceID resourceID) {
cache.put(resourceID == null ? ResourceID.fromResource(resource) : resourceID, resource);
}

public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ void resourceVersionParsing() {
.isNotPresent();
}

@Test
void rapidDeletion() {
var testResource = testResource();
ResourceID id = ResourceID.fromResource(testResource);
temporaryResourceCache.prepareForAddOrUpdate(id);
temporaryResourceCache.onEvent(testResource, false); // create
temporaryResourceCache.onDeleteEvent(new ConfigMapBuilder(testResource).editMetadata()
.withResourceVersion("3").endMetadata().build(), false);

// put should be rejected
temporaryResourceCache.putAddedResource(testResource);

assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isEmpty();
}

private ConfigMap propagateTestResourceToCache() {
var testResource = testResource();
when(informerEventSource.get(any())).thenReturn(Optional.empty());
Expand All @@ -127,6 +143,7 @@ ConfigMap testResource() {
configMap.getMetadata().setName("test");
configMap.getMetadata().setNamespace("default");
configMap.getMetadata().setResourceVersion(RESOURCE_VERSION);
configMap.getMetadata().setUid("test-uid");
return configMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
Expand Down Expand Up @@ -64,24 +71,29 @@ private void updateExternalResource(ExternalStateCustomResource resource,

private void createExternalResource(ExternalStateCustomResource resource,
Context<ExternalStateCustomResource> context) {
var createdResource =
externalService.create(new ExternalResource(resource.getSpec().getData()));
var configMap = new ConfigMapBuilder()
.withMetadata(new ObjectMetaBuilder()
.withName(resource.getMetadata().getName())
.withNamespace(resource.getMetadata().getNamespace())
.build())
.withData(Map.of(ID_KEY, createdResource.getId()))
.build();
configMap.addOwnerReference(resource);
context.getClient().configMaps().resource(configMap).create();

var primaryID = ResourceID.fromResource(resource);
// Making sure that the created resources are in the cache for the next reconciliation.
// This is critical in this case, since on next reconciliation if it would not be in the cache
// it would be created again.
configMapEventSource.handleRecentResourceCreate(primaryID, configMap);
externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource);
try {
var createdResource =
externalService.create(new ExternalResource(resource.getSpec().getData()));
var configMap = new ConfigMapBuilder()
.withMetadata(new ObjectMetaBuilder()
.withName(resource.getMetadata().getName())
.withNamespace(resource.getMetadata().getNamespace())
.build())
.withData(Map.of(ID_KEY, createdResource.getId()))
.build();
configMap.addOwnerReference(resource);
configMapEventSource.prepareForAddOrUpdate(primaryID);
context.getClient().configMaps().resource(configMap).create();

// Making sure that the created resources are in the cache for the next reconciliation.
// This is critical in this case, since on next reconciliation if it would not be in the cache
// it would be created again.
configMapEventSource.handleRecentResourceCreate(primaryID, configMap);
externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource);
} finally {
configMapEventSource.finishAddOrUpdate(primaryID);
}
}

@Override
Expand All @@ -94,6 +106,7 @@ public DeleteControl cleanup(ExternalStateCustomResource resource,
return DeleteControl.defaultDelete();
}

@Override
public int getNumberOfExecutions() {
return numberOfExecutions.get();
}
Expand Down