diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/AbstractWatch.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/AbstractWatch.java new file mode 100644 index 000000000..9a90ddcf9 --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/AbstractWatch.java @@ -0,0 +1,73 @@ +package io.envoyproxy.controlplane.cache; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; + +public abstract class AbstractWatch { + + private static final AtomicIntegerFieldUpdater isCancelledUpdater = + AtomicIntegerFieldUpdater.newUpdater(AbstractWatch.class, "isCancelled"); + private final V request; + private final Consumer responseConsumer; + private volatile int isCancelled = 0; + private Runnable stop; + + /** + * Construct a watch. + * + * @param request the original request for the watch + * @param responseConsumer handler for outgoing response messages + */ + public AbstractWatch(V request, Consumer responseConsumer) { + this.request = request; + this.responseConsumer = responseConsumer; + } + + /** + * Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel + * may be called multiple times, with each subsequent call being a no-op. + */ + public void cancel() { + if (isCancelledUpdater.compareAndSet(this, 0, 1)) { + if (stop != null) { + stop.run(); + } + } + } + + /** + * Returns boolean indicating whether or not the watch has been cancelled. + */ + public boolean isCancelled() { + return isCancelledUpdater.get(this) == 1; + } + + /** + * Returns the original request for the watch. + */ + public V request() { + return request; + } + + /** + * Sends the given response to the watch's response handler. + * + * @param response the response to be handled + * @throws WatchCancelledException if the watch has already been cancelled + */ + public void respond(T response) throws WatchCancelledException { + if (isCancelled()) { + throw new WatchCancelledException(); + } + + responseConsumer.accept(response); + } + + /** + * Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it + * ensures that this stop callback is only executed once. + */ + public void setStop(Runnable stop) { + this.stop = stop; + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java index 34529c1db..51237aae0 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java @@ -20,5 +20,5 @@ public interface Cache extends ConfigWatcher { * * @param group the node group whose status is being fetched */ - StatusInfo statusInfo(T group); + StatusInfo statusInfo(T group); } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java index 271b4e037..cf4d620b7 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java @@ -1,10 +1,5 @@ package io.envoyproxy.controlplane.cache; -import com.google.common.collect.ImmutableSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.BiFunction; import javax.annotation.concurrent.ThreadSafe; /** @@ -12,141 +7,8 @@ * implementations. */ @ThreadSafe -public class CacheStatusInfo implements StatusInfo { - - private final T nodeGroup; - - private final ConcurrentMap watches = new ConcurrentHashMap<>(); - private final ConcurrentMap deltaWatches = new ConcurrentHashMap<>(); - private volatile long lastWatchRequestTime; - private volatile long lastDeltaWatchRequestTime; - +public class CacheStatusInfo extends MutableStatusInfo { public CacheStatusInfo(T nodeGroup) { - this.nodeGroup = nodeGroup; - } - - /** - * {@inheritDoc} - */ - @Override - public long lastWatchRequestTime() { - return lastWatchRequestTime; - } - - @Override - public long lastDeltaWatchRequestTime() { - return lastDeltaWatchRequestTime; - } - - /** - * {@inheritDoc} - */ - @Override - public T nodeGroup() { - return nodeGroup; - } - - /** - * {@inheritDoc} - */ - @Override - public int numWatches() { - return watches.size(); - } - - @Override - public int numDeltaWatches() { - return deltaWatches.size(); - } - - /** - * Removes the given watch from the tracked collection of watches. - * - * @param watchId the ID for the watch that should be removed - */ - public void removeWatch(long watchId) { - watches.remove(watchId); - } - - /** - * Removes the given delta watch from the tracked collection of watches. - * - * @param watchId the ID for the delta watch that should be removed - */ - public void removeDeltaWatch(long watchId) { - deltaWatches.remove(watchId); - } - - /** - * Sets the timestamp of the last discovery watch request. - * - * @param lastWatchRequestTime the latest watch request timestamp - */ - public void setLastWatchRequestTime(long lastWatchRequestTime) { - this.lastWatchRequestTime = lastWatchRequestTime; - } - - /** - * Sets the timestamp of the last discovery delta watch request. - * - * @param lastDeltaWatchRequestTime the latest delta watch request timestamp - */ - public void setLastDeltaWatchRequestTime(long lastDeltaWatchRequestTime) { - this.lastDeltaWatchRequestTime = lastDeltaWatchRequestTime; - } - - /** - * Adds the given watch to the tracked collection of watches. - * - * @param watchId the ID for the watch that should be added - * @param watch the watch that should be added - */ - public void setWatch(long watchId, Watch watch) { - watches.put(watchId, watch); - } - - /** - * Adds the given watch to the tracked collection of watches. - * - * @param watchId the ID for the watch that should be added - * @param watch the watch that should be added - */ - public void setDeltaWatch(long watchId, DeltaWatch watch) { - deltaWatches.put(watchId, watch); - } - - /** - * Returns the set of IDs for all watched currently being tracked. - */ - public Set watchIds() { - return ImmutableSet.copyOf(watches.keySet()); - } - - /** - * Returns the set of IDs for all watched currently being tracked. - */ - public Set deltaWatchIds() { - return ImmutableSet.copyOf(deltaWatches.keySet()); - } - - /** - * Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is - * removed from the tracked collection. If it returns {@code false}, then the watch is not removed. - * - * @param filter the function to execute on each watch - */ - public void watchesRemoveIf(BiFunction filter) { - watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); - } - - /** - * Iterate over all tracked delta watches and execute the given function. If it returns {@code true}, - * then the watch is removed from the tracked collection. If it returns {@code false}, then - * the watch is not removed. - * - * @param filter the function to execute on each delta watch - */ - public void deltaWatchesRemoveIf(BiFunction filter) { - deltaWatches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); + super(nodeGroup); } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfoAggregator.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfoAggregator.java new file mode 100644 index 000000000..fea19e88a --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfoAggregator.java @@ -0,0 +1,77 @@ +package io.envoyproxy.controlplane.cache; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +class CacheStatusInfoAggregator { + private final ConcurrentMap>> statuses = + new ConcurrentHashMap<>(); + private final ConcurrentMap>> deltaStatuses = + new ConcurrentHashMap<>(); + + public Collection groups() { + return Stream.concat(statuses.keySet().stream(), deltaStatuses.keySet().stream()).collect(Collectors.toSet()); + } + + void remove(T group) { + statuses.remove(group); + deltaStatuses.remove(group); + } + + /** + * Returns map of delta status infos for group identifier. + * + * @param group group identifier. + */ + Map> getDeltaStatus(T group) { + return deltaStatuses.getOrDefault(group, new ConcurrentHashMap<>()); + } + + /** + * Returns map of status infos for group identifier. + * + * @param group group identifier. + */ + Map> getStatus(T group) { + return statuses.getOrDefault(group, new ConcurrentHashMap<>()); + } + + /** + * Check if statuses for specific group have any watcher. + * + * @param group group identifier. + * @return true if statuses for specific group have any watcher. + */ + boolean hasStatuses(T group) { + Map> status = getStatus(group); + Map> deltaStatus = getDeltaStatus(group); + return status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum() + + deltaStatus.values().stream().mapToLong(DeltaCacheStatusInfo::numWatches).sum() > 0; + } + + /** + * Returns delta status info for group identifier and creates new one if it doesn't exist. + * + * @param group group identifier. + * @param resourceType resource type. + */ + DeltaCacheStatusInfo getOrAddDeltaStatusInfo(T group, Resources.ResourceType resourceType) { + return deltaStatuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) + .computeIfAbsent(resourceType, s -> new DeltaCacheStatusInfo<>(group)); + } + + /** + * Returns status info for group identifier and creates new one if it doesn't exist. + * + * @param group group identifier. + * @param resourceType resource type. + */ + CacheStatusInfo getOrAddStatusInfo(T group, Resources.ResourceType resourceType) { + return statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) + .computeIfAbsent(resourceType, s -> new CacheStatusInfo<>(group)); + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaCacheStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaCacheStatusInfo.java new file mode 100644 index 000000000..355a7dfdd --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaCacheStatusInfo.java @@ -0,0 +1,8 @@ +package io.envoyproxy.controlplane.cache; + +public class DeltaCacheStatusInfo extends MutableStatusInfo { + + public DeltaCacheStatusInfo(T nodeGroup) { + super(nodeGroup); + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java index fc9697291..390e1fb04 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java @@ -2,24 +2,17 @@ import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Consumer; /** * {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by * the xDS server. */ -public class DeltaWatch { - private static final AtomicIntegerFieldUpdater isCancelledUpdater = - AtomicIntegerFieldUpdater.newUpdater(DeltaWatch.class, "isCancelled"); - private final DeltaXdsRequest request; - private final Consumer responseConsumer; +public class DeltaWatch extends AbstractWatch { private final Map resourceVersions; private final Set pendingResources; private final boolean isWildcard; private final String version; - private volatile int isCancelled = 0; - private Runnable stop; /** * Construct a watch. @@ -35,38 +28,11 @@ public DeltaWatch(DeltaXdsRequest request, String version, boolean isWildcard, Consumer responseConsumer) { - this.request = request; + super(request, responseConsumer); this.resourceVersions = resourceVersions; this.pendingResources = pendingResources; this.version = version; this.isWildcard = isWildcard; - this.responseConsumer = responseConsumer; - } - - /** - * Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel - * may be called multiple times, with each subsequent call being a no-op. - */ - public void cancel() { - if (isCancelledUpdater.compareAndSet(this, 0, 1)) { - if (stop != null) { - stop.run(); - } - } - } - - /** - * Returns boolean indicating whether or not the watch has been cancelled. - */ - public boolean isCancelled() { - return isCancelledUpdater.get(this) == 1; - } - - /** - * Returns the original request for the watch. - */ - public DeltaXdsRequest request() { - return request; } /** @@ -97,25 +63,4 @@ public boolean isWildcard() { return isWildcard; } - /** - * Sends the given response to the watch's response handler. - * - * @param response the response to be handled - * @throws WatchCancelledException if the watch has already been cancelled - */ - public void respond(DeltaResponse response) throws WatchCancelledException { - if (isCancelled()) { - throw new WatchCancelledException(); - } - - responseConsumer.accept(response); - } - - /** - * Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it - * ensures that this stop callback is only executed once. - */ - public void setStop(Runnable stop) { - this.stop = stop; - } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java index 666682848..a2ef899ad 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java @@ -9,9 +9,9 @@ */ @ThreadSafe class GroupCacheStatusInfo implements StatusInfo { - private final Collection> statuses; + private final Collection> statuses; - public GroupCacheStatusInfo(Collection> statuses) { + public GroupCacheStatusInfo(Collection> statuses) { this.statuses = new ArrayList<>(statuses); } @@ -20,12 +20,7 @@ public GroupCacheStatusInfo(Collection> statuses) { */ @Override public long lastWatchRequestTime() { - return statuses.stream().mapToLong(CacheStatusInfo::lastWatchRequestTime).max().orElse(0); - } - - @Override - public long lastDeltaWatchRequestTime() { - return statuses.stream().mapToLong(CacheStatusInfo::lastDeltaWatchRequestTime).max().orElse(0); + return statuses.stream().mapToLong(StatusInfo::lastWatchRequestTime).max().orElse(0); } /** @@ -33,7 +28,7 @@ public long lastDeltaWatchRequestTime() { */ @Override public T nodeGroup() { - return statuses.stream().map(CacheStatusInfo::nodeGroup).findFirst().orElse(null); + return statuses.stream().map(StatusInfo::nodeGroup).findFirst().orElse(null); } /** @@ -41,11 +36,7 @@ public T nodeGroup() { */ @Override public int numWatches() { - return statuses.stream().mapToInt(CacheStatusInfo::numWatches).sum(); + return statuses.stream().mapToInt(StatusInfo::numWatches).sum(); } - @Override - public int numDeltaWatches() { - return statuses.stream().mapToInt(CacheStatusInfo::numDeltaWatches).sum(); - } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/MutableStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/MutableStatusInfo.java new file mode 100644 index 000000000..14a82b07f --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/MutableStatusInfo.java @@ -0,0 +1,85 @@ +package io.envoyproxy.controlplane.cache; + +import com.google.common.collect.ImmutableSet; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiFunction; + +class MutableStatusInfo> implements StatusInfo { + private final ConcurrentMap watches = new ConcurrentHashMap<>(); + private final T nodeGroup; + private volatile long lastWatchRequestTime; + + protected MutableStatusInfo(T nodeGroup) { + this.nodeGroup = nodeGroup; + } + + /** + * {@inheritDoc} + */ + public long lastWatchRequestTime() { + return lastWatchRequestTime; + } + + /** + * {@inheritDoc} + */ + public T nodeGroup() { + return nodeGroup; + } + + /** + * {@inheritDoc} + */ + public int numWatches() { + return watches.size(); + } + + /** + * Removes the given watch from the tracked collection of watches. + * + * @param watchId the ID for the watch that should be removed + */ + public void removeWatch(long watchId) { + watches.remove(watchId); + } + + + /** + * Sets the timestamp of the last discovery watch request. + * + * @param lastWatchRequestTime the latest watch request timestamp + */ + public void setLastWatchRequestTime(long lastWatchRequestTime) { + this.lastWatchRequestTime = lastWatchRequestTime; + } + + /** + * Adds the given watch to the tracked collection of watches. + * + * @param watchId the ID for the watch that should be added + * @param watch the watch that should be added + */ + public void setWatch(long watchId, V watch) { + watches.put(watchId, watch); + } + + /** + * Returns the set of IDs for all watched currently being tracked. + */ + public Set watchIds() { + return ImmutableSet.copyOf(watches.keySet()); + } + + /** + * Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is + * removed from the tracked collection. If it returns {@code false}, then the watch is not removed. + * + * @param filter the function to execute on each watch + */ + public void watchesRemoveIf(BiFunction filter) { + watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java index 1aaa082dd..457bdb8d8 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java @@ -16,8 +16,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -25,6 +23,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.concurrent.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,8 +49,7 @@ public abstract class SimpleCache implements SnapshotCach @GuardedBy("lock") private final Map snapshots = new HashMap<>(); - private final ConcurrentMap>> statuses = - new ConcurrentHashMap<>(); + private final CacheStatusInfoAggregator statuses = new CacheStatusInfoAggregator<>(); private AtomicLong watchCount = new AtomicLong(); @@ -72,10 +70,8 @@ public boolean clearSnapshot(T group) { // we take a writeLock to prevent watches from being created writeLock.lock(); try { - Map> status = statuses.get(group); - // If we don't know about this group, do nothing. - if (status != null && status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum() > 0) { + if (statuses.hasStatuses(group)) { LOGGER.warn("tried to clear snapshot for group with existing watches, group={}", group); return false; @@ -122,8 +118,7 @@ public Watch createWatch( // doesn't conflict readLock.lock(); try { - CacheStatusInfo status = statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) - .computeIfAbsent(requestResourceType, s -> new CacheStatusInfo<>(group)); + CacheStatusInfo status = statuses.getOrAddStatusInfo(group, requestResourceType); status.setLastWatchRequestTime(System.currentTimeMillis()); U snapshot = snapshots.get(group); @@ -159,21 +154,7 @@ public Watch createWatch( // If the requested version is up-to-date or missing a response, leave an open watch. if (snapshot == null || request.getVersionInfo().equals(version)) { - long watchId = watchCount.incrementAndGet(); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", request.getResourceNamesList()), - group, - request.getVersionInfo()); - } - - status.setWatch(watchId, watch); - - watch.setStop(() -> status.removeWatch(watchId)); - + openWatch(status, watch, request.getTypeUrl(), request.getResourceNamesList(), group, request.getVersionInfo()); return watch; } @@ -181,20 +162,7 @@ public Watch createWatch( boolean responded = respond(watch, snapshot, group); if (!responded) { - long watchId = watchCount.incrementAndGet(); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("did not respond immediately, leaving open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", request.getResourceNamesList()), - group, - request.getVersionInfo()); - } - - status.setWatch(watchId, watch); - - watch.setStop(() -> status.removeWatch(watchId)); + openWatch(status, watch, request.getTypeUrl(), request.getResourceNamesList(), group, request.getVersionInfo()); } return watch; @@ -230,8 +198,7 @@ public DeltaWatch createDeltaWatch( // doesn't conflict readLock.lock(); try { - CacheStatusInfo status = statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) - .computeIfAbsent(requestResourceType, s -> new CacheStatusInfo<>(group)); + DeltaCacheStatusInfo status = statuses.getOrAddDeltaStatusInfo(group, requestResourceType); status.setLastWatchRequestTime(System.currentTimeMillis()); U snapshot = snapshots.get(group); @@ -247,16 +214,7 @@ public DeltaWatch createDeltaWatch( // If no snapshot, leave an open watch. if (snapshot == null) { - long watchId = setDeltaWatch(status, watch); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", watch.trackedResources().keySet()), - group, - requesterVersion); - } - + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); return watch; } @@ -276,73 +234,51 @@ public DeltaWatch createDeltaWatch( Collections.emptyList(), version, group); - if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { + if (responseState.isFinished()) { return watch; } } else if (hasClusterChanged && requestResourceType.equals(ResourceType.ENDPOINT)) { - Map> snapshotResources = snapshot.versionedResources(request.getResourceType()); - List removedResources = findRemovedResources(watch, - snapshotResources); - Map> changedResources = findChangedResources(watch, snapshotResources); - ResponseState responseState = respondDelta( - watch, - changedResources, - removedResources, - version, - group); - if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { + ResponseState responseState = respondDelta(request, group, snapshot, version, watch); + if (responseState.isFinished()) { return watch; } } - long watchId = setDeltaWatch(status, watch); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", watch.trackedResources().keySet()), - group, - requesterVersion); - } + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); return watch; } // Otherwise, version is different, the watch may be responded immediately - Map> snapshotResources = snapshot.versionedResources(request.getResourceType()); - List removedResources = findRemovedResources(watch, - snapshotResources); - Map> changedResources = findChangedResources(watch, snapshotResources); - ResponseState responseState = respondDelta(watch, - changedResources, - removedResources, - version, - group); - if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { + ResponseState responseState = respondDelta(request, group, snapshot, version, watch); + if (responseState.isFinished()) { return watch; } - long watchId = setDeltaWatch(status, watch); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("did not respond immediately, leaving open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", watch.trackedResources().keySet()), - group, - requesterVersion); - } - + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); return watch; } finally { readLock.unlock(); } } - private long setDeltaWatch(CacheStatusInfo status, DeltaWatch watch) { + private > void openWatch(MutableStatusInfo status, + V watch, + String url, + Collection resources, + T group, + String version) { long watchId = watchCount.incrementAndGet(); - status.setDeltaWatch(watchId, watch); - watch.setStop(() -> status.removeDeltaWatch(watchId)); - return watchId; + status.setWatch(watchId, watch); + watch.setStop(() -> status.removeWatch(watchId)); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", + watchId, + url, + String.join(", ", resources), + group, + version); + } } /** @@ -364,7 +300,7 @@ public U getSnapshot(T group) { */ @Override public Collection groups() { - return ImmutableSet.copyOf(statuses.keySet()); + return ImmutableSet.copyOf(statuses.groups()); } /** @@ -373,40 +309,47 @@ public Collection groups() { @Override public synchronized void setSnapshot(T group, U snapshot) { // we take a writeLock to prevent watches from being created while we update the snapshot - ConcurrentMap> status; + Map> status; + Map> deltaStatus; U previousSnapshot; writeLock.lock(); try { // Update the existing snapshot entry. previousSnapshot = snapshots.put(group, snapshot); - status = statuses.get(group); + status = statuses.getStatus(group); + deltaStatus = statuses.getDeltaStatus(group); } finally { writeLock.unlock(); } - if (status == null) { + if (status.isEmpty() && deltaStatus.isEmpty()) { return; } // Responses should be in specific order and typeUrls has a list of resources in the right // order. - respondWithSpecificOrder(group, previousSnapshot, snapshot, status); + respondWithSpecificOrder(group, previousSnapshot, snapshot, status, deltaStatus); } /** * {@inheritDoc} */ @Override - public StatusInfo statusInfo(T group) { + public StatusInfo statusInfo(T group) { readLock.lock(); try { - ConcurrentMap> statusMap = statuses.get(group); - if (statusMap == null || statusMap.isEmpty()) { + Map> statusMap = statuses.getStatus(group); + Map> deltaStatusMap = statuses.getDeltaStatus(group); + + if (statusMap.isEmpty() && deltaStatusMap.isEmpty()) { return null; } - return new GroupCacheStatusInfo<>(statusMap.values()); + List> collection = Stream.concat(statusMap.values().stream(), + deltaStatusMap.values().stream()).collect(Collectors.toList()); + + return new GroupCacheStatusInfo<>(collection); } finally { readLock.unlock(); } @@ -415,87 +358,87 @@ public StatusInfo statusInfo(T group) { @VisibleForTesting protected void respondWithSpecificOrder(T group, U previousSnapshot, U snapshot, - ConcurrentMap> statusMap) { + Map> statusMap, Map> deltaStatusMap) { for (ResourceType resourceType : RESOURCE_TYPES_IN_ORDER) { CacheStatusInfo status = statusMap.get(resourceType); - if (status == null) { - continue; - } - - status.watchesRemoveIf((id, watch) -> { - if (!watch.request().getResourceType().equals(resourceType)) { - return false; - } - String version = snapshot.version(watch.request().getResourceType(), - watch.request().getResourceNamesList()); - - if (!watch.request().getVersionInfo().equals(version)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("responding to open watch {}[{}] with new version {}", - id, - String.join(", ", watch.request().getResourceNamesList()), - version); + if (status != null) { + status.watchesRemoveIf((id, watch) -> { + if (!watch.request().getResourceType().equals(resourceType)) { + return false; } + String version = snapshot.version(watch.request().getResourceType(), + watch.request().getResourceNamesList()); - respond(watch, snapshot, group); + if (!watch.request().getVersionInfo().equals(version)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("responding to open watch {}[{}] with new version {}", + id, + String.join(", ", watch.request().getResourceNamesList()), + version); + } - // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response. - return true; - } + respond(watch, snapshot, group); - // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. - return false; - }); - - Map> previousResources = previousSnapshot == null - ? Collections.emptyMap() - : previousSnapshot.versionedResources(resourceType); - Map> snapshotResources = snapshot.versionedResources(resourceType); - - Map> snapshotChangedResources = snapshotResources.entrySet() - .stream() - .filter(entry -> { - VersionedResource versionedResource = previousResources.get(entry.getKey()); - return versionedResource == null || !versionedResource - .version().equals(entry.getValue().version()); - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - Set snapshotRemovedResources = previousResources.keySet() - .stream() - .filter(s -> !snapshotResources.containsKey(s)) - .collect(Collectors.toSet()); - - status.deltaWatchesRemoveIf((id, watch) -> { - String version = snapshot.version(watch.request().getResourceType(), Collections.emptyList()); - - if (!watch.version().equals(version)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("responding to open watch {}[{}] with new version {}", - id, - String.join(", ", watch.trackedResources().keySet()), - version); + // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response. + return true; } - List removedResources = snapshotRemovedResources.stream() - .filter(s -> watch.trackedResources().get(s) != null) - .collect(Collectors.toList()); - - Map> changedResources = findChangedResources(watch, snapshotChangedResources); + // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. + return false; + }); + } - ResponseState responseState = respondDelta(watch, - changedResources, - removedResources, - version, - group); - // Discard the watch if it was responded or cancelled. - // A new watch will be created for future snapshots once envoy ACKs the response. - return ResponseState.RESPONDED.equals(responseState) || ResponseState.CANCELLED.equals(responseState); - } + DeltaCacheStatusInfo deltaStatus = deltaStatusMap.get(resourceType); + if (deltaStatus != null) { + Map> previousResources = previousSnapshot == null + ? Collections.emptyMap() + : previousSnapshot.versionedResources(resourceType); + Map> snapshotResources = snapshot.versionedResources(resourceType); + + Map> snapshotChangedResources = snapshotResources.entrySet() + .stream() + .filter(entry -> { + VersionedResource versionedResource = previousResources.get(entry.getKey()); + return versionedResource == null || !versionedResource + .version().equals(entry.getValue().version()); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Set snapshotRemovedResources = previousResources.keySet() + .stream() + .filter(s -> !snapshotResources.containsKey(s)) + .collect(Collectors.toSet()); + deltaStatus.watchesRemoveIf((id, watch) -> { + String version = snapshot.version(watch.request().getResourceType(), Collections.emptyList()); + + if (!watch.version().equals(version)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("responding to open watch {}[{}] with new version {}", + id, + String.join(", ", watch.trackedResources().keySet()), + version); + } + + List removedResources = snapshotRemovedResources.stream() + .filter(s -> watch.trackedResources().get(s) != null) + .collect(Collectors.toList()); + + Map> changedResources = findChangedResources(watch, snapshotChangedResources); + + ResponseState responseState = respondDelta(watch, + changedResources, + removedResources, + version, + group); + // Discard the watch if it was responded or cancelled. + // A new watch will be created for future snapshots once envoy ACKs the response. + return responseState.isFinished(); + } - // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. - return false; - }); + // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. + return false; + }); + } } } @@ -591,6 +534,19 @@ private Map> findChangedResources(DeltaWatch watch, .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } + private ResponseState respondDelta(DeltaXdsRequest request, T group, U snapshot, String version, DeltaWatch watch) { + Map> snapshotResources = snapshot.versionedResources(request.getResourceType()); + List removedResources = findRemovedResources(watch, + snapshotResources); + Map> changedResources = findChangedResources(watch, snapshotResources); + return respondDelta( + watch, + changedResources, + removedResources, + version, + group); + } + private ResponseState respondDelta(DeltaWatch watch, Map> resources, List removedResources, @@ -623,6 +579,11 @@ private ResponseState respondDelta(DeltaWatch watch, private enum ResponseState { RESPONDED, UNRESPONDED, - CANCELLED + CANCELLED; + + private boolean isFinished() { + return this.equals(RESPONDED) || this.equals(CANCELLED); + } + } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java index 4daafdd84..0b08a882a 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java @@ -6,17 +6,11 @@ * {@code StatusInfo} tracks the state for remote envoy nodes. */ public interface StatusInfo { - /** * Returns the timestamp of the last discovery watch request. */ long lastWatchRequestTime(); - /** - * Returns the timestamp of the last discovery delta watch request. - */ - long lastDeltaWatchRequestTime(); - /** * Returns the node grouping represented by this status, generated via * {@link NodeGroup#hash(Node)} or {@link NodeGroup#hash(io.envoyproxy.envoy.api.v2.core.Node)}. @@ -28,8 +22,4 @@ public interface StatusInfo { */ int numWatches(); - /** - * Returns the number of open delta watches. - */ - int numDeltaWatches(); } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java index 76605fac3..69bc6e6e0 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java @@ -1,20 +1,13 @@ package io.envoyproxy.controlplane.cache; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Consumer; /** * {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by * the xDS server. */ -public class Watch { - private static final AtomicIntegerFieldUpdater isCancelledUpdater = - AtomicIntegerFieldUpdater.newUpdater(Watch.class, "isCancelled"); +public class Watch extends AbstractWatch { private final boolean ads; - private final XdsRequest request; - private final Consumer responseConsumer; - private volatile int isCancelled = 0; - private Runnable stop; /** * Construct a watch. @@ -24,9 +17,8 @@ public class Watch { * @param responseConsumer handler for outgoing response messages */ public Watch(boolean ads, XdsRequest request, Consumer responseConsumer) { + super(request, responseConsumer); this.ads = ads; - this.request = request; - this.responseConsumer = responseConsumer; } /** @@ -36,51 +28,4 @@ public boolean ads() { return ads; } - /** - * Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel - * may be called multiple times, with each subsequent call being a no-op. - */ - public void cancel() { - if (isCancelledUpdater.compareAndSet(this, 0, 1)) { - if (stop != null) { - stop.run(); - } - } - } - - /** - * Returns boolean indicating whether or not the watch has been cancelled. - */ - public boolean isCancelled() { - return isCancelledUpdater.get(this) == 1; - } - - /** - * Returns the original request for the watch. - */ - public XdsRequest request() { - return request; - } - - /** - * Sends the given response to the watch's response handler. - * - * @param response the response to be handled - * @throws WatchCancelledException if the watch has already been cancelled - */ - public void respond(Response response) throws WatchCancelledException { - if (isCancelled()) { - throw new WatchCancelledException(); - } - - responseConsumer.accept(response); - } - - /** - * Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it - * ensures that this stop callback is only executed once. - */ - public void setStop(Runnable stop) { - this.stop = stop; - } } diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SnapshotTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SnapshotTest.java index 129872839..39673c010 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SnapshotTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SnapshotTest.java @@ -12,6 +12,7 @@ import com.google.common.collect.ImmutableList; import io.envoyproxy.controlplane.cache.SnapshotConsistencyException; import io.envoyproxy.controlplane.cache.TestResources; +import io.envoyproxy.controlplane.cache.VersionedResource; import io.envoyproxy.envoy.api.v2.Cluster; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.Listener; @@ -114,7 +115,6 @@ public void createSeparateVersionsSetsResourcesCorrectly() { } @Test - @SuppressWarnings("unchecked") public void resourcesReturnsExpectedResources() { Snapshot snapshot = Snapshot.create( ImmutableList.of(CLUSTER), @@ -124,25 +124,21 @@ public void resourcesReturnsExpectedResources() { ImmutableList.of(SECRET), UUID.randomUUID().toString()); - // We have to do some lame casting to appease java's compiler, otherwise it fails to compile due to limitations with - // generic type constraints. - - // todo: come back here - // assertThat(snapshot.resources(CLUSTER_TYPE_URL)) - // .containsEntry(CLUSTER_NAME, CLUSTER) - // .hasSize(1); - // - // assertThat(snapshot.resources(ENDPOINT_TYPE_URL)) - // .containsEntry(CLUSTER_NAME, ENDPOINT) - // .hasSize(1); - // - // assertThat(snapshot.resources(LISTENER_TYPE_URL)) - // .containsEntry(LISTENER_NAME, LISTENER) - // .hasSize(1); - // - // assertThat(snapshot.resources(ROUTE_TYPE_URL)) - // .containsEntry(ROUTE_NAME, ROUTE) - // .hasSize(1); + assertThat(snapshot.resources(CLUSTER_TYPE_URL)) + .containsEntry(CLUSTER_NAME, VersionedResource.create(CLUSTER)) + .hasSize(1); + + assertThat(snapshot.resources(ENDPOINT_TYPE_URL)) + .containsEntry(CLUSTER_NAME, VersionedResource.create(ENDPOINT)) + .hasSize(1); + + assertThat(snapshot.resources(LISTENER_TYPE_URL)) + .containsEntry(LISTENER_NAME, VersionedResource.create(LISTENER)) + .hasSize(1); + + assertThat(snapshot.resources(ROUTE_TYPE_URL)) + .containsEntry(ROUTE_NAME, VersionedResource.create(ROUTE)) + .hasSize(1); String nullString = null; assertThat(snapshot.version(nullString)).isEmpty();