Skip to content

Commit a36df79

Browse files
Rework Cache interface to isolate it from streamState and make it more uniform between sotw and delta
Signed-off-by: Valerian Roche <[email protected]>
1 parent cde5ba5 commit a36df79

File tree

15 files changed

+222
-229
lines changed

15 files changed

+222
-229
lines changed

pkg/cache/v3/cache.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"google.golang.org/protobuf/types/known/durationpb"
2727

2828
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
29-
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
3029

3130
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
3231
)
@@ -37,6 +36,27 @@ type Request = discovery.DiscoveryRequest
3736
// DeltaRequest is an alias for the delta discovery request type.
3837
type DeltaRequest = discovery.DeltaDiscoveryRequest
3938

39+
// ClientState provides additional data on the client knowledge for the type matching the request
40+
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources)
41+
// Though the methods may return mutable parts of the state for performance reasons,
42+
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation
43+
type ClientState interface {
44+
// GetKnownResources returns the list of resources the clients has ACKed and their associated version.
45+
// The versions are:
46+
// - delta protocol: version of the specific resource set in the response
47+
// - sotw protocol: version of the global response when the resource was last ACKed
48+
GetKnownResources() map[string]string
49+
50+
// GetSubscribedResources returns the list of resources currently subscribed to by the client for the type.
51+
// For delta it keeps track across requests
52+
// For sotw it is a normalized view of the request resources
53+
GetSubscribedResources() map[string]struct{}
54+
55+
// IsWildcard returns whether the client has a wildcard watch.
56+
// This considers subtilities related to the current migration of wildcard definition within the protocol.
57+
IsWildcard() bool
58+
}
59+
4060
// ConfigWatcher requests watches for configuration resources by a node, last
4161
// applied version identifier, and resource names hint. The watch should send
4262
// the responses when they are ready. The watch can be canceled by the
@@ -50,7 +70,7 @@ type ConfigWatcher interface {
5070
//
5171
// Cancel is an optional function to release resources in the producer. If
5272
// provided, the consumer may call this function multiple times.
53-
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())
73+
CreateWatch(*Request, ClientState, chan Response) (cancel func())
5474

5575
// CreateDeltaWatch returns a new open incremental xDS watch.
5676
//
@@ -59,7 +79,7 @@ type ConfigWatcher interface {
5979
//
6080
// Cancel is an optional function to release resources in the producer. If
6181
// provided, the consumer may call this function multiple times.
62-
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func())
82+
CreateDeltaWatch(*DeltaRequest, ClientState, chan DeltaResponse) (cancel func())
6383
}
6484

6585
// ConfigFetcher fetches configuration resources from cache

pkg/cache/v3/delta.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919

2020
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
21-
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2221
)
2322

2423
// groups together resource-related arguments for the createDeltaResponse function
@@ -28,7 +27,7 @@ type resourceContainer struct {
2827
systemVersion string
2928
}
3029

31-
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
30+
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state ClientState, resources resourceContainer) *RawDeltaResponse {
3231
// variables to build our response with
3332
var nextVersionMap map[string]string
3433
var filtered []types.Resource
@@ -37,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
3736
// If we are handling a wildcard request, we want to respond with all resources
3837
switch {
3938
case state.IsWildcard():
40-
if len(state.GetResourceVersions()) == 0 {
39+
if len(state.GetKnownResources()) == 0 {
4140
filtered = make([]types.Resource, 0, len(resources.resourceMap))
4241
}
4342
nextVersionMap = make(map[string]string, len(resources.resourceMap))
@@ -46,25 +45,25 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
4645
// we can just set it here to be used for comparison later
4746
version := resources.versionMap[name]
4847
nextVersionMap[name] = version
49-
prevVersion, found := state.GetResourceVersions()[name]
48+
prevVersion, found := state.GetKnownResources()[name]
5049
if !found || (prevVersion != version) {
5150
filtered = append(filtered, r)
5251
}
5352
}
5453

5554
// Compute resources for removal
5655
// The resource version can be set to "" here to trigger a removal even if never returned before
57-
for name := range state.GetResourceVersions() {
56+
for name := range state.GetKnownResources() {
5857
if _, ok := resources.resourceMap[name]; !ok {
5958
toRemove = append(toRemove, name)
6059
}
6160
}
6261
default:
63-
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
62+
nextVersionMap = make(map[string]string, len(state.GetSubscribedResources()))
6463
// state.GetResourceVersions() may include resources no longer subscribed
6564
// In the current code this gets silently cleaned when updating the version map
66-
for name := range state.GetSubscribedResourceNames() {
67-
prevVersion, found := state.GetResourceVersions()[name]
65+
for name := range state.GetSubscribedResources() {
66+
prevVersion, found := state.GetKnownResources()[name]
6867
if r, ok := resources.resourceMap[name]; ok {
6968
nextVersion := resources.versionMap[name]
7069
if prevVersion != nextVersion {

pkg/cache/v3/delta_test.go

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package cache_test
33
import (
44
"context"
55
"fmt"
6-
"reflect"
76
"testing"
87
"time"
98

109
"github.com/google/go-cmp/cmp"
1110
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
1212
"google.golang.org/protobuf/testing/protocmp"
1313

1414
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -35,13 +35,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
3535
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
3636
for _, typ := range testTypes {
3737
watches[typ] = make(chan cache.DeltaResponse, 1)
38+
state := stream.NewStreamState(true, nil)
3839
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
3940
Node: &core.Node{
4041
Id: "node",
4142
},
4243
TypeUrl: typ,
4344
ResourceNamesSubscribe: names[typ],
44-
}, stream.NewStreamState(true, nil), watches[typ])
45+
}, &state, watches[typ])
4546
}
4647

4748
if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
@@ -69,15 +70,15 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
6970
watches[typ] = make(chan cache.DeltaResponse, 1)
7071
state := stream.NewStreamState(false, versionMap[typ])
7172
for resource := range versionMap[typ] {
72-
state.GetSubscribedResourceNames()[resource] = struct{}{}
73+
state.GetSubscribedResources()[resource] = struct{}{}
7374
}
7475
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
7576
Node: &core.Node{
7677
Id: "node",
7778
},
7879
TypeUrl: typ,
7980
ResourceNamesSubscribe: names[typ],
80-
}, state, watches[typ])
81+
}, &state, watches[typ])
8182
}
8283

8384
if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
@@ -123,12 +124,10 @@ func TestDeltaRemoveResources(t *testing.T) {
123124
Id: "node",
124125
},
125126
TypeUrl: typ,
126-
}, *streams[typ], watches[typ])
127+
}, streams[typ], watches[typ])
127128
}
128129

129-
if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
130-
t.Fatal(err)
131-
}
130+
require.NoError(t, c.SetSnapshot(context.Background(), key, fixture.snapshot()))
132131

133132
for _, typ := range testTypes {
134133
t.Run(typ, func(t *testing.T) {
@@ -139,7 +138,7 @@ func TestDeltaRemoveResources(t *testing.T) {
139138
nextVersionMap := out.GetNextVersionMap()
140139
streams[typ].SetResourceVersions(nextVersionMap)
141140
case <-time.After(time.Second):
142-
t.Fatal("failed to receive a snapshot response")
141+
require.Fail(t, "failed to receive a snapshot response")
143142
}
144143
})
145144
}
@@ -152,20 +151,17 @@ func TestDeltaRemoveResources(t *testing.T) {
152151
Node: &core.Node{
153152
Id: "node",
154153
},
155-
TypeUrl: typ,
156-
}, *streams[typ], watches[typ])
154+
TypeUrl: typ,
155+
ResponseNonce: "nonce",
156+
}, streams[typ], watches[typ])
157157
}
158158

159-
if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
160-
t.Errorf("watches should be created for the latest version, saw %d watches expected %d", count, len(testTypes))
161-
}
159+
assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version")
162160

163161
// set a partially versioned snapshot with no endpoints
164162
snapshot2 := fixture.snapshot()
165163
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
166-
if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil {
167-
t.Fatal(err)
168-
}
164+
require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot2))
169165

170166
// validate response for endpoints
171167
select {
@@ -176,11 +172,9 @@ func TestDeltaRemoveResources(t *testing.T) {
176172
nextVersionMap := out.GetNextVersionMap()
177173

178174
// make sure the version maps are different since we no longer are tracking any endpoint resources
179-
if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) {
180-
t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap)
181-
}
175+
require.Equal(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change")
182176
case <-time.After(time.Second):
183-
t.Fatal("failed to receive snapshot response")
177+
assert.Fail(t, "failed to receive snapshot response")
184178
}
185179
}
186180

@@ -203,13 +197,14 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
203197
t.Fatalf("snapshot failed: %s", err)
204198
}
205199
} else {
200+
state := stream.NewStreamState(false, make(map[string]string))
206201
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
207202
Node: &core.Node{
208203
Id: id,
209204
},
210205
TypeUrl: rsrc.EndpointType,
211206
ResourceNamesSubscribe: []string{clusterName},
212-
}, stream.NewStreamState(false, make(map[string]string)), responses)
207+
}, &state, responses)
213208

214209
defer cancel()
215210
}
@@ -226,14 +221,14 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
226221
// Create a non-buffered channel that will block sends.
227222
watchCh := make(chan cache.DeltaResponse)
228223
state := stream.NewStreamState(false, nil)
229-
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
224+
state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
230225
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
231226
Node: &core.Node{
232227
Id: key,
233228
},
234229
TypeUrl: rsrc.EndpointType,
235230
ResourceNamesSubscribe: names[rsrc.EndpointType],
236-
}, state, watchCh)
231+
}, &state, watchCh)
237232

238233
// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
239234
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
@@ -269,13 +264,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
269264
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
270265
for _, typ := range testTypes {
271266
responses := make(chan cache.DeltaResponse, 1)
267+
state := stream.NewStreamState(false, make(map[string]string))
272268
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
273269
Node: &core.Node{
274270
Id: key,
275271
},
276272
TypeUrl: typ,
277273
ResourceNamesSubscribe: names[typ],
278-
}, stream.NewStreamState(false, make(map[string]string)), responses)
274+
}, &state, responses)
279275

280276
// Cancel the watch
281277
cancel()

pkg/cache/v3/linear.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424

2525
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
2626
"github.com/envoyproxy/go-control-plane/pkg/log"
27-
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2827
)
2928

3029
type watches = map[chan Response]struct{}
@@ -164,20 +163,20 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
164163
}
165164

166165
for id, watch := range cache.deltaWatches {
167-
if !watch.StreamState.WatchesResources(modified) {
166+
if !watch.WatchesResources(modified) {
168167
continue
169168
}
170169

171-
res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState)
170+
res := cache.respondDelta(watch.Request, watch.Response, watch.clientState)
172171
if res != nil {
173172
delete(cache.deltaWatches, id)
174173
}
175174
}
176175
}
177176
}
178177

179-
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse {
180-
resp := createDeltaResponse(context.Background(), request, state, resourceContainer{
178+
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState ClientState) *RawDeltaResponse {
179+
resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{
181180
resourceMap: cache.resources,
182181
versionMap: cache.versionMap,
183182
systemVersion: cache.getVersion(),
@@ -187,7 +186,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe
187186
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 {
188187
if cache.log != nil {
189188
cache.log.Debugf("[linear cache] node: %s, sending delta response with resources: %v removed resources %v wildcard: %t",
190-
request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard())
189+
request.GetNode().GetId(), resp.Resources, resp.RemovedResources, clientState.IsWildcard())
191190
}
192191
value <- resp
193192
return resp
@@ -298,7 +297,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
298297
return resources
299298
}
300299

301-
func (cache *LinearCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() {
300+
func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() {
302301
if request.TypeUrl != cache.typeURL {
303302
value <- nil
304303
return nil
@@ -371,7 +370,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
371370
}
372371
}
373372

374-
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
373+
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() {
375374
cache.mu.Lock()
376375
defer cache.mu.Unlock()
377376

@@ -388,18 +387,18 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S
388387
cache.log.Errorf("failed to update version map: %v", err)
389388
}
390389
}
391-
response := cache.respondDelta(request, value, state)
390+
response := cache.respondDelta(request, value, clientState)
392391

393392
// if respondDelta returns nil this means that there is no change in any resource version
394393
// create a new watch accordingly
395394
if response == nil {
396395
watchID := cache.nextDeltaWatchID()
397396
if cache.log != nil {
398397
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
399-
cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion())
398+
cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion())
400399
}
401400

402-
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}
401+
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, clientState: clientState}
403402

404403
return cache.cancelDeltaWatch(watchID)
405404
}

0 commit comments

Comments
 (0)