Skip to content

xds: XdsNR should be subscribing to clusters with XdsDepManager #12154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ public void run() {
if (clusterRefs.get(cluster).refCount.get() != 0) {
throw new AssertionError();
}
clusterRefs.remove(cluster);
clusterRefs.remove(cluster).close();
if (resolveState.lastConfigOrStatus.hasValue()) {
updateResolutionResult(resolveState.lastConfigOrStatus.getValue());
} else {
Expand Down Expand Up @@ -793,9 +793,13 @@ private void updateRoutes(
clusterRefs.get(cluster).refCount.incrementAndGet();
} else {
if (clusterNameMap.containsKey(cluster)) {
assert cluster.startsWith("cluster:");
XdsConfig.Subscription subscription =
xdsDependencyManager.subscribeToCluster(cluster.substring("cluster:".length()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not retrieving the subscription in CdsLoadBalancer2. The clusterSubscription field is only ever assigned in the case of dynamic cluster, and not otherwise. So for non dynamic cluster it will still be null and cause "Unable to find non-dynamic root cluster" error?
What is the race condition?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RPCs use the current route configuration when they were created. But it takes time for them to progress through the filters, such that the route configuration could be different by the time they get to the terminating filter router and do a pick. So XdsNameResolver already has reference counting to keep clusters alive that are only pointed to by old route configurations that are still in use in RPCs.

When a new route configuration is used that points to different clusters, the old clusters will be removed from the XdsConfig, but XdsNR will be keeping the old CdsLB2 instances alive as long as RPCs still need them. Before A74 CdsLB2 would still have an xdsClient watch for that cluster, but before this change it will be receiving the XdsConfig and see the missing cluster. So as long as the XdsNR is keeping the CdsLB2 instance alive, it also needs to keep the subscription to that cluster for XdsConfig.

This case was tested in FakeControlPlaneXdsIntegrationTest.changeClusterForRoute, which is the test that was flaky.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, thanks.

clusterRefs.put(
cluster,
ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
ClusterRefState.forCluster(
new AtomicInteger(1), clusterNameMap.get(cluster), subscription));
}
if (rlsPluginConfigMap.containsKey(cluster)) {
clusterRefs.put(
Expand Down Expand Up @@ -826,7 +830,7 @@ private void updateRoutes(
for (String cluster : deletedClusters) {
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
if (count == 0) {
clusterRefs.remove(cluster);
clusterRefs.remove(cluster).close();
shouldUpdateResult = true;
}
}
Expand Down Expand Up @@ -879,7 +883,7 @@ private void cleanUpRoutes(Status error) {
for (String cluster : existingClusters) {
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
if (count == 0) {
clusterRefs.remove(cluster);
clusterRefs.remove(cluster).close();
}
}
existingClusters = null;
Expand Down Expand Up @@ -965,15 +969,18 @@ private static class ClusterRefState {
final String traditionalCluster;
@Nullable
final RlsPluginConfig rlsPluginConfig;
@Nullable
final XdsConfig.Subscription subscription;

private ClusterRefState(
AtomicInteger refCount, @Nullable String traditionalCluster,
@Nullable RlsPluginConfig rlsPluginConfig) {
@Nullable RlsPluginConfig rlsPluginConfig, @Nullable XdsConfig.Subscription subscription) {
this.refCount = refCount;
checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
"There must be exactly one non-null value in traditionalCluster and pluginConfig");
this.traditionalCluster = traditionalCluster;
this.rlsPluginConfig = rlsPluginConfig;
this.subscription = subscription;
}

private Map<String, ?> toLbPolicy() {
Expand All @@ -993,12 +1000,21 @@ private ClusterRefState(
}
}

static ClusterRefState forCluster(AtomicInteger refCount, String name) {
return new ClusterRefState(refCount, name, null);
private void close() {
if (subscription != null) {
subscription.close();
}
}

static ClusterRefState forCluster(
AtomicInteger refCount, String name, XdsConfig.Subscription subscription) {
return new ClusterRefState(refCount, name, null, checkNotNull(subscription, "subscription"));
}

static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
return new ClusterRefState(refCount, null, rlsPluginConfig);
static ClusterRefState forRlsPlugin(
AtomicInteger refCount,
RlsPluginConfig rlsPluginConfig) {
return new ClusterRefState(refCount, null, rlsPluginConfig, null);
}
}
}
Loading