diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 37a8e19ef3f..25918facf7e 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -548,7 +548,7 @@ public void run() { if (clusterRefs.get(cluster).refCount.get() != 0) { throw new AssertionError(); } - clusterRefs.remove(cluster); + clusterRefs.remove(cluster).close(); if (resolveState.lastConfigOrStatus.hasValue()) { updateResolutionResult(resolveState.lastConfigOrStatus.getValue()); } else { @@ -793,9 +793,13 @@ private void updateRoutes( clusterRefs.get(cluster).refCount.incrementAndGet(); } else { if (clusterNameMap.containsKey(cluster)) { + assert cluster.startsWith("cluster:"); + XdsConfig.Subscription subscription = + xdsDependencyManager.subscribeToCluster(cluster.substring("cluster:".length())); clusterRefs.put( cluster, - ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster))); + ClusterRefState.forCluster( + new AtomicInteger(1), clusterNameMap.get(cluster), subscription)); } if (rlsPluginConfigMap.containsKey(cluster)) { clusterRefs.put( @@ -826,7 +830,7 @@ private void updateRoutes( for (String cluster : deletedClusters) { int count = clusterRefs.get(cluster).refCount.decrementAndGet(); if (count == 0) { - clusterRefs.remove(cluster); + clusterRefs.remove(cluster).close(); shouldUpdateResult = true; } } @@ -879,7 +883,7 @@ private void cleanUpRoutes(Status error) { for (String cluster : existingClusters) { int count = clusterRefs.get(cluster).refCount.decrementAndGet(); if (count == 0) { - clusterRefs.remove(cluster); + clusterRefs.remove(cluster).close(); } } existingClusters = null; @@ -965,15 +969,18 @@ private static class ClusterRefState { final String traditionalCluster; @Nullable final RlsPluginConfig rlsPluginConfig; + @Nullable + final XdsConfig.Subscription subscription; private ClusterRefState( AtomicInteger refCount, @Nullable String traditionalCluster, - @Nullable RlsPluginConfig rlsPluginConfig) { + @Nullable RlsPluginConfig rlsPluginConfig, @Nullable XdsConfig.Subscription subscription) { this.refCount = refCount; checkArgument(traditionalCluster == null ^ rlsPluginConfig == null, "There must be exactly one non-null value in traditionalCluster and pluginConfig"); this.traditionalCluster = traditionalCluster; this.rlsPluginConfig = rlsPluginConfig; + this.subscription = subscription; } private Map toLbPolicy() { @@ -993,12 +1000,21 @@ private ClusterRefState( } } - static ClusterRefState forCluster(AtomicInteger refCount, String name) { - return new ClusterRefState(refCount, name, null); + private void close() { + if (subscription != null) { + subscription.close(); + } + } + + static ClusterRefState forCluster( + AtomicInteger refCount, String name, XdsConfig.Subscription subscription) { + return new ClusterRefState(refCount, name, null, checkNotNull(subscription, "subscription")); } - static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) { - return new ClusterRefState(refCount, null, rlsPluginConfig); + static ClusterRefState forRlsPlugin( + AtomicInteger refCount, + RlsPluginConfig rlsPluginConfig) { + return new ClusterRefState(refCount, null, rlsPluginConfig, null); } } }