Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions internal/xds/clients/xdsclient/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ package internal
import "time"

var (
// WatchExpiryTimeout is the watch expiry timeout for xDS client. It can be
// overridden by tests to change the default watch expiry timeout.
WatchExpiryTimeout time.Duration

// StreamBackoff is the stream backoff for xDS client. It can be overridden
// by tests to change the default backoff strategy.
StreamBackoff func(int) time.Duration
Expand Down
32 changes: 29 additions & 3 deletions internal/xds/clients/xdsclient/test/ads_stream_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/google/uuid"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/grpctransport"
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
Expand Down Expand Up @@ -175,9 +176,34 @@ func (s) TestADS_WatchState_TimerFires(t *testing.T) {
// short resource expiry timeout.
nodeID := uuid.New().String()
configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs))

resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
si := clients.ServerIdentifier{
ServerURI: mgmtServer.Address,
Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
}

xdsClientConfig := xdsclient.Config{
Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}},
Node: clients.Node{ID: nodeID, UserAgentName: "user-agent", UserAgentVersion: "0.0.0.0"},
TransportBuilder: grpctransport.NewBuilder(configs),
ResourceTypes: resourceTypes,
// Xdstp resource names used in this test do not specify an
// authority. These will end up looking up an entry with the
// empty key in the authorities map. Having an entry with an
// empty key and empty configuration, results in these
// resources also using the top-level configuration.
Authorities: map[string]xdsclient.Authority{
"": {XDSServers: []xdsclient.ServerConfig{}},
},
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
}

// Create an xDS client with the above config.
client, err := xdsclient.New(xdsClientConfig)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
t.Cleanup(func() { client.Close() })
// Create a watch for the first listener resource and verify that the timer
// is running and the watch state is `requested`.
const listenerName = "listener"
Expand Down
2 changes: 1 addition & 1 deletion internal/xds/clients/xdsclient/test/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T) (*testutils.Liste
testAuthority2: {XDSServers: []xdsclient.ServerConfig{}},
testAuthority3: {XDSServers: []xdsclient.ServerConfig{{ServerIdentifier: siNonDefault}}},
},
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
}

// Create an xDS client with the above config.
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
client, err := xdsclient.New(xdsClientConfig)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
Expand Down
18 changes: 5 additions & 13 deletions internal/xds/clients/xdsclient/test/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
"google.golang.org/grpc/internal/xds/clients/internal/testutils/e2e"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
xdsclientinternal "google.golang.org/grpc/internal/xds/clients/xdsclient/internal"
"google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"

v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
Expand Down Expand Up @@ -115,12 +114,6 @@ func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener {
}
}

func overrideWatchExpiryTimeout(t *testing.T, watchExpiryTimeout time.Duration) {
originalWatchExpiryTimeout := xdsclientinternal.WatchExpiryTimeout
xdsclientinternal.WatchExpiryTimeout = watchExpiryTimeout
t.Cleanup(func() { xdsclientinternal.WatchExpiryTimeout = originalWatchExpiryTimeout })
}

// verifyNoListenerUpdate verifies that no listener update is received on the
// provided update channel, and returns an error if an update is received.
//
Expand Down Expand Up @@ -726,11 +719,10 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
Node: clients.Node{ID: nodeID},
TransportBuilder: grpctransport.NewBuilder(configs),
ResourceTypes: resourceTypes,
// Override the default watch expiry timeout.
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
}

// Create an xDS client with the above config and override the default
// watch expiry timeout.
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
client, err := xdsclient.New(xdsClientConfig)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
Expand Down Expand Up @@ -777,11 +769,11 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
Node: clients.Node{ID: nodeID},
TransportBuilder: grpctransport.NewBuilder(configs),
ResourceTypes: resourceTypes,
// Override the default watch expiry timeout.
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
}

// Create an xDS client with the above config and override the default
// watch expiry timeout.
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
// Create an xDS client with the above config.
client, err := xdsclient.New(xdsClientConfig)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
Expand Down
13 changes: 4 additions & 9 deletions internal/xds/clients/xdsclient/xdsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ var (
)

func init() {
xdsclientinternal.WatchExpiryTimeout = defaultWatchExpiryTimeout
xdsclientinternal.StreamBackoff = defaultExponentialBackoff
xdsclientinternal.ResourceWatchStateForTesting = resourceWatchStateForTesting
}
Expand Down Expand Up @@ -108,20 +107,16 @@ func New(config Config) (*XDSClient, error) {
case config.Authorities == nil && config.Servers == nil:
return nil, errors.New("xdsclient: no servers or authorities specified")
}

if config.WatchExpiryTimeout == 0 {
config.WatchExpiryTimeout = defaultWatchExpiryTimeout
}
client, err := newClient(&config, name)
if err != nil {
return nil, err
}
return client, nil
}

// SetWatchExpiryTimeoutForTesting override the default watch expiry timeout
// with provided timeout value.
func (c *XDSClient) SetWatchExpiryTimeoutForTesting(watchExpiryTimeout time.Duration) {
c.watchExpiryTimeout = watchExpiryTimeout
}

// newClient returns a new XDSClient with the given config.
func newClient(config *Config, target string) (*XDSClient, error) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -130,7 +125,7 @@ func newClient(config *Config, target string) (*XDSClient, error) {
done: syncutil.NewEvent(),
authorities: make(map[string]*authority),
config: config,
watchExpiryTimeout: xdsclientinternal.WatchExpiryTimeout,
watchExpiryTimeout: config.WatchExpiryTimeout,
backoff: xdsclientinternal.StreamBackoff,
serializer: syncutil.NewCallbackSerializer(ctx),
serializerClose: cancel,
Expand Down
9 changes: 9 additions & 0 deletions internal/xds/clients/xdsclient/xdsconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package xdsclient

import (
"time"

"google.golang.org/grpc/internal/xds/clients"
)

Expand Down Expand Up @@ -60,6 +62,13 @@ type Config struct {
// MetricsReporter is used to report registered metrics. If unset, no
// metrics will be reported.
MetricsReporter clients.MetricsReporter

// WatchExpiryTimeout is the duration after which a resource watch expires
// if the requested resource is not received from the management server.
// Most users will not need to set this. If zero, a default value of 15
// seconds is used as specified here:
// envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist
WatchExpiryTimeout time.Duration
}

// ServerConfig contains configuration for an xDS management server.
Expand Down
19 changes: 10 additions & 9 deletions internal/xds/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func (mr *metricsReporter) ReportMetric(metric any) {
}
}

func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string) (*clientImpl, error) {
gConfig, err := buildXDSClientConfig(config, metricsRecorder, target)
func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string, watchExpiryTimeout time.Duration) (*clientImpl, error) {
gConfig, err := buildXDSClientConfig(config, metricsRecorder, target, watchExpiryTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *clientImpl) decrRef() int32 {
}

// buildXDSClientConfig builds the xdsclient.Config from the bootstrap.Config.
func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string) (xdsclient.Config, error) {
func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string, watchExpiryTimeout time.Duration) (xdsclient.Config, error) {
grpcTransportConfigs := make(map[string]grpctransport.Config)
gServerCfgMap := make(map[xdsclient.ServerConfig]*bootstrap.ServerConfig)

Expand Down Expand Up @@ -218,12 +218,13 @@ func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.Metri
}

return xdsclient.Config{
Authorities: gAuthorities,
Servers: gServerCfgs,
Node: gNode,
TransportBuilder: grpctransport.NewBuilder(grpcTransportConfigs),
ResourceTypes: supportedResourceTypes(config, gServerCfgMap),
MetricsReporter: &metricsReporter{recorder: metricsRecorder, target: target},
Authorities: gAuthorities,
Servers: gServerCfgs,
Node: gNode,
TransportBuilder: grpctransport.NewBuilder(grpcTransportConfigs),
ResourceTypes: supportedResourceTypes(config, gServerCfgMap),
MetricsReporter: &metricsReporter{recorder: metricsRecorder, target: target},
WatchExpiryTimeout: watchExpiryTimeout,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/xds/xdsclient/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create bootstrap config: %v", err)
}
gotCfg, err := buildXDSClientConfig(bootstrapConfig, stats.NewTestMetricsRecorder(), testTargetName)
gotCfg, err := buildXDSClientConfig(bootstrapConfig, stats.NewTestMetricsRecorder(), testTargetName, 0)
if err != nil {
t.Fatalf("Failed to build XDSClientConfig: %v", err)
}
Expand Down
9 changes: 4 additions & 5 deletions internal/xds/xdsclient/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewPool(config *bootstrap.Config) *Pool {
// expected to invoke once they are done using the client. It is safe for the
// caller to invoke this close function multiple times.
func (p *Pool) NewClient(name string, metricsRecorder estats.MetricsRecorder) (XDSClient, func(), error) {
return p.newRefCounted(name, metricsRecorder)
return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout)
}

// NewClientForTesting returns an xDS client configured with the provided
Expand All @@ -126,11 +126,10 @@ func (p *Pool) NewClientForTesting(opts OptionsForTesting) (XDSClient, func(), e
if opts.MetricsRecorder == nil {
opts.MetricsRecorder = istats.NewMetricsRecorderList(nil)
}
c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder)
c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder, opts.WatchExpiryTimeout)
if err != nil {
return nil, nil, err
}
c.SetWatchExpiryTimeoutForTesting(opts.WatchExpiryTimeout)
return c, cancel, nil
}

Expand Down Expand Up @@ -252,7 +251,7 @@ func (p *Pool) clientRefCountedClose(name string) {
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder) (*clientImpl, func(), error) {
func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder, watchExpiryTimeout time.Duration) (*clientImpl, func(), error) {
p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -276,7 +275,7 @@ func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
}

c, err := newClientImpl(config, metricsRecorder, name)
c, err := newClientImpl(config, metricsRecorder, name, watchExpiryTimeout)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions internal/xds/xdsclient/tests/loadreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,6 @@ func (s) TestConcurrentReportLoad(t *testing.T) {
// concurrently with a shared XDSClient, each of which will create a new LRS
// stream without any race.
func (s) TestConcurrentChannels(t *testing.T) {
// TODO(emchandwani) : Unskip after https://github.com/grpc/grpc-go/pull/8526 gets merged.
t.Skip()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

Expand Down