Skip to content

Commit fd6c6f8

Browse files
watch config
1 parent 01c7c86 commit fd6c6f8

20 files changed

+142
-78
lines changed

internal/xds/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"google.golang.org/grpc/internal/testutils/pickfirst"
3838
"google.golang.org/grpc/internal/testutils/xds/e2e"
3939
"google.golang.org/grpc/internal/xds/bootstrap"
40-
ixdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient"
4140
"google.golang.org/grpc/internal/xds/xdsclient"
4241
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
4342
"google.golang.org/grpc/peer"
@@ -1186,10 +1185,11 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
11861185
if err != nil {
11871186
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
11881187
}
1189-
revertWatchExpiryTimeout := ixdsclient.SetWatchExpiryTimeoutForTesting(defaultTestWatchExpiryTimeout)
1190-
defer revertWatchExpiryTimeout()
11911188
pool := xdsclient.NewPool(config)
1192-
xdsClient, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: t.Name()})
1189+
xdsClient, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
1190+
Name: t.Name(),
1191+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
1192+
})
11931193
if err != nil {
11941194
t.Fatalf("Failed to create an xDS client: %v", err)
11951195
}

internal/xds/balancer/clusterresolver/e2e_test/eds_impl_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import (
4141
rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
4242
"google.golang.org/grpc/internal/testutils/xds/e2e"
4343
"google.golang.org/grpc/internal/xds/bootstrap"
44-
ixdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient"
4544
"google.golang.org/grpc/internal/xds/xdsclient"
4645
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
4746
"google.golang.org/grpc/peer"
@@ -1102,10 +1101,11 @@ func (s) TestEDS_ResourceNotFound(t *testing.T) {
11021101
if err != nil {
11031102
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err)
11041103
}
1105-
revertWatchExpiryTimeout := ixdsclient.SetWatchExpiryTimeoutForTesting(defaultTestWatchExpiryTimeout)
1106-
defer revertWatchExpiryTimeout()
11071104
pool := xdsclient.NewPool(config)
1108-
xdsClient, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: t.Name()})
1105+
xdsClient, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
1106+
Name: t.Name(),
1107+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
1108+
})
11091109
if err != nil {
11101110
t.Fatalf("Failed to create an xDS client: %v", err)
11111111
}

internal/xds/clients/xdsclient/test/ads_stream_watch_test.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/google/uuid"
2929
"google.golang.org/grpc/credentials/insecure"
3030
"google.golang.org/grpc/internal/testutils/xds/e2e"
31+
"google.golang.org/grpc/internal/xds/clients"
3132
"google.golang.org/grpc/internal/xds/clients/grpctransport"
3233
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
3334
"google.golang.org/grpc/internal/xds/clients/xdsclient"
@@ -175,9 +176,34 @@ func (s) TestADS_WatchState_TimerFires(t *testing.T) {
175176
// short resource expiry timeout.
176177
nodeID := uuid.New().String()
177178
configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
178-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
179-
client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs))
180-
179+
resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
180+
si := clients.ServerIdentifier{
181+
ServerURI: mgmtServer.Address,
182+
Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
183+
}
184+
185+
xdsClientConfig := xdsclient.Config{
186+
Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}},
187+
Node: clients.Node{ID: nodeID, UserAgentName: "user-agent", UserAgentVersion: "0.0.0.0"},
188+
TransportBuilder: grpctransport.NewBuilder(configs),
189+
ResourceTypes: resourceTypes,
190+
// Xdstp resource names used in this test do not specify an
191+
// authority. These will end up looking up an entry with the
192+
// empty key in the authorities map. Having an entry with an
193+
// empty key and empty configuration, results in these
194+
// resources also using the top-level configuration.
195+
Authorities: map[string]xdsclient.Authority{
196+
"": {XDSServers: []xdsclient.ServerConfig{}},
197+
},
198+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
199+
}
200+
201+
// Create an xDS client with the above config.
202+
client, err := xdsclient.New(xdsClientConfig)
203+
if err != nil {
204+
t.Fatalf("Failed to create xDS client: %v", err)
205+
}
206+
t.Cleanup(func() { client.Close() })
181207
// Create a watch for the first listener resource and verify that the timer
182208
// is running and the watch state is `requested`.
183209
const listenerName = "listener"

internal/xds/clients/xdsclient/test/authority_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,10 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T) (*testutils.Liste
105105
testAuthority2: {XDSServers: []xdsclient.ServerConfig{}},
106106
testAuthority3: {XDSServers: []xdsclient.ServerConfig{{ServerIdentifier: siNonDefault}}},
107107
},
108+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
108109
}
109110

110111
// Create an xDS client with the above config.
111-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
112112
client, err := xdsclient.New(xdsClientConfig)
113113
if err != nil {
114114
t.Fatalf("Failed to create xDS client: %v", err)

internal/xds/clients/xdsclient/test/lds_watchers_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -726,11 +726,10 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
726726
Node: clients.Node{ID: nodeID},
727727
TransportBuilder: grpctransport.NewBuilder(configs),
728728
ResourceTypes: resourceTypes,
729+
// Override the default watch expiry timeout.
730+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
729731
}
730732

731-
// Create an xDS client with the above config and override the default
732-
// watch expiry timeout.
733-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
734733
client, err := xdsclient.New(xdsClientConfig)
735734
if err != nil {
736735
t.Fatalf("Failed to create xDS client: %v", err)
@@ -777,11 +776,11 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
777776
Node: clients.Node{ID: nodeID},
778777
TransportBuilder: grpctransport.NewBuilder(configs),
779778
ResourceTypes: resourceTypes,
779+
// Override the default watch expiry timeout.
780+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
780781
}
781782

782-
// Create an xDS client with the above config and override the default
783-
// watch expiry timeout.
784-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
783+
// Create an xDS client with the above config.
785784
client, err := xdsclient.New(xdsClientConfig)
786785
if err != nil {
787786
t.Fatalf("Failed to create xDS client: %v", err)

internal/xds/clients/xdsclient/xdsclient.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ func New(config Config) (*XDSClient, error) {
108108
case config.Authorities == nil && config.Servers == nil:
109109
return nil, errors.New("xdsclient: no servers or authorities specified")
110110
}
111-
111+
if config.WatchExpiryTimeout == 0 {
112+
config.WatchExpiryTimeout = defaultWatchExpiryTimeout
113+
}
112114
client, err := newClient(&config, name)
113115
if err != nil {
114116
return nil, err
@@ -141,7 +143,7 @@ func newClient(config *Config, target string) (*XDSClient, error) {
141143
done: syncutil.NewEvent(),
142144
authorities: make(map[string]*authority),
143145
config: config,
144-
watchExpiryTimeout: xdsclientinternal.WatchExpiryTimeout,
146+
watchExpiryTimeout: config.WatchExpiryTimeout,
145147
backoff: xdsclientinternal.StreamBackoff,
146148
serializer: syncutil.NewCallbackSerializer(ctx),
147149
serializerClose: cancel,

internal/xds/clients/xdsclient/xdsconfig.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package xdsclient
2020

2121
import (
22+
"time"
23+
2224
"google.golang.org/grpc/internal/xds/clients"
2325
)
2426

@@ -60,6 +62,13 @@ type Config struct {
6062
// MetricsReporter is used to report registered metrics. If unset, no
6163
// metrics will be reported.
6264
MetricsReporter clients.MetricsReporter
65+
66+
// WatchExpiryTimeout is the duration after which a resource watch expires
67+
// if the requested resource is not received from the management server.
68+
// Most users will not need to set this.If zero, a default value of 15
69+
// seconds is used as specified here :
70+
// envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist
71+
WatchExpiryTimeout time.Duration
6372
}
6473

6574
// ServerConfig contains configuration for an xDS management server.

internal/xds/resolver/xds_resolver_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ import (
4040
"google.golang.org/grpc/internal/testutils/xds/e2e"
4141
"google.golang.org/grpc/internal/xds/balancer/clustermanager"
4242
"google.golang.org/grpc/internal/xds/bootstrap"
43-
ixdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient"
4443
"google.golang.org/grpc/internal/xds/httpfilter"
4544
rinternal "google.golang.org/grpc/internal/xds/resolver/internal"
4645
"google.golang.org/grpc/internal/xds/xdsclient"
@@ -268,9 +267,10 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
268267
if err != nil {
269268
t.Fatalf("Failed to create an xDS client pool: %v", err)
270269
}
271-
revertWatchExpiryTimeout := ixdsclient.SetWatchExpiryTimeoutForTesting(defaultTestTimeout)
272-
defer revertWatchExpiryTimeout()
273-
c, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: t.Name()})
270+
c, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
271+
Name: t.Name(),
272+
WatchExpiryTimeout: defaultTestTimeout,
273+
})
274274
return c, sync.OnceFunc(func() {
275275
close(closeCh)
276276
cancel()

internal/xds/xdsclient/clientimpl.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package xdsclient
2121
import (
2222
"fmt"
2323
"sync/atomic"
24+
"time"
2425

2526
"google.golang.org/grpc"
2627
estats "google.golang.org/grpc/experimental/stats"
@@ -41,6 +42,8 @@ const (
4142
// client from xDS-enabled gRPC servers. This is a well-known dedicated key
4243
// value, and is defined in gRFC A71.
4344
NameForServer = "#server"
45+
46+
defaultWatchExpiryTimeout = 15 * time.Second
4447
)
4548

4649
var (
@@ -117,8 +120,9 @@ func (mr *metricsReporter) ReportMetric(metric any) {
117120
}
118121
}
119122

120-
func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string) (*clientImpl, error) {
123+
func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string, watchExpiryTimeout time.Duration) (*clientImpl, error) {
121124
gConfig, err := buildXDSClientConfig(config, metricsRecorder, target)
125+
gConfig.WatchExpiryTimeout = watchExpiryTimeout
122126
if err != nil {
123127
return nil, err
124128
}

internal/xds/xdsclient/metrics_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"google.golang.org/grpc/internal/testutils/stats"
3131
"google.golang.org/grpc/internal/testutils/xds/e2e"
3232
"google.golang.org/grpc/internal/xds/bootstrap"
33-
ixdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient"
3433
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
3534

3635
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
@@ -96,12 +95,11 @@ func (s) TestResourceUpdateMetrics(t *testing.T) {
9695
if err != nil {
9796
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
9897
}
99-
revertWatchExpiryTimeout := ixdsclient.SetWatchExpiryTimeoutForTesting(defaultTestWatchExpiryTimeout)
100-
defer revertWatchExpiryTimeout()
10198
pool := NewPool(config)
10299
client, close, err := pool.NewClientForTesting(OptionsForTesting{
103-
Name: t.Name(),
104-
MetricsRecorder: tmr,
100+
Name: t.Name(),
101+
MetricsRecorder: tmr,
102+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
105103
})
106104
if err != nil {
107105
t.Fatalf("Failed to create an xDS client: %v", err)
@@ -199,12 +197,11 @@ func (s) TestServerFailureMetrics_BeforeResponseRecv(t *testing.T) {
199197
if err != nil {
200198
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
201199
}
202-
revertWatchExpiryTimeout := ixdsclient.SetWatchExpiryTimeoutForTesting(defaultTestWatchExpiryTimeout)
203-
defer revertWatchExpiryTimeout()
204200
pool := NewPool(config)
205201
client, close, err := pool.NewClientForTesting(OptionsForTesting{
206-
Name: t.Name(),
207-
MetricsRecorder: tmr,
202+
Name: t.Name(),
203+
MetricsRecorder: tmr,
204+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
208205
})
209206
if err != nil {
210207
t.Fatalf("Failed to create an xDS client: %v", err)

0 commit comments

Comments
 (0)