Skip to content

A75 Aggregate cluster fixes #12186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ed5072e
Add Docker fiels for xds example server and client.
kannanjgithub May 20, 2025
c8be933
Merge branch 'grpc:master' into master
kannanjgithub Jun 3, 2025
63997fd
Merge branch 'grpc:master' into master
kannanjgithub Jun 6, 2025
5140beb
Merge branch 'grpc:master' into master
kannanjgithub Jun 11, 2025
029db63
Merge branch 'grpc:master' into master
kannanjgithub Jun 13, 2025
f472436
Merge branch 'grpc:master' into master
kannanjgithub Jun 23, 2025
459208c
in-progress changes.
kannanjgithub Jun 23, 2025
6cddc61
in-progress changes.
kannanjgithub Jun 24, 2025
f038f4c
in-progress changes.
kannanjgithub Jun 25, 2025
ad9b1b3
in-progress changes.
kannanjgithub Jun 25, 2025
d92902d
in-progress changes.
kannanjgithub Jun 26, 2025
60fae37
in-progress changes.
kannanjgithub Jun 26, 2025
6f0a920
Revert "Add Docker fiels for xds example server and client."
kannanjgithub Jun 26, 2025
d816b8e
Changes
kannanjgithub Jun 27, 2025
c8eca59
whitespace changes nightmares
kannanjgithub Jun 27, 2025
4b92066
whitespace changes nightmares
kannanjgithub Jun 27, 2025
3867cc7
whitespace changes nightmares
kannanjgithub Jun 27, 2025
c4bfaa5
Replace clusters array with single cluster in ClusterResolverLB
kannanjgithub Jun 30, 2025
400b776
Review comments, excepting the child cluster type change handling.
kannanjgithub Jul 9, 2025
fa21404
nit
kannanjgithub Jul 9, 2025
084c51c
Handle cluster type changes by using Graceful switch load balancer as…
kannanjgithub Jul 11, 2025
f2bfa63
Fix test
kannanjgithub Jul 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 53 additions & 74 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.CheckReturnValue;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
Expand All @@ -33,6 +33,7 @@
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsConfig.Subscription;
Expand All @@ -41,10 +42,11 @@
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Load balancer for cds_experimental LB policy. One instance per top-level cluster.
Expand All @@ -55,19 +57,15 @@
private final XdsLogger logger;
private final Helper helper;
private final LoadBalancerRegistry lbRegistry;
private final GracefulSwitchLoadBalancer delegate;
// Following fields are effectively final.
private String clusterName;
private Subscription clusterSubscription;
private LoadBalancer childLb;

CdsLoadBalancer2(Helper helper) {
this(helper, LoadBalancerRegistry.getDefaultRegistry());
}

@VisibleForTesting
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
this.helper = checkNotNull(helper, "helper");
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.delegate = new GracefulSwitchLoadBalancer(helper);
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}
Expand All @@ -91,7 +89,7 @@
if (clusterSubscription == null) {
// Should be impossible, because XdsDependencyManager wouldn't have generated this
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unable to find non-dynamic root cluster"));
errorPrefix() + "Unable to find non-dynamic cluster"));

Check warning on line 92 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L92

Added line #L92 was not covered by tests
}
// The dynamic cluster must not have loaded yet
return Status.OK;
Expand All @@ -100,42 +98,25 @@
return fail(clusterConfigOr.getStatus());
}
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
List<String> leafNames;
if (clusterConfig.getChildren() instanceof AggregateConfig) {
leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
} else if (clusterConfig.getChildren() instanceof EndpointConfig) {
leafNames = ImmutableList.of(clusterName);
} else {
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unexpected cluster children type: "
+ clusterConfig.getChildren().getClass()));
}
if (leafNames.isEmpty()) {
// Should be impossible, because XdsClusterResource validated this
return fail(Status.UNAVAILABLE.withDescription(
errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));
}

Status noneFoundError = Status.INTERNAL
.withDescription(errorPrefix() + "No leaves and no error; this is a bug");
List<DiscoveryMechanism> instances = new ArrayList<>();
for (String leafName : leafNames) {
StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);
if (!leafConfigOr.hasValue()) {
noneFoundError = leafConfigOr.getStatus();
continue;
}
if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {
noneFoundError = Status.INTERNAL.withDescription(
errorPrefix() + "Unexpected child " + leafName + " cluster children type: "
+ leafConfigOr.getValue().getChildren().getClass());
continue;
NameResolver.ConfigOrError configOrError;
Object gracefulConfig;
if (clusterConfig.getChildren() instanceof EndpointConfig) {
// The LB policy config is provided in service_config.proto/JSON format.
configOrError =
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()),
lbRegistry);
if (configOrError.getError() != null) {
// Should be impossible, because XdsClusterResource validated this
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));

Check warning on line 113 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L112-L113

Added lines #L112 - L113 were not covered by tests
}
CdsUpdate result = leafConfigOr.getValue().getClusterResource();
CdsUpdate result = clusterConfig.getClusterResource();
DiscoveryMechanism instance;
if (result.clusterType() == ClusterType.EDS) {
instance = DiscoveryMechanism.forEds(
leafName,
clusterName,
result.edsServiceName(),
result.lrsServerInfo(),
result.maxConcurrentRequests(),
Expand All @@ -144,45 +125,49 @@
result.outlierDetection());
} else {
instance = DiscoveryMechanism.forLogicalDns(
leafName,
clusterName,
result.dnsHostName(),
result.lrsServerInfo(),
result.maxConcurrentRequests(),
result.upstreamTlsContext(),
result.filterMetadata());
}
instances.add(instance);
}
if (instances.isEmpty()) {
return fail(noneFoundError);
}

// The LB policy config is provided in service_config.proto/JSON format.
NameResolver.ConfigOrError configOrError =
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);
if (configOrError.getError() != null) {
// Should be impossible, because XdsClusterResource validated this
gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME),
new ClusterResolverConfig(
instance,
configOrError.getConfig(),
clusterConfig.getClusterResource().isHttp11ProxyAvailable()));
} else if (clusterConfig.getChildren() instanceof AggregateConfig) {
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
for (String childCluster: leafClusters) {
priorityChildConfigs.put(childCluster,
new PriorityChildConfig(
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
lbRegistry.getProvider(CDS_POLICY_NAME),
new CdsConfig(childCluster)),
false));
}
gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
lbRegistry.getProvider(PRIORITY_POLICY_NAME),
new PriorityLoadBalancerProvider.PriorityLbConfig(
Collections.unmodifiableMap(priorityChildConfigs), leafClusters));
} else {
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
errorPrefix() + "Unexpected cluster children type: "
+ clusterConfig.getChildren().getClass()));

Check warning on line 159 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L158-L159

Added lines #L158 - L159 were not covered by tests
}

ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances),
configOrError.getConfig(),
clusterConfig.getClusterResource().isHttp11ProxyAvailable());
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}
return childLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
return delegate.acceptResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());
}

@Override
public void handleNameResolutionError(Status error) {
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
if (childLb != null) {
childLb.handleNameResolutionError(error);
if (delegate != null) {
delegate.handleNameResolutionError(error);
} else {
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
Expand All @@ -192,10 +177,7 @@
@Override
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
if (childLb != null) {
childLb.shutdown();
childLb = null;
}
delegate.shutdown();
if (clusterSubscription != null) {
clusterSubscription.close();
clusterSubscription = null;
Expand All @@ -204,10 +186,7 @@

@CheckReturnValue // don't forget to return up the stack after the fail call
private Status fail(Status error) {
if (childLb != null) {
childLb.shutdown();
childLb = null;
}
delegate.shutdown();
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
Expand Down
18 changes: 17 additions & 1 deletion xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
Expand Down Expand Up @@ -51,9 +52,24 @@ public String getPolicyName() {
return XdsLbPolicies.CDS_POLICY_NAME;
}

private final LoadBalancerRegistry loadBalancerRegistry;

public CdsLoadBalancerProvider() {
this.loadBalancerRegistry = null;
}

public CdsLoadBalancerProvider(LoadBalancerRegistry loadBalancerRegistry) {
this.loadBalancerRegistry = loadBalancerRegistry;
}

@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new CdsLoadBalancer2(helper);
LoadBalancerRegistry loadBalancerRegistry = this.loadBalancerRegistry;
if (loadBalancerRegistry == null) {
loadBalancerRegistry = LoadBalancerRegistry.getDefaultRegistry();
}

return new CdsLoadBalancer2(helper, loadBalancerRegistry);
}

@Override
Expand Down
74 changes: 31 additions & 43 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@
*/
private final class ClusterResolverLbState extends LoadBalancer {
private final Helper helper;
private final List<String> clusters = new ArrayList<>();
private final Map<String, ClusterState> clusterStates = new HashMap<>();
private String cluster;
private Object endpointLbConfig;
private ResolvedAddresses resolvedAddresses;
private LoadBalancer childLb;
Expand All @@ -185,21 +185,20 @@
ClusterResolverConfig config =
(ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
endpointLbConfig = config.lbConfig;
for (DiscoveryMechanism instance : config.discoveryMechanisms) {
clusters.add(instance.cluster);
ClusterState state;
if (instance.type == DiscoveryMechanism.Type.EDS) {
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata, instance.outlierDetection);
} else { // logical DNS
state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata);
}
clusterStates.put(instance.cluster, state);
state.start();
}
DiscoveryMechanism instance = config.discoveryMechanism;
cluster = instance.cluster;
ClusterState state;
if (instance.type == DiscoveryMechanism.Type.EDS) {
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata, instance.outlierDetection);
} else { // logical DNS
state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata);
}
clusterStates.put(instance.cluster, state);
state.start();
return Status.OK;
}

Expand Down Expand Up @@ -229,24 +228,22 @@
List<String> priorities = new ArrayList<>(); // totally ordered priority list

Status endpointNotFound = Status.OK;
for (String cluster : clusters) {
ClusterState state = clusterStates.get(cluster);
// Propagate endpoints to the child LB policy only after all clusters have been resolved.
if (!state.resolved && state.status.isOk()) {
return;
}
if (state.result != null) {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
} else {
endpointNotFound = state.status;
}
ClusterState state = clusterStates.get(cluster);
// Propagate endpoints to the child LB policy only after all clusters have been resolved.
if (!state.resolved && state.status.isOk()) {
return;

Check warning on line 234 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L234

Added line #L234 was not covered by tests
}
if (state.result != null) {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
} else {
endpointNotFound = state.status;
}
if (addresses.isEmpty()) {
if (endpointNotFound.isOk()) {
endpointNotFound = Status.UNAVAILABLE.withDescription(
"No usable endpoint from cluster(s): " + clusters);
"No usable endpoint from cluster: " + cluster);
} else {
endpointNotFound =
Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
Expand Down Expand Up @@ -274,22 +271,13 @@
}

private void handleEndpointResolutionError() {
boolean allInError = true;
Status error = null;
for (String cluster : clusters) {
ClusterState state = clusterStates.get(cluster);
if (state.status.isOk()) {
allInError = false;
} else {
error = state.status;
}
}
if (allInError) {
ClusterState state = clusterStates.get(cluster);
if (!state.status.isOk()) {
if (childLb != null) {
childLb.handleNameResolutionError(error);
childLb.handleNameResolutionError(state.status);
} else {
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(state.status)));
}
}
}
Expand Down
Loading