diff --git a/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java b/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java index 90babd1e8d0..4fa75f6b335 100644 --- a/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java +++ b/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java @@ -63,17 +63,20 @@ public abstract static class ServerInfo { public abstract boolean isTrustedXdsServer(); + public abstract boolean resourceTimerIsTransientError(); + @VisibleForTesting public static ServerInfo create(String target, @Nullable Object implSpecificConfig) { - return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig, false, false); + return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig, + false, false, false); } @VisibleForTesting public static ServerInfo create( String target, Object implSpecificConfig, boolean ignoreResourceDeletion, - boolean isTrustedXdsServer) { + boolean isTrustedXdsServer, boolean resourceTimerIsTransientError) { return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig, - ignoreResourceDeletion, isTrustedXdsServer); + ignoreResourceDeletion, isTrustedXdsServer, resourceTimerIsTransientError); } } diff --git a/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java b/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java index c00685f1781..423c1a118e8 100644 --- a/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java @@ -43,6 +43,8 @@ public abstract class BootstrapperImpl extends Bootstrapper { public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK = "GRPC_EXPERIMENTAL_XDS_FALLBACK"; + public static final String GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING = + "GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING"; // Client features. @VisibleForTesting @@ -54,10 +56,16 @@ public abstract class BootstrapperImpl extends Bootstrapper { // Server features. private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion"; private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server"; + private static final String + SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR = "resource_timer_is_transient_error"; @VisibleForTesting static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, true); + @VisibleForTesting + public static boolean xdsDataErrorHandlingEnabled + = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING, false); + protected final XdsLogger logger; protected FileReader reader = LocalFileReader.INSTANCE; @@ -247,6 +255,7 @@ private List parseServerInfos(List rawServerConfigs, XdsLogger lo Object implSpecificConfig = getImplSpecificConfig(serverConfig, serverUri); + boolean resourceTimerIsTransientError = false; boolean ignoreResourceDeletion = false; // "For forward compatibility reasons, the client will ignore any entry in the list that it // does not understand, regardless of type." @@ -254,11 +263,14 @@ private List parseServerInfos(List rawServerConfigs, XdsLogger lo if (serverFeatures != null) { logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures); ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION); + resourceTimerIsTransientError = xdsDataErrorHandlingEnabled + && serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR); } servers.add( ServerInfo.create(serverUri, implSpecificConfig, ignoreResourceDeletion, serverFeatures != null - && serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER))); + && serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER), + resourceTimerIsTransientError)); } return servers.build(); } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index edbb0b2d74c..e2dd4169bca 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -199,6 +199,10 @@ public static ResourceMetadata newResourceMetadataDoesNotExist() { return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null); } + public static ResourceMetadata newResourceMetadataTimeout() { + return new ResourceMetadata(ResourceMetadataStatus.TIMEOUT, "", 0, false, null, null); + } + public static ResourceMetadata newResourceMetadataAcked( Any rawResource, String version, long updateTimeNanos) { checkNotNull(rawResource, "rawResource"); @@ -256,7 +260,7 @@ public UpdateFailureState getErrorState() { * config_dump.proto */ public enum ResourceMetadataStatus { - UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED + UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED, TIMEOUT } /** diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 2b25d4db977..d70cfd841ef 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.xds.client.BootstrapperImpl.xdsDataErrorHandlingEnabled; import static io.grpc.xds.client.XdsResourceType.ParsedResource; import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate; @@ -67,6 +68,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore { // Longest time to wait, since the subscription to some resource, for concluding its absence. @VisibleForTesting public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15; + public static final int EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC = 30; private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -738,6 +740,9 @@ void restartTimer() { // When client becomes ready, it triggers a restartTimer for all relevant subscribers. return; } + ServerInfo serverInfo = activeCpc.getServerInfo(); + int timeoutSec = xdsDataErrorHandlingEnabled && serverInfo.resourceTimerIsTransientError() + ? EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC : INITIAL_RESOURCE_FETCH_TIMEOUT_SEC; class ResourceNotFound implements Runnable { @Override @@ -761,8 +766,7 @@ public String toString() { respTimer.cancel(); } respTimer = syncContext.schedule( - new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, - timeService); + new ResourceNotFound(), timeoutSec, TimeUnit.SECONDS, timeService); } void stopTimer() { @@ -840,6 +844,8 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn // Ignore deletion of State of the World resources when this feature is on, // and the resource is reusable. boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion(); + boolean resourceTimerIsTransientError = + xdsDataErrorHandlingEnabled && serverInfo.resourceTimerIsTransientError(); if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) { if (!resourceDeletionIgnored) { logger.log(XdsLogLevel.FORCE_WARNING, @@ -854,14 +860,20 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn if (!absent) { data = null; absent = true; - metadata = ResourceMetadata.newResourceMetadataDoesNotExist(); + metadata = resourceTimerIsTransientError ? ResourceMetadata.newResourceMetadataTimeout() : + ResourceMetadata.newResourceMetadataDoesNotExist(); for (ResourceWatcher watcher : watchers.keySet()) { if (processingTracker != null) { processingTracker.startTask(); } watchers.get(watcher).execute(() -> { try { - watcher.onResourceDoesNotExist(resource); + if (resourceTimerIsTransientError) { + watcher.onError(Status.UNAVAILABLE.withDescription( + "Timed out waiting for resource " + resource + " from xDS server")); + } else { + watcher.onResourceDoesNotExist(resource); + } } finally { if (processingTracker != null) { processingTracker.onComplete(); diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java index 6167f491930..3650e5d5bb9 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java @@ -3549,7 +3549,7 @@ private static Filter buildHttpConnectionManagerFilter(HttpFilter... httpFilters private XdsResourceType.Args getXdsResourceTypeArgs(boolean isTrustedServer) { return new XdsResourceType.Args( - ServerInfo.create("http://td", "", false, isTrustedServer), "1.0", null, null, null, null + ServerInfo.create("http://td", "", false, isTrustedServer, false), "1.0", null, null, null, null ); } } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 32361684a6d..cd1c19666f0 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -85,6 +85,7 @@ import io.grpc.xds.client.Bootstrapper.BootstrapInfo; import io.grpc.xds.client.Bootstrapper.CertificateProviderInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; +import io.grpc.xds.client.BootstrapperImpl; import io.grpc.xds.client.EnvoyProtoData.Node; import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.client.Locality; @@ -304,6 +305,30 @@ public long currentTimeNanos() { private final BindableService adsService = createAdsService(); private final BindableService lrsService = createLrsService(); + private XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() { + @Override + public XdsTransport create(ServerInfo serverInfo) { + if (serverInfo.target().equals(SERVER_URI)) { + return new GrpcXdsTransport(channel); + } + if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) { + if (channelForCustomAuthority == null) { + channelForCustomAuthority = cleanupRule.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + return new GrpcXdsTransport(channelForCustomAuthority); + } + if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) { + if (channelForEmptyAuthority == null) { + channelForEmptyAuthority = cleanupRule.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + return new GrpcXdsTransport(channelForEmptyAuthority); + } + throw new IllegalArgumentException("Can not create channel for " + serverInfo); + } + }; + @Before public void setUp() throws IOException { when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); @@ -322,32 +347,9 @@ public void setUp() throws IOException { .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); - XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() { - @Override - public XdsTransport create(ServerInfo serverInfo) { - if (serverInfo.target().equals(SERVER_URI)) { - return new GrpcXdsTransport(channel); - } - if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) { - if (channelForCustomAuthority == null) { - channelForCustomAuthority = cleanupRule.register( - InProcessChannelBuilder.forName(serverName).directExecutor().build()); - } - return new GrpcXdsTransport(channelForCustomAuthority); - } - if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) { - if (channelForEmptyAuthority == null) { - channelForEmptyAuthority = cleanupRule.register( - InProcessChannelBuilder.forName(serverName).directExecutor().build()); - } - return new GrpcXdsTransport(channelForEmptyAuthority); - } - throw new IllegalArgumentException("Can not create channel for " + serverInfo); - } - }; xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(), - true); + true, false); BootstrapInfo bootstrapInfo = Bootstrapper.BootstrapInfo.builder() .servers(Collections.singletonList(xdsServerInfo)) @@ -3155,6 +3157,108 @@ public void flowControlAbsent() throws Exception { verify(anotherWatcher).onError(any()); } + @Test + @SuppressWarnings("unchecked") + public void resourceTimerIsTransientError_schedulesExtendedTimeout() { + BootstrapperImpl.xdsDataErrorHandlingEnabled = true; + ServerInfo serverInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, + false, true, true); + BootstrapInfo bootstrapInfo = + Bootstrapper.BootstrapInfo.builder() + .servers(Collections.singletonList(serverInfo)) + .node(NODE) + .authorities(ImmutableMap.of( + "", + AuthorityInfo.create( + "xdstp:///envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS))))) + .certProviders(ImmutableMap.of()) + .build(); + xdsClient = new XdsClientImpl( + xdsTransportFactory, + bootstrapInfo, + fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, + fakeClock.getStopwatchSupplier(), + timeProvider, + MessagePrinter.INSTANCE, + new TlsContextManagerImpl(bootstrapInfo), + xdsClientMetricReporter); + ResourceWatcher watcher = mock(ResourceWatcher.class); + String resourceName = "cluster.googleapis.com"; + + xdsClient.watchXdsResource( + XdsClusterResource.getInstance(), + resourceName, + watcher, + fakeClock.getScheduledExecutorService()); + + ScheduledTask task = Iterables.getOnlyElement( + fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); + assertThat(task.getDelay(TimeUnit.SECONDS)) + .isEqualTo(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC); + fakeClock.runDueTasks(); + BootstrapperImpl.xdsDataErrorHandlingEnabled = false; + } + + @Test + @SuppressWarnings("unchecked") + public void resourceTimerIsTransientError_callsOnErrorUnavailable() { + BootstrapperImpl.xdsDataErrorHandlingEnabled = true; + xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(), + true, true); + BootstrapInfo bootstrapInfo = + Bootstrapper.BootstrapInfo.builder() + .servers(Collections.singletonList(xdsServerInfo)) + .node(NODE) + .authorities(ImmutableMap.of( + "authority.xds.com", + AuthorityInfo.create( + "xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))), + "", + AuthorityInfo.create( + "xdstp:///envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS))))) + .certProviders(ImmutableMap.of("cert-instance-name", + CertificateProviderInfo.create("file-watcher", ImmutableMap.of()))) + .build(); + xdsClient = new XdsClientImpl( + xdsTransportFactory, + bootstrapInfo, + fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, + fakeClock.getStopwatchSupplier(), + timeProvider, + MessagePrinter.INSTANCE, + new TlsContextManagerImpl(bootstrapInfo), + xdsClientMetricReporter); + String timeoutResource = CDS_RESOURCE + "_timeout"; + ResourceWatcher timeoutWatcher = mock(ResourceWatcher.class); + + xdsClient.watchXdsResource( + XdsClusterResource.getInstance(), + timeoutResource, + timeoutWatcher, + fakeClock.getScheduledExecutorService()); + + assertThat(resourceDiscoveryCalls).hasSize(1); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(CDS, ImmutableList.of(timeoutResource), "", "", NODE); + fakeClock.forwardTime(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + fakeClock.runDueTasks(); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Status.class); + verify(timeoutWatcher).onError(errorCaptor.capture()); + Status error = errorCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(error.getDescription()).isEqualTo( + "Timed out waiting for resource " + timeoutResource + " from xDS server"); + BootstrapperImpl.xdsDataErrorHandlingEnabled = false; + } + private Answer blockUpdate(CyclicBarrier barrier) { return new Answer() { @Override @@ -4220,7 +4324,7 @@ private XdsClientImpl createXdsClient(String serverUri) { private BootstrapInfo buildBootStrap(String serverUri) { ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS, - ignoreResourceDeletion(), true); + ignoreResourceDeletion(), true, false); return Bootstrapper.BootstrapInfo.builder() .servers(Collections.singletonList(xdsServerInfo)) diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index f5c40fa2117..5a577c7405f 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -367,13 +367,13 @@ public void resolving_targetAuthorityInAuthoritiesMap() { String serviceAuthority = "[::FFFF:129.144.52.38]:80"; bootstrapInfo = BootstrapInfo.builder() .servers(ImmutableList.of(ServerInfo.create( - "td.googleapis.com", InsecureChannelCredentials.create(), true, true))) + "td.googleapis.com", InsecureChannelCredentials.create(), true, true, false))) .node(Node.newBuilder().build()) .authorities( ImmutableMap.of(targetAuthority, AuthorityInfo.create( "xdstp://" + targetAuthority + "/envoy.config.listener.v3.Listener/%s?foo=1&bar=2", ImmutableList.of(ServerInfo.create( - "td.googleapis.com", InsecureChannelCredentials.create(), true, true))))) + "td.googleapis.com", InsecureChannelCredentials.create(), true, true, false))))) .build(); expectedLdsResourceName = "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/" + "%5B::FFFF:129.144.52.38%5D:80?bar=2&foo=1"; // query param canonified diff --git a/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java b/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java index 485970741c1..754e903f8a9 100644 --- a/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java @@ -203,7 +203,7 @@ public static Bootstrapper.BootstrapInfo buildBootStrap(List serverUris) List serverInfos = new ArrayList<>(); for (String uri : serverUris) { - serverInfos.add(ServerInfo.create(uri, CHANNEL_CREDENTIALS, false, true)); + serverInfos.add(ServerInfo.create(uri, CHANNEL_CREDENTIALS, false, true, false)); } EnvoyProtoData.Node node = EnvoyProtoData.Node.newBuilder().setId("node-id").build();