Skip to content

Commit 014e27a

Browse files
xdsclient: create LRSClient at time of initialisation (grpc#8483)
Fixes: grpc#8474 The race is in [ReportLoad](https://github.com/grpc/grpc-go/blob/9186ebd774370e3b3232d1b202914ff8fc2c56d6/xds/internal/xdsclient/clientimpl_loadreport.go#L35C2-L44C21) function of clientImpl. The implementation was recently changed as the part of [xds client migration](grpc@082a927). The [comment](https://github.com/grpc/grpc-go/blob/85240a5b02defe7b653ccba66866b4370c982b6a/xds/internal/xdsclient/clientimpl.go#L86C2-L87C16) says that `lrsclient.LRSClient` should be initialized only at creation time but that was not the case. It was being initialized at the time of calling `ReportLoad` function. RELEASE NOTES: - lrsclient: - Fix a race condition where the `LRSClient` was not initialized at creation time but it was being initialized at the time of calling the `ReportLoad` function. - Creating an `LRSClient` no longer requires a node ID.
1 parent bd47aef commit 014e27a

File tree

4 files changed

+134
-17
lines changed

4 files changed

+134
-17
lines changed

xds/internal/clients/lrsclient/lrsclient.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,7 @@ type LRSClient struct {
6161

6262
// New returns a new LRS Client configured with the provided config.
6363
func New(config Config) (*LRSClient, error) {
64-
switch {
65-
case config.Node.ID == "":
66-
return nil, errors.New("lrsclient: node ID in node is empty")
67-
case config.TransportBuilder == nil:
64+
if config.TransportBuilder == nil {
6865
return nil, errors.New("lrsclient: transport builder is nil")
6966
}
7067

xds/internal/xdsclient/clientimpl.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,21 @@ func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecor
129129
if err != nil {
130130
return nil, err
131131
}
132-
c := &clientImpl{XDSClient: client, xdsClientConfig: gConfig, bootstrapConfig: config, target: target, refCount: 1}
132+
lrsC, err := lrsclient.New(lrsclient.Config{
133+
Node: gConfig.Node,
134+
TransportBuilder: gConfig.TransportBuilder,
135+
})
136+
if err != nil {
137+
return nil, err
138+
}
139+
c := &clientImpl{
140+
XDSClient: client,
141+
xdsClientConfig: gConfig,
142+
bootstrapConfig: config,
143+
target: target,
144+
refCount: 1,
145+
lrsClient: lrsC,
146+
}
133147
c.logger = prefixLogger(c)
134148
return c, nil
135149
}

xds/internal/xdsclient/clientimpl_loadreport.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,6 @@ import (
3232
//
3333
// It returns a lrsclient.LoadStore for the user to report loads.
3434
func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) {
35-
if c.lrsClient == nil {
36-
lrsC, err := lrsclient.New(lrsclient.Config{
37-
Node: c.xdsClientConfig.Node,
38-
TransportBuilder: c.xdsClientConfig.TransportBuilder,
39-
})
40-
if err != nil {
41-
c.logger.Warningf("Failed to create an lrs client to the management server to report load: %v", server, err)
42-
return nil, func(context.Context) {}
43-
}
44-
c.lrsClient = lrsC
45-
}
46-
4735
load, err := c.lrsClient.ReportLoad(clients.ServerIdentifier{
4836
ServerURI: server.ServerURI(),
4937
Extensions: grpctransport.ServerIdentifierExtension{

xds/internal/xdsclient/tests/loadreport_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,32 @@ import (
2323
"encoding/json"
2424
"fmt"
2525
"net"
26+
"sync"
2627
"testing"
2728

2829
"github.com/google/go-cmp/cmp"
2930
"github.com/google/go-cmp/cmp/cmpopts"
3031
"github.com/google/uuid"
3132
"google.golang.org/grpc"
3233
"google.golang.org/grpc/codes"
34+
"google.golang.org/grpc/credentials/insecure"
35+
"google.golang.org/grpc/internal"
36+
"google.golang.org/grpc/internal/stubserver"
3337
"google.golang.org/grpc/internal/testutils"
3438
"google.golang.org/grpc/internal/testutils/xds/e2e"
3539
"google.golang.org/grpc/internal/testutils/xds/fakeserver"
3640
"google.golang.org/grpc/internal/xds/bootstrap"
41+
"google.golang.org/grpc/resolver"
3742
"google.golang.org/grpc/status"
3843
"google.golang.org/grpc/xds/internal/clients"
44+
"google.golang.org/grpc/xds/internal/xdsclient"
3945
"google.golang.org/protobuf/testing/protocmp"
4046

4147
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
4248
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
4349
v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
50+
testgrpc "google.golang.org/grpc/interop/grpc_testing"
51+
testpb "google.golang.org/grpc/interop/grpc_testing"
4452
"google.golang.org/protobuf/types/known/durationpb"
4553
)
4654

@@ -437,3 +445,113 @@ func (s) TestReportLoad_StreamCreation(t *testing.T) {
437445
defer sCancel3()
438446
cancel3(sCtx3)
439447
}
448+
449+
// TestConcurrentReportLoad verifies that the client can safely handle concurrent
450+
// requests to initiate load reporting streams. It launches multiple goroutines
451+
// that all call client.ReportLoad simultaneously.
452+
func (s) TestConcurrentReportLoad(t *testing.T) {
453+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
454+
defer cancel()
455+
456+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true})
457+
nodeID := uuid.New().String()
458+
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
459+
client := createXDSClient(t, bc)
460+
461+
serverConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address})
462+
if err != nil {
463+
t.Fatalf("Failed to create server config for testing: %v", err)
464+
}
465+
466+
// Call ReportLoad() concurrently from multiple go routines.
467+
var wg sync.WaitGroup
468+
const numGoroutines = 10
469+
wg.Add(numGoroutines)
470+
for range numGoroutines {
471+
go func() {
472+
defer wg.Done()
473+
_, cancelStore := client.ReportLoad(serverConfig)
474+
defer cancelStore(ctx)
475+
}()
476+
}
477+
wg.Wait()
478+
}
479+
480+
// TestConcurrentChannels verifies that we can create multiple gRPC channels
481+
// concurrently with a shared XDSClient, each of which will create a new LRS
482+
// stream without any race.
483+
func (s) TestConcurrentChannels(t *testing.T) {
484+
// TODO(emchandwani) : Unskip after https://github.com/grpc/grpc-go/pull/8526 gets merged.
485+
t.Skip()
486+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
487+
defer cancel()
488+
489+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true, SupportLoadReportingService: true})
490+
491+
nodeID := uuid.New().String()
492+
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
493+
494+
if internal.NewXDSResolverWithPoolForTesting == nil {
495+
t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
496+
}
497+
498+
config, err := bootstrap.NewConfigFromContents(bc)
499+
if err != nil {
500+
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err)
501+
}
502+
pool := xdsclient.NewPool(config)
503+
504+
resolverBuilder := internal.NewXDSResolverWithPoolForTesting.(func(*xdsclient.Pool) (resolver.Builder, error))
505+
xdsResolver, err := resolverBuilder(pool)
506+
if err != nil {
507+
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
508+
}
509+
510+
server := stubserver.StartTestService(t, nil)
511+
defer server.Stop()
512+
513+
// Configure the management server with resources that enable LRS.
514+
const serviceName = "my-service-e2e-lrs-test"
515+
resources := e2e.DefaultClientResources(e2e.ResourceParams{
516+
DialTarget: serviceName,
517+
NodeID: nodeID,
518+
Host: "localhost",
519+
Port: testutils.ParsePort(t, server.Address),
520+
SecLevel: e2e.SecurityLevelNone,
521+
})
522+
resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
523+
ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
524+
Self: &v3corepb.SelfConfigSource{},
525+
},
526+
}
527+
if err := mgmtServer.Update(ctx, resources); err != nil {
528+
t.Fatal(err)
529+
}
530+
531+
var wg sync.WaitGroup
532+
const (
533+
numGoroutines = 10
534+
numRPCs = 10
535+
)
536+
for range numGoroutines {
537+
wg.Add(1)
538+
go func() {
539+
defer wg.Done()
540+
for range numRPCs {
541+
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver))
542+
if err != nil {
543+
t.Errorf("grpc.NewClient() failed: %v", err)
544+
return
545+
}
546+
defer cc.Close()
547+
548+
testClient := testgrpc.NewTestServiceClient(cc)
549+
if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}); err != nil {
550+
t.Errorf("EmptyCall() failed: %v", err)
551+
return
552+
}
553+
}
554+
}()
555+
}
556+
wg.Wait()
557+
}

0 commit comments

Comments
 (0)