@@ -90,6 +90,7 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
90
90
private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER ;
91
91
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER ;
92
92
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER ;
93
+ private static final LongCounterMetricInstrument ENDPOINT_SLOW_START_COUNTER ;
93
94
private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM ;
94
95
private static final Logger log = Logger .getLogger (
95
96
WeightedRoundRobinLoadBalancer .class .getName ());
@@ -133,6 +134,14 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
133
134
Lists .newArrayList ("grpc.target" ),
134
135
Lists .newArrayList ("grpc.lb.locality" , "grpc.lb.backend_service" ),
135
136
false );
137
+ ENDPOINT_SLOW_START_COUNTER = metricInstrumentRegistry .registerLongCounter (
138
+ "grpc.lb.wrr.endpoints_in_slow_start" ,
139
+ "EXPERIMENTAL. Number of endpoints from each scheduler update that "
140
+ + "are in slow start window" ,
141
+ "{endpoint}" ,
142
+ Lists .newArrayList ("grpc.target" ),
143
+ Lists .newArrayList ("grpc.lb.locality" , "grpc.lb.backend_service" ),
144
+ false );
136
145
ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry .registerDoubleHistogram (
137
146
"grpc.lb.wrr.endpoint_weights" ,
138
147
"EXPERIMENTAL. The histogram buckets will be endpoint weight ranges." ,
@@ -243,16 +252,21 @@ private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList)
243
252
private void updateWeight (WeightedRoundRobinPicker picker ) {
244
253
Helper helper = getHelper ();
245
254
float [] newWeights = new float [picker .children .size ()];
255
+ float [] newScales = new float [picker .children .size ()];
246
256
AtomicInteger staleEndpoints = new AtomicInteger ();
247
257
AtomicInteger notYetUsableEndpoints = new AtomicInteger ();
258
+ AtomicInteger slowStartEndpoints = new AtomicInteger ();
248
259
for (int i = 0 ; i < picker .children .size (); i ++) {
249
260
double newWeight = ((WeightedChildLbState ) picker .children .get (i )).getWeight (staleEndpoints ,
250
261
notYetUsableEndpoints );
262
+ double newScale = ((WeightedChildLbState ) picker .children .get (i ))
263
+ .getScale (slowStartEndpoints );
251
264
helper .getMetricRecorder ()
252
265
.recordDoubleHistogram (ENDPOINT_WEIGHTS_HISTOGRAM , newWeight ,
253
266
ImmutableList .of (helper .getChannelTarget ()),
254
267
ImmutableList .of (locality , backendService ));
255
268
newWeights [i ] = newWeight > 0 ? (float ) newWeight : 0.0f ;
269
+ newScales [i ] = newScale > 0 ? (float ) newScale : 1.0f ;
256
270
}
257
271
258
272
if (staleEndpoints .get () > 0 ) {
@@ -267,7 +281,13 @@ private void updateWeight(WeightedRoundRobinPicker picker) {
267
281
ImmutableList .of (helper .getChannelTarget ()),
268
282
ImmutableList .of (locality , backendService ));
269
283
}
270
- boolean weightsEffective = picker .updateWeight (newWeights );
284
+ if (slowStartEndpoints .get () > 0 ) {
285
+ helper .getMetricRecorder ()
286
+ .addLongCounter (ENDPOINT_SLOW_START_COUNTER , slowStartEndpoints .get (),
287
+ ImmutableList .of (helper .getChannelTarget ()),
288
+ ImmutableList .of (locality , backendService ));
289
+ }
290
+ boolean weightsEffective = picker .updateWeight (newWeights , newScales );
271
291
if (!weightsEffective ) {
272
292
helper .getMetricRecorder ()
273
293
.addLongCounter (RR_FALLBACK_COUNTER , 1 , ImmutableList .of (helper .getChannelTarget ()),
@@ -289,6 +309,7 @@ final class WeightedChildLbState extends ChildLbState {
289
309
private final Set <WrrSubchannel > subchannels = new HashSet <>();
290
310
private volatile long lastUpdated ;
291
311
private volatile long nonEmptySince ;
312
+ private volatile long readySince ;
292
313
private volatile double weight = 0 ;
293
314
294
315
private OrcaReportListener orcaReportListener ;
@@ -320,6 +341,25 @@ private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsabl
320
341
}
321
342
}
322
343
344
+ private double getScale (AtomicInteger slowStartEndpoints ) {
345
+ if (config == null || config .slowStartConfig == null ) {
346
+ return 1 ;
347
+ }
348
+ long slowStartWindowNanos = config .slowStartConfig .slowStartWindowNanos ;
349
+ if (slowStartWindowNanos <= 0 ) {
350
+ return 1 ;
351
+ }
352
+ long now = ticker .nanoTime ();
353
+ if (now - readySince >= slowStartWindowNanos ) {
354
+ return 1 ;
355
+ } else {
356
+ slowStartEndpoints .incrementAndGet ();
357
+ double timeFactor = Math .max (now - readySince , 1.0 ) / slowStartWindowNanos ;
358
+ double weightPercent = Math .pow (timeFactor , 1.0 / config .slowStartConfig .aggression );
359
+ return Math .max (config .slowStartConfig .minWeightPercent / 100.0 , weightPercent );
360
+ }
361
+ }
362
+
323
363
public void addSubchannel (WrrSubchannel wrrSubchannel ) {
324
364
subchannels .add (wrrSubchannel );
325
365
}
@@ -439,6 +479,7 @@ public void start(SubchannelStateListener listener) {
439
479
public void onSubchannelState (ConnectivityStateInfo newState ) {
440
480
if (newState .getState ().equals (ConnectivityState .READY )) {
441
481
owner .nonEmptySince = infTime ;
482
+ owner .readySince = ticker .nanoTime ();
442
483
}
443
484
listener .onSubchannelState (newState );
444
485
}
@@ -517,8 +558,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
517
558
}
518
559
519
560
/** Returns {@code true} if weights are different than round_robin. */
520
- private boolean updateWeight (float [] newWeights ) {
521
- this .scheduler = new StaticStrideScheduler (newWeights , sequence );
561
+ private boolean updateWeight (float [] newWeights , float [] newScales ) {
562
+ this .scheduler = new StaticStrideScheduler (newWeights , newScales , sequence );
522
563
return !this .scheduler .usesRoundRobin ();
523
564
}
524
565
@@ -604,7 +645,7 @@ static final class StaticStrideScheduler {
604
645
private static final double K_MAX_RATIO = 10 ;
605
646
private static final double K_MIN_RATIO = 0.1 ;
606
647
607
- StaticStrideScheduler (float [] weights , AtomicInteger sequence ) {
648
+ StaticStrideScheduler (float [] weights , float [] scales , AtomicInteger sequence ) {
608
649
checkArgument (weights .length >= 1 , "Couldn't build scheduler: requires at least one weight" );
609
650
int numChannels = weights .length ;
610
651
int numWeightedChannels = 0 ;
@@ -643,12 +684,14 @@ static final class StaticStrideScheduler {
643
684
int weightLowerBound = (int ) Math .ceil (scalingFactor * unscaledMeanWeight * K_MIN_RATIO );
644
685
short [] scaledWeights = new short [numChannels ];
645
686
for (int i = 0 ; i < numChannels ; i ++) {
687
+ double curScalingFactor = scalingFactor * scales [i ];
688
+ int weight ;
646
689
if (weights [i ] <= 0 ) {
647
- scaledWeights [ i ] = (short ) Math .round (scalingFactor * unscaledMeanWeight );
690
+ weight = (int ) Math .round (curScalingFactor * unscaledMeanWeight );
648
691
} else {
649
- int weight = (int ) Math .round (scalingFactor * Math .min (weights [i ], unscaledMaxWeight ));
650
- scaledWeights [i ] = (short ) Math .max (weight , weightLowerBound );
692
+ weight = (int ) Math .round (curScalingFactor * Math .min (weights [i ], unscaledMaxWeight ));
651
693
}
694
+ scaledWeights [i ] = (short ) Math .max (weight , weightLowerBound );
652
695
}
653
696
654
697
this .scaledWeights = scaledWeights ;
0 commit comments