Skip to content

Commit 7c9c2a1

Browse files
SOTW ADS client (#604)
Integration initial ADS client for SOTW protocol. Signed-off-by: Renuka Fernando <[email protected]>
1 parent 65d53e6 commit 7c9c2a1

File tree

2 files changed

+308
-0
lines changed

2 files changed

+308
-0
lines changed

pkg/client/sotw/v3/client.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2022 Envoyproxy Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package sotw provides an implementation of GRPC SoTW (State of The World) part of XDS client
16+
package sotw
17+
18+
import (
19+
"context"
20+
"errors"
21+
"io"
22+
"sync"
23+
24+
"github.com/golang/protobuf/ptypes/any"
25+
status "google.golang.org/genproto/googleapis/rpc/status"
26+
"google.golang.org/grpc"
27+
"google.golang.org/grpc/codes"
28+
grpcStatus "google.golang.org/grpc/status"
29+
30+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
31+
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
32+
)
33+
34+
var (
35+
ErrInit = errors.New("ads client: grpc connection is not initialized (use InitConnect() method to initialize connection)")
36+
ErrNilResp = errors.New("ads client: nil response from xds management server")
37+
)
38+
39+
// ADSClient is a SoTW and ADS based generic gRPC xDS client which can be used to
40+
// implement an xDS client which fetches resources from an xDS server and responds
41+
// the server back with ack or nack the resources.
42+
type ADSClient interface {
43+
// Initialize the gRPC connection with management server and send the initial Discovery Request.
44+
InitConnect(clientConn grpc.ClientConnInterface, opts ...grpc.CallOption) error
45+
// Fetch waits for a response from management server and returns response or error.
46+
Fetch() (*Response, error)
47+
// Ack acknowledge the validity of the last received response to management server.
48+
Ack() error
49+
// Nack acknowledge the invalidity of the last received response to management server.
50+
Nack(message string) error
51+
}
52+
53+
// Response wraps the latest Resources from the xDS server.
54+
// For the time being it only contains the Resources from server. This can be extended with
55+
// other response details. For example some metadata from DiscoveryResponse.
56+
type Response struct {
57+
Resources []*any.Any
58+
}
59+
60+
type adsClient struct {
61+
ctx context.Context
62+
mu sync.Mutex
63+
node *core.Node
64+
typeURL string
65+
66+
// streamClient is the ADS discovery client
67+
streamClient discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
68+
// lastAckedResponse is the last response acked by the ADS client
69+
lastAckedResponse *discovery.DiscoveryResponse
70+
// lastReceivedResponse is the last response received from management server
71+
lastReceivedResponse *discovery.DiscoveryResponse
72+
}
73+
74+
// NewADSClient returns a new ADSClient
75+
func NewADSClient(ctx context.Context, node *core.Node, typeURL string) ADSClient {
76+
return &adsClient{
77+
ctx: ctx,
78+
node: node,
79+
typeURL: typeURL,
80+
}
81+
}
82+
83+
// Initialize the gRPC connection with management server and send the initial Discovery Request.
84+
func (c *adsClient) InitConnect(clientConn grpc.ClientConnInterface, opts ...grpc.CallOption) error {
85+
streamClient, err := discovery.NewAggregatedDiscoveryServiceClient(clientConn).StreamAggregatedResources(c.ctx, opts...)
86+
if err != nil {
87+
return err
88+
}
89+
c.streamClient = streamClient
90+
return c.Ack()
91+
}
92+
93+
// Fetch waits for a response from management server and returns response or error.
94+
func (c *adsClient) Fetch() (*Response, error) {
95+
c.mu.Lock()
96+
defer c.mu.Unlock()
97+
98+
if c.streamClient == nil {
99+
return nil, ErrInit
100+
}
101+
resp, err := c.streamClient.Recv()
102+
if err != nil {
103+
return nil, err
104+
}
105+
106+
if resp == nil {
107+
return nil, ErrNilResp
108+
}
109+
c.lastReceivedResponse = resp
110+
return &Response{
111+
Resources: resp.GetResources(),
112+
}, err
113+
}
114+
115+
// Ack acknowledge the validity of the last received response to management server.
116+
func (c *adsClient) Ack() error {
117+
c.mu.Lock()
118+
defer c.mu.Unlock()
119+
120+
c.lastAckedResponse = c.lastReceivedResponse
121+
return c.send(nil)
122+
}
123+
124+
// Nack acknowledge the invalidity of the last received response to management server.
125+
func (c *adsClient) Nack(message string) error {
126+
c.mu.Lock()
127+
defer c.mu.Unlock()
128+
129+
errorDetail := &status.Status{
130+
Message: message,
131+
}
132+
return c.send(errorDetail)
133+
}
134+
135+
// IsConnError checks the provided error is due to the gRPC connection
136+
// and returns true if it is due to the gRPC connection.
137+
//
138+
// In this case the gRPC connection with the server should be re initialized with the
139+
// ADSClient.InitConnect method.
140+
func IsConnError(err error) bool {
141+
if err == nil {
142+
return false
143+
}
144+
if errors.Is(err, io.EOF) {
145+
return true
146+
}
147+
errStatus, ok := grpcStatus.FromError(err)
148+
if !ok {
149+
return false
150+
}
151+
return errStatus.Code() == codes.Unavailable || errStatus.Code() == codes.Canceled
152+
}
153+
154+
func (c *adsClient) send(errorDetail *status.Status) error {
155+
if c.streamClient == nil {
156+
return ErrInit
157+
}
158+
159+
req := &discovery.DiscoveryRequest{
160+
Node: c.node,
161+
VersionInfo: c.lastAckedResponse.GetVersionInfo(),
162+
TypeUrl: c.typeURL,
163+
ResponseNonce: c.lastReceivedResponse.GetNonce(),
164+
ErrorDetail: errorDetail,
165+
}
166+
return c.streamClient.Send(req)
167+
}

pkg/client/sotw/v3/client_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package sotw_test
2+
3+
import (
4+
"context"
5+
"net"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/credentials/insecure"
12+
"google.golang.org/protobuf/proto"
13+
"google.golang.org/protobuf/types/known/anypb"
14+
15+
clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
16+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
17+
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
18+
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
19+
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
20+
client "github.com/envoyproxy/go-control-plane/pkg/client/sotw/v3"
21+
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
22+
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
23+
24+
"github.com/stretchr/testify/assert"
25+
)
26+
27+
func TestFetch(t *testing.T) {
28+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
29+
defer cancel()
30+
31+
snapCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil)
32+
go func() {
33+
err := startAdsServer(ctx, snapCache)
34+
assert.NoError(t, err)
35+
}()
36+
37+
conn, err := grpc.Dial(":18001", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
38+
assert.NoError(t, err)
39+
defer conn.Close()
40+
41+
c := client.NewADSClient(ctx, &core.Node{Id: "node_1"}, resource.ClusterType)
42+
err = c.InitConnect(conn)
43+
assert.NoError(t, err)
44+
45+
t.Run("Test initial fetch", testInitialFetch(ctx, snapCache, c))
46+
t.Run("Test next fetch", testNextFetch(ctx, snapCache, c))
47+
}
48+
49+
func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c client.ADSClient) func(t *testing.T) {
50+
return func(t *testing.T) {
51+
wg := sync.WaitGroup{}
52+
wg.Add(1)
53+
54+
go func() {
55+
// watch for configs
56+
resp, err := c.Fetch()
57+
assert.NoError(t, err)
58+
assert.Equal(t, 3, len(resp.Resources))
59+
for _, r := range resp.Resources {
60+
cluster := &clusterv3.Cluster{}
61+
err := anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{})
62+
assert.NoError(t, err)
63+
assert.Contains(t, []string{"cluster_1", "cluster_2", "cluster_3"}, cluster.Name)
64+
}
65+
66+
err = c.Ack()
67+
assert.NoError(t, err)
68+
wg.Done()
69+
}()
70+
71+
snapshot, err := cache.NewSnapshot("1", map[resource.Type][]types.Resource{
72+
resource.ClusterType: {
73+
&clusterv3.Cluster{Name: "cluster_1"},
74+
&clusterv3.Cluster{Name: "cluster_2"},
75+
&clusterv3.Cluster{Name: "cluster_3"},
76+
},
77+
})
78+
assert.NoError(t, err)
79+
80+
err = snapshot.Consistent()
81+
assert.NoError(t, err)
82+
err = snapCache.SetSnapshot(ctx, "node_1", snapshot)
83+
wg.Wait()
84+
assert.NoError(t, err)
85+
}
86+
}
87+
88+
func testNextFetch(ctx context.Context, snapCache cache.SnapshotCache, c client.ADSClient) func(t *testing.T) {
89+
return func(t *testing.T) {
90+
wg := sync.WaitGroup{}
91+
wg.Add(1)
92+
93+
go func() {
94+
// watch for configs
95+
resp, err := c.Fetch()
96+
assert.NoError(t, err)
97+
assert.Equal(t, 2, len(resp.Resources))
98+
for _, r := range resp.Resources {
99+
cluster := &clusterv3.Cluster{}
100+
err = anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{})
101+
assert.NoError(t, err)
102+
assert.Contains(t, []string{"cluster_2", "cluster_4"}, cluster.Name)
103+
}
104+
105+
err = c.Ack()
106+
assert.NoError(t, err)
107+
wg.Done()
108+
}()
109+
110+
snapshot, err := cache.NewSnapshot("2", map[resource.Type][]types.Resource{
111+
resource.ClusterType: {
112+
&clusterv3.Cluster{Name: "cluster_2"},
113+
&clusterv3.Cluster{Name: "cluster_4"},
114+
},
115+
})
116+
assert.NoError(t, err)
117+
118+
err = snapshot.Consistent()
119+
assert.NoError(t, err)
120+
err = snapCache.SetSnapshot(ctx, "node_1", snapshot)
121+
assert.NoError(t, err)
122+
wg.Wait()
123+
}
124+
}
125+
126+
func startAdsServer(ctx context.Context, snapCache cache.SnapshotCache) error {
127+
lis, err := net.Listen("tcp", "127.0.0.1:18001")
128+
if err != nil {
129+
return err
130+
}
131+
132+
grpcServer := grpc.NewServer()
133+
s := server.NewServer(ctx, snapCache, nil)
134+
discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, s)
135+
136+
if e := grpcServer.Serve(lis); e != nil {
137+
err = e
138+
}
139+
140+
return err
141+
}

0 commit comments

Comments
 (0)