diff --git a/internal/xds/clients/xdsclient/internal/internal.go b/internal/xds/clients/xdsclient/internal/internal.go index 7adb67190939..38d2473e5c6c 100644 --- a/internal/xds/clients/xdsclient/internal/internal.go +++ b/internal/xds/clients/xdsclient/internal/internal.go @@ -21,10 +21,6 @@ package internal import "time" var ( - // WatchExpiryTimeout is the watch expiry timeout for xDS client. It can be - // overridden by tests to change the default watch expiry timeout. - WatchExpiryTimeout time.Duration - // StreamBackoff is the stream backoff for xDS client. It can be overridden // by tests to change the default backoff strategy. StreamBackoff func(int) time.Duration diff --git a/internal/xds/clients/xdsclient/test/ads_stream_watch_test.go b/internal/xds/clients/xdsclient/test/ads_stream_watch_test.go index 2875d537cee9..40ca8eaaba0e 100644 --- a/internal/xds/clients/xdsclient/test/ads_stream_watch_test.go +++ b/internal/xds/clients/xdsclient/test/ads_stream_watch_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/uuid" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/clients" "google.golang.org/grpc/internal/xds/clients/grpctransport" "google.golang.org/grpc/internal/xds/clients/internal/testutils" "google.golang.org/grpc/internal/xds/clients/xdsclient" @@ -175,9 +176,34 @@ func (s) TestADS_WatchState_TimerFires(t *testing.T) { // short resource expiry timeout. nodeID := uuid.New().String() configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} - overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout) - client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs)) - + resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} + si := clients.ServerIdentifier{ + ServerURI: mgmtServer.Address, + Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, + } + + xdsClientConfig := xdsclient.Config{ + Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, + Node: clients.Node{ID: nodeID, UserAgentName: "user-agent", UserAgentVersion: "0.0.0.0"}, + TransportBuilder: grpctransport.NewBuilder(configs), + ResourceTypes: resourceTypes, + // Xdstp resource names used in this test do not specify an + // authority. These will end up looking up an entry with the + // empty key in the authorities map. Having an entry with an + // empty key and empty configuration, results in these + // resources also using the top-level configuration. + Authorities: map[string]xdsclient.Authority{ + "": {XDSServers: []xdsclient.ServerConfig{}}, + }, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + } + + // Create an xDS client with the above config. + client, err := xdsclient.New(xdsClientConfig) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + t.Cleanup(func() { client.Close() }) // Create a watch for the first listener resource and verify that the timer // is running and the watch state is `requested`. const listenerName = "listener" diff --git a/internal/xds/clients/xdsclient/test/authority_test.go b/internal/xds/clients/xdsclient/test/authority_test.go index 424ae15c931e..9941b989e9ab 100644 --- a/internal/xds/clients/xdsclient/test/authority_test.go +++ b/internal/xds/clients/xdsclient/test/authority_test.go @@ -105,10 +105,10 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T) (*testutils.Liste testAuthority2: {XDSServers: []xdsclient.ServerConfig{}}, testAuthority3: {XDSServers: []xdsclient.ServerConfig{{ServerIdentifier: siNonDefault}}}, }, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, } // Create an xDS client with the above config. - overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout) client, err := xdsclient.New(xdsClientConfig) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/internal/xds/clients/xdsclient/test/lds_watchers_test.go b/internal/xds/clients/xdsclient/test/lds_watchers_test.go index e41375590f9a..ae4783921aec 100644 --- a/internal/xds/clients/xdsclient/test/lds_watchers_test.go +++ b/internal/xds/clients/xdsclient/test/lds_watchers_test.go @@ -35,7 +35,6 @@ import ( "google.golang.org/grpc/internal/xds/clients/internal/testutils" "google.golang.org/grpc/internal/xds/clients/internal/testutils/e2e" "google.golang.org/grpc/internal/xds/clients/xdsclient" - xdsclientinternal "google.golang.org/grpc/internal/xds/clients/xdsclient/internal" "google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" @@ -115,12 +114,6 @@ func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener { } } -func overrideWatchExpiryTimeout(t *testing.T, watchExpiryTimeout time.Duration) { - originalWatchExpiryTimeout := xdsclientinternal.WatchExpiryTimeout - xdsclientinternal.WatchExpiryTimeout = watchExpiryTimeout - t.Cleanup(func() { xdsclientinternal.WatchExpiryTimeout = originalWatchExpiryTimeout }) -} - // verifyNoListenerUpdate verifies that no listener update is received on the // provided update channel, and returns an error if an update is received. // @@ -726,11 +719,10 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { Node: clients.Node{ID: nodeID}, TransportBuilder: grpctransport.NewBuilder(configs), ResourceTypes: resourceTypes, + // Override the default watch expiry timeout. + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, } - // Create an xDS client with the above config and override the default - // watch expiry timeout. - overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout) client, err := xdsclient.New(xdsClientConfig) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -777,11 +769,11 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { Node: clients.Node{ID: nodeID}, TransportBuilder: grpctransport.NewBuilder(configs), ResourceTypes: resourceTypes, + // Override the default watch expiry timeout. + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, } - // Create an xDS client with the above config and override the default - // watch expiry timeout. - overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout) + // Create an xDS client with the above config. client, err := xdsclient.New(xdsClientConfig) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/internal/xds/clients/xdsclient/xdsclient.go b/internal/xds/clients/xdsclient/xdsclient.go index a1949cfa5bce..cc7d5c4e264d 100644 --- a/internal/xds/clients/xdsclient/xdsclient.go +++ b/internal/xds/clients/xdsclient/xdsclient.go @@ -61,7 +61,6 @@ var ( ) func init() { - xdsclientinternal.WatchExpiryTimeout = defaultWatchExpiryTimeout xdsclientinternal.StreamBackoff = defaultExponentialBackoff xdsclientinternal.ResourceWatchStateForTesting = resourceWatchStateForTesting } @@ -108,7 +107,9 @@ func New(config Config) (*XDSClient, error) { case config.Authorities == nil && config.Servers == nil: return nil, errors.New("xdsclient: no servers or authorities specified") } - + if config.WatchExpiryTimeout == 0 { + config.WatchExpiryTimeout = defaultWatchExpiryTimeout + } client, err := newClient(&config, name) if err != nil { return nil, err @@ -116,12 +117,6 @@ func New(config Config) (*XDSClient, error) { return client, nil } -// SetWatchExpiryTimeoutForTesting override the default watch expiry timeout -// with provided timeout value. -func (c *XDSClient) SetWatchExpiryTimeoutForTesting(watchExpiryTimeout time.Duration) { - c.watchExpiryTimeout = watchExpiryTimeout -} - // newClient returns a new XDSClient with the given config. func newClient(config *Config, target string) (*XDSClient, error) { ctx, cancel := context.WithCancel(context.Background()) @@ -130,7 +125,7 @@ func newClient(config *Config, target string) (*XDSClient, error) { done: syncutil.NewEvent(), authorities: make(map[string]*authority), config: config, - watchExpiryTimeout: xdsclientinternal.WatchExpiryTimeout, + watchExpiryTimeout: config.WatchExpiryTimeout, backoff: xdsclientinternal.StreamBackoff, serializer: syncutil.NewCallbackSerializer(ctx), serializerClose: cancel, diff --git a/internal/xds/clients/xdsclient/xdsconfig.go b/internal/xds/clients/xdsclient/xdsconfig.go index 8d3c101e63dc..9d376e508c4f 100644 --- a/internal/xds/clients/xdsclient/xdsconfig.go +++ b/internal/xds/clients/xdsclient/xdsconfig.go @@ -19,6 +19,8 @@ package xdsclient import ( + "time" + "google.golang.org/grpc/internal/xds/clients" ) @@ -60,6 +62,13 @@ type Config struct { // MetricsReporter is used to report registered metrics. If unset, no // metrics will be reported. MetricsReporter clients.MetricsReporter + + // WatchExpiryTimeout is the duration after which a resource watch expires + // if the requested resource is not received from the management server. + // Most users will not need to set this. If zero, a default value of 15 + // seconds is used as specified here: + // envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist + WatchExpiryTimeout time.Duration } // ServerConfig contains configuration for an xDS management server. diff --git a/internal/xds/xdsclient/clientimpl.go b/internal/xds/xdsclient/clientimpl.go index dab08620ac1d..b1f797993fd7 100644 --- a/internal/xds/xdsclient/clientimpl.go +++ b/internal/xds/xdsclient/clientimpl.go @@ -120,8 +120,8 @@ func (mr *metricsReporter) ReportMetric(metric any) { } } -func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string) (*clientImpl, error) { - gConfig, err := buildXDSClientConfig(config, metricsRecorder, target) +func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string, watchExpiryTimeout time.Duration) (*clientImpl, error) { + gConfig, err := buildXDSClientConfig(config, metricsRecorder, target, watchExpiryTimeout) if err != nil { return nil, err } @@ -163,7 +163,7 @@ func (c *clientImpl) decrRef() int32 { } // buildXDSClientConfig builds the xdsclient.Config from the bootstrap.Config. -func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string) (xdsclient.Config, error) { +func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string, watchExpiryTimeout time.Duration) (xdsclient.Config, error) { grpcTransportConfigs := make(map[string]grpctransport.Config) gServerCfgMap := make(map[xdsclient.ServerConfig]*bootstrap.ServerConfig) @@ -218,12 +218,13 @@ func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.Metri } return xdsclient.Config{ - Authorities: gAuthorities, - Servers: gServerCfgs, - Node: gNode, - TransportBuilder: grpctransport.NewBuilder(grpcTransportConfigs), - ResourceTypes: supportedResourceTypes(config, gServerCfgMap), - MetricsReporter: &metricsReporter{recorder: metricsRecorder, target: target}, + Authorities: gAuthorities, + Servers: gServerCfgs, + Node: gNode, + TransportBuilder: grpctransport.NewBuilder(grpcTransportConfigs), + ResourceTypes: supportedResourceTypes(config, gServerCfgMap), + MetricsReporter: &metricsReporter{recorder: metricsRecorder, target: target}, + WatchExpiryTimeout: watchExpiryTimeout, }, nil } diff --git a/internal/xds/xdsclient/clientimpl_test.go b/internal/xds/xdsclient/clientimpl_test.go index 85792a3d648d..d2fc8f7f9332 100644 --- a/internal/xds/xdsclient/clientimpl_test.go +++ b/internal/xds/xdsclient/clientimpl_test.go @@ -208,7 +208,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { if err != nil { t.Fatalf("Failed to create bootstrap config: %v", err) } - gotCfg, err := buildXDSClientConfig(bootstrapConfig, stats.NewTestMetricsRecorder(), testTargetName) + gotCfg, err := buildXDSClientConfig(bootstrapConfig, stats.NewTestMetricsRecorder(), testTargetName, 0) if err != nil { t.Fatalf("Failed to build XDSClientConfig: %v", err) } diff --git a/internal/xds/xdsclient/pool.go b/internal/xds/xdsclient/pool.go index d1cc84762307..eb0197e09a7f 100644 --- a/internal/xds/xdsclient/pool.go +++ b/internal/xds/xdsclient/pool.go @@ -99,7 +99,7 @@ func NewPool(config *bootstrap.Config) *Pool { // expected to invoke once they are done using the client. It is safe for the // caller to invoke this close function multiple times. func (p *Pool) NewClient(name string, metricsRecorder estats.MetricsRecorder) (XDSClient, func(), error) { - return p.newRefCounted(name, metricsRecorder) + return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout) } // NewClientForTesting returns an xDS client configured with the provided @@ -126,11 +126,10 @@ func (p *Pool) NewClientForTesting(opts OptionsForTesting) (XDSClient, func(), e if opts.MetricsRecorder == nil { opts.MetricsRecorder = istats.NewMetricsRecorderList(nil) } - c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder) + c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder, opts.WatchExpiryTimeout) if err != nil { return nil, nil, err } - c.SetWatchExpiryTimeoutForTesting(opts.WatchExpiryTimeout) return c, cancel, nil } @@ -252,7 +251,7 @@ func (p *Pool) clientRefCountedClose(name string) { // newRefCounted creates a new reference counted xDS client implementation for // name, if one does not exist already. If an xDS client for the given name // exists, it gets a reference to it and returns it. -func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder) (*clientImpl, func(), error) { +func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder, watchExpiryTimeout time.Duration) (*clientImpl, func(), error) { p.mu.Lock() defer p.mu.Unlock() @@ -276,7 +275,7 @@ func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil } - c, err := newClientImpl(config, metricsRecorder, name) + c, err := newClientImpl(config, metricsRecorder, name, watchExpiryTimeout) if err != nil { return nil, nil, err } diff --git a/internal/xds/xdsclient/tests/loadreport_test.go b/internal/xds/xdsclient/tests/loadreport_test.go index e33f3799cd8b..95a9354d0511 100644 --- a/internal/xds/xdsclient/tests/loadreport_test.go +++ b/internal/xds/xdsclient/tests/loadreport_test.go @@ -481,8 +481,6 @@ func (s) TestConcurrentReportLoad(t *testing.T) { // concurrently with a shared XDSClient, each of which will create a new LRS // stream without any race. func (s) TestConcurrentChannels(t *testing.T) { - // TODO(emchandwani) : Unskip after https://github.com/grpc/grpc-go/pull/8526 gets merged. - t.Skip() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel()