-
Notifications
You must be signed in to change notification settings - Fork 141
Delta xds non breaking hash bytes refactor #181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
lukidzi
merged 3 commits into
envoyproxy:delta-xds-non-breaking-slonka-hash-bytes
from
Ferdudas97:delta-xds-non-breaking-hash-bytes-refactor
Oct 21, 2021
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
73 changes: 73 additions & 0 deletions
73
cache/src/main/java/io/envoyproxy/controlplane/cache/AbstractWatch.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package io.envoyproxy.controlplane.cache; | ||
|
||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; | ||
import java.util.function.Consumer; | ||
|
||
public abstract class AbstractWatch<V, T> { | ||
|
||
private static final AtomicIntegerFieldUpdater<AbstractWatch> isCancelledUpdater = | ||
AtomicIntegerFieldUpdater.newUpdater(AbstractWatch.class, "isCancelled"); | ||
private final V request; | ||
private final Consumer<T> responseConsumer; | ||
private volatile int isCancelled = 0; | ||
private Runnable stop; | ||
|
||
/** | ||
* Construct a watch. | ||
* | ||
* @param request the original request for the watch | ||
* @param responseConsumer handler for outgoing response messages | ||
*/ | ||
public AbstractWatch(V request, Consumer<T> responseConsumer) { | ||
this.request = request; | ||
this.responseConsumer = responseConsumer; | ||
} | ||
|
||
/** | ||
* Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel | ||
* may be called multiple times, with each subsequent call being a no-op. | ||
*/ | ||
public void cancel() { | ||
if (isCancelledUpdater.compareAndSet(this, 0, 1)) { | ||
if (stop != null) { | ||
stop.run(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Returns boolean indicating whether or not the watch has been cancelled. | ||
*/ | ||
public boolean isCancelled() { | ||
return isCancelledUpdater.get(this) == 1; | ||
} | ||
|
||
/** | ||
* Returns the original request for the watch. | ||
*/ | ||
public V request() { | ||
return request; | ||
} | ||
|
||
/** | ||
* Sends the given response to the watch's response handler. | ||
* | ||
* @param response the response to be handled | ||
* @throws WatchCancelledException if the watch has already been cancelled | ||
*/ | ||
public void respond(T response) throws WatchCancelledException { | ||
if (isCancelled()) { | ||
throw new WatchCancelledException(); | ||
} | ||
|
||
responseConsumer.accept(response); | ||
} | ||
|
||
/** | ||
* Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it | ||
* ensures that this stop callback is only executed once. | ||
*/ | ||
public void setStop(Runnable stop) { | ||
this.stop = stop; | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
142 changes: 2 additions & 140 deletions
142
cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,152 +1,14 @@ | ||
package io.envoyproxy.controlplane.cache; | ||
|
||
import com.google.common.collect.ImmutableSet; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.function.BiFunction; | ||
import javax.annotation.concurrent.ThreadSafe; | ||
|
||
/** | ||
* {@code CacheStatusInfo} provides a default implementation of {@link StatusInfo} for use in {@link Cache} | ||
* implementations. | ||
*/ | ||
@ThreadSafe | ||
public class CacheStatusInfo<T> implements StatusInfo<T> { | ||
|
||
private final T nodeGroup; | ||
|
||
private final ConcurrentMap<Long, Watch> watches = new ConcurrentHashMap<>(); | ||
private final ConcurrentMap<Long, DeltaWatch> deltaWatches = new ConcurrentHashMap<>(); | ||
private volatile long lastWatchRequestTime; | ||
private volatile long lastDeltaWatchRequestTime; | ||
|
||
public class CacheStatusInfo<T> extends MutableStatusInfo<T, Watch> { | ||
public CacheStatusInfo(T nodeGroup) { | ||
this.nodeGroup = nodeGroup; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public long lastWatchRequestTime() { | ||
return lastWatchRequestTime; | ||
} | ||
|
||
@Override | ||
public long lastDeltaWatchRequestTime() { | ||
return lastDeltaWatchRequestTime; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public T nodeGroup() { | ||
return nodeGroup; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public int numWatches() { | ||
return watches.size(); | ||
} | ||
|
||
@Override | ||
public int numDeltaWatches() { | ||
return deltaWatches.size(); | ||
} | ||
|
||
/** | ||
* Removes the given watch from the tracked collection of watches. | ||
* | ||
* @param watchId the ID for the watch that should be removed | ||
*/ | ||
public void removeWatch(long watchId) { | ||
watches.remove(watchId); | ||
} | ||
|
||
/** | ||
* Removes the given delta watch from the tracked collection of watches. | ||
* | ||
* @param watchId the ID for the delta watch that should be removed | ||
*/ | ||
public void removeDeltaWatch(long watchId) { | ||
deltaWatches.remove(watchId); | ||
} | ||
|
||
/** | ||
* Sets the timestamp of the last discovery watch request. | ||
* | ||
* @param lastWatchRequestTime the latest watch request timestamp | ||
*/ | ||
public void setLastWatchRequestTime(long lastWatchRequestTime) { | ||
this.lastWatchRequestTime = lastWatchRequestTime; | ||
} | ||
|
||
/** | ||
* Sets the timestamp of the last discovery delta watch request. | ||
* | ||
* @param lastDeltaWatchRequestTime the latest delta watch request timestamp | ||
*/ | ||
public void setLastDeltaWatchRequestTime(long lastDeltaWatchRequestTime) { | ||
this.lastDeltaWatchRequestTime = lastDeltaWatchRequestTime; | ||
} | ||
|
||
/** | ||
* Adds the given watch to the tracked collection of watches. | ||
* | ||
* @param watchId the ID for the watch that should be added | ||
* @param watch the watch that should be added | ||
*/ | ||
public void setWatch(long watchId, Watch watch) { | ||
watches.put(watchId, watch); | ||
} | ||
|
||
/** | ||
* Adds the given watch to the tracked collection of watches. | ||
* | ||
* @param watchId the ID for the watch that should be added | ||
* @param watch the watch that should be added | ||
*/ | ||
public void setDeltaWatch(long watchId, DeltaWatch watch) { | ||
deltaWatches.put(watchId, watch); | ||
} | ||
|
||
/** | ||
* Returns the set of IDs for all watched currently being tracked. | ||
*/ | ||
public Set<Long> watchIds() { | ||
return ImmutableSet.copyOf(watches.keySet()); | ||
} | ||
|
||
/** | ||
* Returns the set of IDs for all watched currently being tracked. | ||
*/ | ||
public Set<Long> deltaWatchIds() { | ||
return ImmutableSet.copyOf(deltaWatches.keySet()); | ||
} | ||
|
||
/** | ||
* Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is | ||
* removed from the tracked collection. If it returns {@code false}, then the watch is not removed. | ||
* | ||
* @param filter the function to execute on each watch | ||
*/ | ||
public void watchesRemoveIf(BiFunction<Long, Watch, Boolean> filter) { | ||
watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); | ||
} | ||
|
||
/** | ||
* Iterate over all tracked delta watches and execute the given function. If it returns {@code true}, | ||
* then the watch is removed from the tracked collection. If it returns {@code false}, then | ||
* the watch is not removed. | ||
* | ||
* @param filter the function to execute on each delta watch | ||
*/ | ||
public void deltaWatchesRemoveIf(BiFunction<Long, DeltaWatch, Boolean> filter) { | ||
deltaWatches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); | ||
super(nodeGroup); | ||
} | ||
} |
77 changes: 77 additions & 0 deletions
77
cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfoAggregator.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package io.envoyproxy.controlplane.cache; | ||
|
||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
class CacheStatusInfoAggregator<T> { | ||
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, CacheStatusInfo<T>>> statuses = | ||
new ConcurrentHashMap<>(); | ||
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, DeltaCacheStatusInfo<T>>> deltaStatuses = | ||
new ConcurrentHashMap<>(); | ||
|
||
public Collection<T> groups() { | ||
return Stream.concat(statuses.keySet().stream(), deltaStatuses.keySet().stream()).collect(Collectors.toSet()); | ||
} | ||
|
||
void remove(T group) { | ||
statuses.remove(group); | ||
deltaStatuses.remove(group); | ||
} | ||
|
||
/** | ||
* Returns map of delta status infos for group identifier. | ||
* | ||
* @param group group identifier. | ||
*/ | ||
Map<Resources.ResourceType, DeltaCacheStatusInfo<T>> getDeltaStatus(T group) { | ||
return deltaStatuses.getOrDefault(group, new ConcurrentHashMap<>()); | ||
} | ||
|
||
/** | ||
* Returns map of status infos for group identifier. | ||
* | ||
* @param group group identifier. | ||
*/ | ||
Map<Resources.ResourceType, CacheStatusInfo<T>> getStatus(T group) { | ||
return statuses.getOrDefault(group, new ConcurrentHashMap<>()); | ||
} | ||
|
||
/** | ||
* Check if statuses for specific group have any watcher. | ||
* | ||
* @param group group identifier. | ||
* @return true if statuses for specific group have any watcher. | ||
*/ | ||
boolean hasStatuses(T group) { | ||
Map<Resources.ResourceType, CacheStatusInfo<T>> status = getStatus(group); | ||
Map<Resources.ResourceType, DeltaCacheStatusInfo<T>> deltaStatus = getDeltaStatus(group); | ||
return status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum() | ||
+ deltaStatus.values().stream().mapToLong(DeltaCacheStatusInfo::numWatches).sum() > 0; | ||
} | ||
|
||
/** | ||
* Returns delta status info for group identifier and creates new one if it doesn't exist. | ||
* | ||
* @param group group identifier. | ||
* @param resourceType resource type. | ||
*/ | ||
DeltaCacheStatusInfo<T> getOrAddDeltaStatusInfo(T group, Resources.ResourceType resourceType) { | ||
return deltaStatuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) | ||
.computeIfAbsent(resourceType, s -> new DeltaCacheStatusInfo<>(group)); | ||
} | ||
|
||
/** | ||
* Returns status info for group identifier and creates new one if it doesn't exist. | ||
* | ||
* @param group group identifier. | ||
* @param resourceType resource type. | ||
*/ | ||
CacheStatusInfo<T> getOrAddStatusInfo(T group, Resources.ResourceType resourceType) { | ||
return statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) | ||
.computeIfAbsent(resourceType, s -> new CacheStatusInfo<>(group)); | ||
} | ||
} |
8 changes: 8 additions & 0 deletions
8
cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaCacheStatusInfo.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package io.envoyproxy.controlplane.cache; | ||
|
||
public class DeltaCacheStatusInfo<T> extends MutableStatusInfo<T, DeltaWatch> { | ||
|
||
public DeltaCacheStatusInfo(T nodeGroup) { | ||
super(nodeGroup); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it used somewhere ? :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is used via isCancelledUpdater in cancel() and in isCancelled()