From 7660113386962ad4bad8b568d5bcf57749fd5aa6 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Mon, 30 Jun 2025 17:03:03 +0530 Subject: [PATCH] lb: pick first metrics --- .../internal/PickFirstLeafLoadBalancer.java | 45 +++++++++++++++++++ .../PickFirstLeafLoadBalancerTest.java | 17 +++++++ 2 files changed, 62 insertions(+) diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index bbc144ea775..9cd16bb05ef 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -32,6 +32,8 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; +import io.grpc.LongCounterMetricInstrument; +import io.grpc.MetricInstrumentRegistry; import io.grpc.Status; import io.grpc.SynchronizationContext.ScheduledHandle; import java.net.Inet4Address; @@ -57,6 +59,9 @@ * list and sticking to the first that works. */ final class PickFirstLeafLoadBalancer extends LoadBalancer { + private static final LongCounterMetricInstrument DISCONNECTIONS; + private static final LongCounterMetricInstrument CONNECTION_ATTEMPTS_SUCCEEDED; + private static final LongCounterMetricInstrument CONNECTION_ATTEMPTS_FAILED; private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName()); @VisibleForTesting static final int CONNECTION_DELAY_INTERVAL_MS = 250; @@ -78,6 +83,33 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { private ScheduledHandle reconnectTask = null; private final boolean serializingRetries = isSerializingRetries(); + // The metric instruments are only registered once and shared by all instances of this LB. + static { + MetricInstrumentRegistry metricInstrumentRegistry + = MetricInstrumentRegistry.getDefaultRegistry(); + DISCONNECTIONS = metricInstrumentRegistry.registerLongCounter( + "grpc.lb.pick_first.disconnections", + "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected", + "{disconnection}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList(), + false); + CONNECTION_ATTEMPTS_SUCCEEDED = metricInstrumentRegistry.registerLongCounter( + "grpc.lb.pick_first.connection_attempts_succeeded", + "EXPERIMENTAL. Number of successful connection attempts", + "{attempt}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList(), + false); + CONNECTION_ATTEMPTS_FAILED = metricInstrumentRegistry.registerLongCounter( + "grpc.lb.pick_first.connection_attempts_failed", + "EXPERIMENTAL. Number of failed connection attempts", + "{attempt}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList(), + false); + } + PickFirstLeafLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); } @@ -276,6 +308,13 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo } } + // if the previous state was ready, count it as a disconnection + if (rawConnectivityState == READY || concludedState == READY) { + helper.getMetricRecorder().addLongCounter(DISCONNECTIONS, 1, + Collections.singletonList(helper.getChannelTarget()), + Collections.emptyList()); + } + switch (newState) { case IDLE: // Shutdown when ready: connect from beginning when prompted @@ -293,11 +332,17 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo shutdownRemaining(subchannelData); addressIndex.seekTo(getAddress(subchannelData.subchannel)); rawConnectivityState = READY; + helper.getMetricRecorder().addLongCounter(CONNECTION_ATTEMPTS_SUCCEEDED, 1, + Collections.singletonList(helper.getChannelTarget()), + Collections.emptyList()); updateHealthCheckedState(subchannelData); break; case TRANSIENT_FAILURE: // If we are looking at current channel, request a connection if possible + helper.getMetricRecorder().addLongCounter(CONNECTION_ATTEMPTS_FAILED, 1, + Collections.singletonList(helper.getChannelTarget()), + Collections.emptyList()); if (addressIndex.isValid() && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) { if (addressIndex.increment()) { diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java index 0e902bfdd56..a640fd85e99 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -506,6 +506,8 @@ public void healthCheckFlow() { verify(mockHelper, atLeast(0)).getSynchronizationContext(); verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verify(mockHelper, atLeast(0)).getMetricRecorder(); + verify(mockHelper, atLeast(0)).getChannelTarget(); verifyNoMoreInteractions(mockHelper); // subchannel | state | health @@ -525,6 +527,8 @@ public void healthCheckFlow() { .getSubchannel()).isSameInstanceAs(mockSubchannel1); verify(mockHelper, atLeast(0)).getSynchronizationContext(); verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verify(mockHelper, atLeast(0)).getChannelTarget(); + verify(mockHelper, atLeast(0)).getMetricRecorder(); verifyNoMoreInteractions(mockHelper); healthListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); @@ -618,6 +622,8 @@ public void pickAfterResolutionAfterTransientValue() { stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); verify(mockHelper, atLeast(0)).getSynchronizationContext(); verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verify(mockHelper, atLeast(0)).getMetricRecorder(); + verify(mockHelper, atLeast(0)).getChannelTarget(); verifyNoMoreInteractions(mockHelper); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); } @@ -650,6 +656,8 @@ public void pickWithDupAddressesUpDownUp() { stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); verify(mockHelper, atLeast(0)).getSynchronizationContext(); verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verify(mockHelper, atLeast(0)).getMetricRecorder(); + verify(mockHelper, atLeast(0)).getChannelTarget(); verifyNoMoreInteractions(mockHelper); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); @@ -684,6 +692,8 @@ public void pickWithDupEagsUpDownUp() { stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); verify(mockHelper, atLeast(0)).getSynchronizationContext(); verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verify(mockHelper, atLeast(0)).getMetricRecorder(); + verify(mockHelper, atLeast(0)).getChannelTarget(); verifyNoMoreInteractions(mockHelper); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); @@ -1859,6 +1869,8 @@ public void updateAddresses_identical_transient_failure() { // No new connections are requested, subchannels responsible for completing their own backoff verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verify(mockHelper, atLeast(0)).getMetricRecorder(); + verify(mockHelper, atLeast(0)).getChannelTarget(); verifyNoMoreInteractions(mockHelper); // First connection attempt is successful @@ -2923,6 +2935,11 @@ public String getAuthority() { return null; } + @Override + public String getChannelTarget() { + return null; + } + @Override public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { // ignore