Skip to content

Implementing SlowStartConfig for WRR Load Balancing Policy #12200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ def grpc_java_repositories(bzlmod = False):
if not native.existing_rule("com_github_cncf_xds"):
http_archive(
name = "com_github_cncf_xds",
strip_prefix = "xds-024c85f92f20cab567a83acc50934c7f9711d124",
sha256 = "5f403aa681711500ca8e62387be3e37d971977db6e88616fc21862a406430649",
strip_prefix = "xds-2ac532fd44436293585084f8d94c6bdb17835af0",
sha256 = "790c4c83b6950bb602fec221f6a529d9f368cdc8852aae7d2592d0d04b015f37",
urls = [
"https://github.com/cncf/xds/archive/024c85f92f20cab567a83acc50934c7f9711d124.tar.gz",
"https://github.com/cncf/xds/archive/2ac532fd44436293585084f8d94c6bdb17835af0.tar.gz",
],
)
if not bzlmod and not native.existing_rule("com_github_grpc_grpc"):
Expand Down Expand Up @@ -141,10 +141,10 @@ def grpc_java_repositories(bzlmod = False):
if not native.existing_rule("envoy_api"):
http_archive(
name = "envoy_api",
sha256 = "ecf71817233eba19cc8b4ee14e126ffd5838065d5b5a92b2506258a42ac55199",
strip_prefix = "data-plane-api-0bc95493c5e88b7b07e62758d23b39341813a827",
sha256 = "e1c59ea08b84ad2994ccfc8d0cb6fc02358f24f659ea42dd8fc7755ff6887d01",
strip_prefix = "envoy-464320b866ca5d2fddaa609a62d77d7d12f0f078/api",
urls = [
"https://github.com/envoyproxy/data-plane-api/archive/0bc95493c5e88b7b07e62758d23b39341813a827.tar.gz",
"https://github.com/envoyproxy/envoy/archive/464320b866ca5d2fddaa609a62d77d7d12f0f078.tar.gz",
],
)

Expand Down
48 changes: 46 additions & 2 deletions xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy;
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy;
import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin;
import io.envoyproxy.envoy.extensions.load_balancing_policies.common.v3.SlowStartConfig;
import io.envoyproxy.envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest;
import io.envoyproxy.envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst;
import io.envoyproxy.envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash;
Expand Down Expand Up @@ -92,6 +93,14 @@ class LoadBalancerConfigFactory {

static final String ERROR_UTILIZATION_PENALTY = "errorUtilizationPenalty";

static final String AGGRESSION = "aggression";

static final String SLOW_START_WINDOW = "slowStartWindow";

static final String MIN_WEIGHT_PERCENT = "minWeightPercent";

static final String SLOW_START_CONFIG = "slowStartConfig";

/**
* Factory method for creating a new {@link LoadBalancerConfigConverter} for a given xDS {@link
* Cluster}.
Expand Down Expand Up @@ -138,7 +147,8 @@ class LoadBalancerConfigFactory {
String oobReportingPeriod,
Boolean enableOobLoadReport,
String weightUpdatePeriod,
Float errorUtilizationPenalty) {
Float errorUtilizationPenalty,
ImmutableMap<String, ?> slowStartConfig) {
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
if (blackoutPeriod != null) {
configBuilder.put(BLACK_OUT_PERIOD, blackoutPeriod);
Expand All @@ -158,10 +168,29 @@ class LoadBalancerConfigFactory {
if (errorUtilizationPenalty != null) {
configBuilder.put(ERROR_UTILIZATION_PENALTY, errorUtilizationPenalty);
}
if (slowStartConfig != null) {
configBuilder.put(SLOW_START_CONFIG, slowStartConfig);
}
return ImmutableMap.of(WeightedRoundRobinLoadBalancerProvider.SCHEME,
configBuilder.buildOrThrow());
}

/**
 * Builds the service config JSON object for the slow start section of the weighted round robin
 * load balancer config.
 *
 * <p>Each argument is optional: a {@code null} value means the corresponding key is left out of
 * the resulting map.
 *
 * @param minWeightPercent value stored under {@code MIN_WEIGHT_PERCENT}, or {@code null}
 * @param aggression value stored under {@code AGGRESSION}, or {@code null}
 * @param slowStartWindow value stored under {@code SLOW_START_WINDOW}, or {@code null}
 * @return an immutable map holding only the entries whose value was non-null
 */
private static ImmutableMap<String, ?> buildSlowStartConfig(Double minWeightPercent,
    Double aggression,
    String slowStartWindow) {
  // Keys and values are paired by index and visited in the declared order.
  String[] fieldNames = {MIN_WEIGHT_PERCENT, AGGRESSION, SLOW_START_WINDOW};
  Object[] fieldValues = {minWeightPercent, aggression, slowStartWindow};

  ImmutableMap.Builder<String, Object> slowStart = ImmutableMap.builder();
  for (int i = 0; i < fieldNames.length; i++) {
    Object value = fieldValues[i];
    if (value != null) {
      slowStart.put(fieldNames[i], value);
    }
  }
  return slowStart.buildOrThrow();
}

/**
* Builds a service config JSON object for the least_request load balancer config based on the
* given config values.
Expand Down Expand Up @@ -293,13 +322,28 @@ static class LoadBalancingPolicyConverter {
wrr.hasOobReportingPeriod() ? Durations.toString(wrr.getOobReportingPeriod()) : null,
wrr.hasEnableOobLoadReport() ? wrr.getEnableOobLoadReport().getValue() : null,
wrr.hasWeightUpdatePeriod() ? Durations.toString(wrr.getWeightUpdatePeriod()) : null,
wrr.hasErrorUtilizationPenalty() ? wrr.getErrorUtilizationPenalty().getValue() : null);
wrr.hasErrorUtilizationPenalty() ? wrr.getErrorUtilizationPenalty().getValue() : null,
wrr.hasSlowStartConfig() ? convertSlotStartConfig(wrr.getSlowStartConfig()) : null);
} catch (IllegalArgumentException ex) {
throw new ResourceInvalidException("Invalid duration in weighted round robin config: "
+ ex.getMessage());
}
}

/**
 * Converts an xDS {@link SlowStartConfig} message to the service config JSON form consumed by
 * the weighted round robin policy.
 *
 * <p>Note that {@code SlowStartConfig} in this file refers to the imported Envoy proto
 * ({@code io.envoyproxy.envoy.extensions.load_balancing_policies.common.v3.SlowStartConfig}),
 * not to the {@code io.grpc.xds.SlowStartConfig} value class of the same simple name.
 *
 * <p>A field that is not set on the proto is passed on as {@code null}, which makes
 * {@code buildSlowStartConfig} omit the corresponding key.
 *
 * @param config the slow start proto taken from the client-side WRR policy
 * @return the slow start section of the service config
 * @throws ResourceInvalidException if the conversion raises an
 *     {@link IllegalArgumentException}, e.g. while formatting the slow start window duration
 */
private static ImmutableMap<String, ?> convertSlotStartConfig(
    SlowStartConfig config) throws ResourceInvalidException {
  try {
    return buildSlowStartConfig(
        config.hasMinWeightPercent() ? config.getMinWeightPercent().getValue() : null,
        // NOTE(review): only the default value of the aggression field is read here; any
        // runtime override the proto may carry is not consulted -- confirm this is intended.
        config.hasAggression() ? config.getAggression().getDefaultValue() : null,
        config.hasSlowStartWindow() ? Durations.toString(config.getSlowStartWindow()) : null
    );
  } catch (IllegalArgumentException ex) {
    // The previous message read "slow start slowStart", which was a copy/paste slip.
    throw new ResourceInvalidException("Invalid duration in slow start config: "
        + ex.getMessage());
  }
}

/**
* Converts a wrr_locality {@link Any} configuration to service config format.
*/
Expand Down
64 changes: 64 additions & 0 deletions xds/src/main/java/io/grpc/xds/SlowStartConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

/**
 * Immutable value holder for the slow start settings of the weighted round robin policy.
 *
 * <p>Instances are obtained from a {@link Builder}; see {@link #newBuilder()}. A builder that
 * has had no setter called produces a config with {@code minWeightPercent = 10.0},
 * {@code aggression = 1.0} and {@code slowStartWindowNanos = 0}.
 */
final class SlowStartConfig {
  private static final double DEFAULT_MIN_WEIGHT_PERCENT = 10.0;
  private static final double DEFAULT_AGGRESSION = 1.0;
  private static final long DEFAULT_SLOW_START_WINDOW_NANOS = 0L;

  /** Lower bound of the slow start scale, expressed as a percentage. */
  final double minWeightPercent;
  /** Aggression parameter of the slow start ramp. */
  final double aggression;
  /** Length of the slow start window in nanoseconds. */
  final long slowStartWindowNanos;

  /** Takes a snapshot of the builder's current values. */
  private SlowStartConfig(Builder source) {
    minWeightPercent = source.minWeightPercent;
    aggression = source.aggression;
    slowStartWindowNanos = source.slowStartWindowNanos;
  }

  /** Returns a fresh builder pre-populated with the default values. */
  public static Builder newBuilder() {
    return new Builder();
  }

  /** Mutable builder for {@link SlowStartConfig}; every setter returns {@code this}. */
  static final class Builder {
    private double minWeightPercent = DEFAULT_MIN_WEIGHT_PERCENT;
    private double aggression = DEFAULT_AGGRESSION;
    private long slowStartWindowNanos = DEFAULT_SLOW_START_WINDOW_NANOS;

    private Builder() {
    }

    /** Overrides the minimum weight percentage. */
    @SuppressWarnings("UnusedReturnValue")
    Builder setMinWeightPercent(double minWeightPercent) {
      this.minWeightPercent = minWeightPercent;
      return this;
    }

    /** Overrides the aggression parameter. */
    @SuppressWarnings("UnusedReturnValue")
    Builder setAggression(double aggression) {
      this.aggression = aggression;
      return this;
    }

    /** Overrides the slow start window, given in nanoseconds. */
    @SuppressWarnings("UnusedReturnValue")
    Builder setSlowStartWindowNanos(long slowStartWindowNanos) {
      this.slowStartWindowNanos = slowStartWindowNanos;
      return this;
    }

    /** Creates an immutable config from the values currently held by this builder. */
    SlowStartConfig build() {
      return new SlowStartConfig(this);
    }
  }
}
77 changes: 67 additions & 10 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@
* "\"oobReportingPeriod\":\"10s\"," +
* "\"weightExpirationPeriod\":\"180s\"," +
* "\"errorUtilizationPenalty\":\"1.0\"," +
* "\"weightUpdatePeriod\":\"1s\"}}]}";
* "\"weightUpdatePeriod\":\"1s\"," +
* "\"slowStartConfig\":{" +
* "\"minWeightPercent\":10.0," +
* "\"aggression\":1.0," +
* "\"slowStartWindow\":\"30s\"}}}]}";
* serviceConfig = (Map<String, ?>) JsonParser.parse(wrrConfig);
* channel = ManagedChannelBuilder.forTarget("test:///lb.test.grpc.io")
* .defaultServiceConfig(serviceConfig)
Expand All @@ -90,6 +94,7 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
private static final LongCounterMetricInstrument ENDPOINT_SLOW_START_COUNTER;
private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
private static final Logger log = Logger.getLogger(
WeightedRoundRobinLoadBalancer.class.getName());
Expand Down Expand Up @@ -133,6 +138,14 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
ENDPOINT_SLOW_START_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.lb.wrr.endpoints_in_slow_start",
"EXPERIMENTAL. Number of endpoints from each scheduler update that "
+ "are in slow start window",
"{endpoint}",
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
"grpc.lb.wrr.endpoint_weights",
"EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
Expand Down Expand Up @@ -243,16 +256,21 @@ private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList)
private void updateWeight(WeightedRoundRobinPicker picker) {
Helper helper = getHelper();
float[] newWeights = new float[picker.children.size()];
float[] newScales = new float[picker.children.size()];
AtomicInteger staleEndpoints = new AtomicInteger();
AtomicInteger notYetUsableEndpoints = new AtomicInteger();
AtomicInteger slowStartEndpoints = new AtomicInteger();
for (int i = 0; i < picker.children.size(); i++) {
double newWeight = ((WeightedChildLbState) picker.children.get(i)).getWeight(staleEndpoints,
notYetUsableEndpoints);
double newScale = ((WeightedChildLbState) picker.children.get(i))
.getScale(slowStartEndpoints);
helper.getMetricRecorder()
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality, backendService));
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
newScales[i] = newScale > 0 ? (float) newScale : 1.0f;
}

if (staleEndpoints.get() > 0) {
Expand All @@ -267,7 +285,13 @@ private void updateWeight(WeightedRoundRobinPicker picker) {
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality, backendService));
}
boolean weightsEffective = picker.updateWeight(newWeights);
if (slowStartEndpoints.get() > 0) {
helper.getMetricRecorder()
.addLongCounter(ENDPOINT_SLOW_START_COUNTER, slowStartEndpoints.get(),
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality, backendService));
}
boolean weightsEffective = picker.updateWeight(newWeights, newScales);
if (!weightsEffective) {
helper.getMetricRecorder()
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
Expand All @@ -289,6 +313,7 @@ final class WeightedChildLbState extends ChildLbState {
private final Set<WrrSubchannel> subchannels = new HashSet<>();
private volatile long lastUpdated;
private volatile long nonEmptySince;
private volatile long readySince;
private volatile double weight = 0;

private OrcaReportListener orcaReportListener;
Expand Down Expand Up @@ -320,6 +345,25 @@ private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsabl
}
}

/**
 * Returns the slow start multiplier for this child.
 *
 * <p>The result is {@code 1} when there is no config or no slow start config, when the
 * configured window is not positive, or when at least a full window has elapsed since
 * {@code readySince}. Otherwise the child is counted in {@code slowStartEndpoints} and the
 * result is {@code max(minWeightPercent / 100, timeFactor ^ (1 / aggression))}, where
 * {@code timeFactor} is the elapsed part of the window (at least one nanosecond) divided by
 * the window length.
 *
 * @param slowStartEndpoints counter incremented once when this child is inside its window
 */
private double getScale(AtomicInteger slowStartEndpoints) {
  if (config == null || config.slowStartConfig == null) {
    return 1;
  }
  final long windowNanos = config.slowStartConfig.slowStartWindowNanos;
  if (!(windowNanos > 0)) {
    return 1;
  }
  final long currentNanos = ticker.nanoTime();
  if (currentNanos - readySince >= windowNanos) {
    return 1;
  }

  // Still ramping up: report it and compute the reduced scale.
  slowStartEndpoints.incrementAndGet();
  double elapsedFraction = Math.max(currentNanos - readySince, 1.0) / windowNanos;
  double rampedScale = Math.pow(elapsedFraction, 1.0 / config.slowStartConfig.aggression);
  double scaleFloor = config.slowStartConfig.minWeightPercent / 100.0;
  return Math.max(scaleFloor, rampedScale);
}

public void addSubchannel(WrrSubchannel wrrSubchannel) {
subchannels.add(wrrSubchannel);
}
Expand Down Expand Up @@ -439,6 +483,7 @@ public void start(SubchannelStateListener listener) {
public void onSubchannelState(ConnectivityStateInfo newState) {
// Update the owning child's timestamps first, then forward the state to the wrapped listener.
if (newState.getState().equals(ConnectivityState.READY)) {
// infTime is defined outside this view; presumably it marks "no non-empty report seen yet"
// so that the blackout period restarts -- TODO confirm.
owner.nonEmptySince = infTime;
// Every READY transition overwrites readySince; getScale() measures the slow start window
// from this instant.
owner.readySince = ticker.nanoTime();
}
listener.onSubchannelState(newState);
}
Expand Down Expand Up @@ -517,8 +562,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}

/** Returns {@code true} if weights are different than round_robin. */
private boolean updateWeight(float[] newWeights) {
this.scheduler = new StaticStrideScheduler(newWeights, sequence);
private boolean updateWeight(float[] newWeights, float[] newScales) {
this.scheduler = new StaticStrideScheduler(newWeights, newScales, sequence);
return !this.scheduler.usesRoundRobin();
}

Expand Down Expand Up @@ -604,7 +649,7 @@ static final class StaticStrideScheduler {
private static final double K_MAX_RATIO = 10;
private static final double K_MIN_RATIO = 0.1;

StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
StaticStrideScheduler(float[] weights, float[] scales, AtomicInteger sequence) {
checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
int numChannels = weights.length;
int numWeightedChannels = 0;
Expand Down Expand Up @@ -643,12 +688,14 @@ static final class StaticStrideScheduler {
int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
short[] scaledWeights = new short[numChannels];
for (int i = 0; i < numChannels; i++) {
double curScalingFactor = scalingFactor * scales[i];
int weight;
if (weights[i] <= 0) {
scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
weight = (int) Math.round(curScalingFactor * unscaledMeanWeight);
} else {
int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
weight = (int) Math.round(curScalingFactor * Math.min(weights[i], unscaledMaxWeight));
}
scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
}

this.scaledWeights = scaledWeights;
Expand Down Expand Up @@ -719,6 +766,7 @@ static final class WeightedRoundRobinLoadBalancerConfig {
final long oobReportingPeriodNanos;
final long weightUpdatePeriodNanos;
final float errorUtilizationPenalty;
final SlowStartConfig slowStartConfig;

public static Builder newBuilder() {
return new Builder();
Expand All @@ -729,13 +777,15 @@ private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
boolean enableOobLoadReport,
long oobReportingPeriodNanos,
long weightUpdatePeriodNanos,
float errorUtilizationPenalty) {
float errorUtilizationPenalty,
SlowStartConfig slowStartConfig) {
this.blackoutPeriodNanos = blackoutPeriodNanos;
this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
this.enableOobLoadReport = enableOobLoadReport;
this.oobReportingPeriodNanos = oobReportingPeriodNanos;
this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
this.errorUtilizationPenalty = errorUtilizationPenalty;
this.slowStartConfig = slowStartConfig;
}

static final class Builder {
Expand All @@ -745,6 +795,7 @@ static final class Builder {
long oobReportingPeriodNanos = 10_000_000_000L; // 10s
long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
float errorUtilizationPenalty = 1.0F;
SlowStartConfig slowStartConfig = SlowStartConfig.newBuilder().build();

private Builder() {

Expand Down Expand Up @@ -782,10 +833,16 @@ Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
return this;
}

/**
 * Replaces the slow start settings held by this builder. When this setter is never called the
 * builder keeps the value produced by {@code SlowStartConfig.newBuilder().build()}.
 *
 * @param slowStartConfig the slow start settings to use; stored as-is without validation
 * @return this builder, for chaining
 */
@SuppressWarnings("UnusedReturnValue")
Builder setSlowStartConfig(SlowStartConfig slowStartConfig) {
this.slowStartConfig = slowStartConfig;
return this;
}

WeightedRoundRobinLoadBalancerConfig build() {
return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
weightUpdatePeriodNanos, errorUtilizationPenalty);
weightUpdatePeriodNanos, errorUtilizationPenalty, slowStartConfig);
}
}
}
Expand Down
Loading
Loading