-
Notifications
You must be signed in to change notification settings - Fork 2
FCP-1434: Explicit wildcard support for snapshot cache #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8e2d459
9424db5
f8df9e1
0710f23
5fbb1c0
deed9fc
96101d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,12 @@ type ResourceSnapshot interface { | |
| // GetVersionMap returns a map of resource name to resource version for | ||
| // all the resources of type indicated by typeURL. | ||
| GetVersionMap(typeURL string) map[string]string | ||
|
|
||
| // GetWildcardResources returns only the resources that should be sent for wildcard requests. | ||
| GetWildcardResources(typeURL string) map[string]types.Resource | ||
|
|
||
| // GetWildcardResourcesAndTTL returns only the resources with TTL that should be sent for wildcard requests. | ||
| GetWildcardResourcesAndTTL(typeURL string) map[string]types.ResourceWithTTL | ||
| } | ||
|
|
||
| // SnapshotCache is a snapshot-based cache that maintains a single versioned | ||
|
|
@@ -147,6 +153,53 @@ func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache | |
| return cache | ||
| } | ||
|
|
||
| func getResourcesForSubscription(snapshot ResourceSnapshot, typeURL string, subscribedResources map[string]struct{}, isWildcard bool) map[string]types.Resource { | ||
| resourcesWithTTL := getResourcesAndTTLForSubscription(snapshot, typeURL, subscribedResources, isWildcard) | ||
| if resourcesWithTTL == nil { | ||
| return nil | ||
| } | ||
|
|
||
| withoutTTL := make(map[string]types.Resource, len(resourcesWithTTL)) | ||
| for k, v := range resourcesWithTTL { | ||
| withoutTTL[k] = v.Resource | ||
| } | ||
|
|
||
| return withoutTTL | ||
| } | ||
|
|
||
| // Returns resources with TTL for a given subscription. | ||
| // This is needed because Envoy can have both a wildcard subscription AND explicit subscriptions | ||
| // to on-demand resources in the same stream. | ||
| func getResourcesAndTTLForSubscription(snapshot ResourceSnapshot, typeURL string, subscribedResources map[string]struct{}, isWildcard bool) map[string]types.ResourceWithTTL { | ||
| var resources map[string]types.ResourceWithTTL | ||
|
|
||
| if isWildcard { | ||
| resources = snapshot.GetWildcardResourcesAndTTL(typeURL) | ||
| if resources == nil { | ||
| resources = make(map[string]types.ResourceWithTTL) | ||
| } | ||
| } else { | ||
| resources = make(map[string]types.ResourceWithTTL) | ||
| } | ||
|
|
||
| // Handle the case where a wildcard subscription also has explicit on-demand resources | ||
| if len(subscribedResources) > 0 { | ||
| allResources := snapshot.GetResourcesAndTTL(typeURL) | ||
| for name := range subscribedResources { | ||
| if resource, exists := allResources[name]; exists { | ||
| resources[name] = resource | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // For non-wildcard subscriptions, if no resources matched, return nil to indicate no response should be sent | ||
| if !isWildcard && len(resources) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| return resources | ||
| } | ||
|
|
||
| // NewSnapshotCacheWithHeartbeating initializes a simple cache that sends periodic heartbeat | ||
| // responses for resources with a TTL. | ||
| // | ||
|
|
@@ -195,7 +248,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { | |
| for id, watch := range info.watches { | ||
| // Respond with the current version regardless of whether the version has changed. | ||
| version := snapshot.GetVersion(watch.Request.GetTypeUrl()) | ||
| resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) | ||
| resources := getResourcesAndTTLForSubscription(snapshot, watch.Request.GetTypeUrl(), watch.subscription.SubscribedResources(), watch.subscription.IsWildcard()) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: why not just pass in the watch or subscription directly? |
||
|
|
||
| // TODO(snowp): Construct this once per type instead of once per watch. | ||
| resourcesWithTTL := map[string]types.ResourceWithTTL{} | ||
|
|
@@ -253,7 +306,7 @@ func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *status | |
| version := snapshot.GetVersion(watch.Request.GetTypeUrl()) | ||
| if version != watch.Request.GetVersionInfo() { | ||
| cache.log.Debugf("respond open watch %d %s %v with new version %q", id, watch.Request.GetTypeUrl(), watch.Request.GetResourceNames(), version) | ||
| resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) | ||
| resources := getResourcesAndTTLForSubscription(snapshot, watch.Request.GetTypeUrl(), watch.subscription.SubscribedResources(), watch.subscription.IsWildcard()) | ||
| err := cache.respond(ctx, watch, resources, version, false) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -421,7 +474,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, valu | |
| } | ||
|
|
||
| version := snapshot.GetVersion(request.GetTypeUrl()) | ||
| resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) | ||
| resources := getResourcesAndTTLForSubscription(snapshot, request.GetTypeUrl(), sub.SubscribedResources(), sub.IsWildcard()) | ||
|
|
||
| if request.GetVersionInfo() == version { | ||
| // Retrieve whether there are resources in the cache requested and currently unknown to the client. | ||
|
|
@@ -439,9 +492,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, valu | |
| } | ||
| } | ||
| } else { | ||
| // Check if a resource present in the snapshot is currently not returned, | ||
| // Check if a wildcard-eligible resource present in the snapshot is currently not returned, | ||
| // for instance if the subscription is newly wildcard. | ||
| for r := range snapshot.GetResources(request.GetTypeUrl()) { | ||
| for r := range snapshot.GetWildcardResources(request.GetTypeUrl()) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This must also assess non-wildcard resources as above. |
||
| if _, ok := knownResourceNames[r]; !ok { | ||
| shouldRespond = true | ||
| break | ||
|
|
@@ -485,6 +538,10 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { | |
| // Respond to a watch with the snapshot value. The value channel should have capacity not to block. | ||
| // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 | ||
| func (cache *snapshotCache) respond(ctx context.Context, watch ResponseWatch, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) error { | ||
| if resources == nil { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should not exit on initial query for wildcard, even if nothing is to be returned |
||
| return nil | ||
| } | ||
|
|
||
| request := watch.Request | ||
| // for ADS, the request names must match the snapshot names | ||
| // if they do not, then the watch is never responded, and it is expected that envoy makes another request | ||
|
|
@@ -595,7 +652,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, sub Subscrip | |
| // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. | ||
| func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, sub Subscription) (*RawDeltaResponse, error) { | ||
| resp := createDeltaResponse(ctx, request, sub, resourceContainer{ | ||
| resourceMap: snapshot.GetResources(request.GetTypeUrl()), | ||
| resourceMap: getResourcesForSubscription(snapshot, request.GetTypeUrl(), sub.SubscribedResources(), sub.IsWildcard()), | ||
| versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), | ||
| }, snapshot.GetVersion(request.GetTypeUrl())) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -542,6 +542,16 @@ func (s *singleResourceSnapshot) GetVersionMap(typeURL string) map[string]string | |
| } | ||
| } | ||
|
|
||
| func (s *singleResourceSnapshot) GetWildcardResources(typeURL string) map[string]types.Resource { | ||
| // For this simple mock, treat all resources as wildcard (backward compatible behavior) | ||
| return s.GetResources(typeURL) | ||
| } | ||
|
|
||
| func (s *singleResourceSnapshot) GetWildcardResourcesAndTTL(typeURL string) map[string]types.ResourceWithTTL { | ||
| // For this simple mock, treat all resources as wildcard (backward compatible behavior) | ||
| return s.GetResourcesAndTTL(typeURL) | ||
| } | ||
|
|
||
| // TestSnapshotSingleResourceFetch is a basic test to verify that simple | ||
| // cache functions work with a type that is not `Snapshot`. | ||
| func TestSnapshotSingleResourceFetch(t *testing.T) { | ||
|
|
@@ -598,10 +608,10 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { | |
| // Create watch. | ||
| req := &cache.Request{ | ||
| Node: &core.Node{Id: "test"}, | ||
| ResourceNames: []string{"rtds"}, | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed test bug: resource name in request didn't match snapshot ("rtds" vs "one-second"). |
||
| ResourceNames: []string{"one-second"}, | ||
| TypeUrl: rsrc.RuntimeType, | ||
| } | ||
| ss := stream.NewSotwSubscription([]string{"rtds"}, true) | ||
| ss := stream.NewSotwSubscription([]string{"one-second"}, true) | ||
| ss.SetReturnedResources(map[string]string{"cluster": "abcdef"}) | ||
| responder := make(chan cache.Response) | ||
| _, err := c.CreateWatch(req, ss, responder) | ||
|
|
@@ -622,3 +632,74 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { | |
|
|
||
| <-responder | ||
| } | ||
|
|
||
| func TestSnapshotDeltaWithWildcardAndExplicitSubscriptions(t *testing.T) { | ||
| c := cache.NewSnapshotCache(false, cache.IDHash{}, log.NewTestLogger(t)) | ||
|
|
||
| cluster1 := resource.MakeCluster(resource.Ads, "cluster1") | ||
| cluster2 := resource.MakeCluster(resource.Ads, "cluster2") | ||
| cluster3 := resource.MakeCluster(resource.Ads, "cluster3") | ||
| cluster4 := resource.MakeCluster(resource.Ads, "cluster4") | ||
|
|
||
| snapshot, err := cache.NewSnapshotWithExplicitWildcard("v1", map[rsrc.Type][]cache.SnapshotResource{ | ||
| rsrc.ClusterType: { | ||
| {Resource: types.ResourceWithTTL{Resource: cluster1}, Wildcard: true}, | ||
| {Resource: types.ResourceWithTTL{Resource: cluster2}, Wildcard: false}, | ||
| {Resource: types.ResourceWithTTL{Resource: cluster3}, Wildcard: true}, | ||
| {Resource: types.ResourceWithTTL{Resource: cluster4}, Wildcard: false}, | ||
| }, | ||
| }) | ||
| require.NoError(t, err) | ||
| require.NoError(t, c.SetSnapshot(context.Background(), "node1", snapshot)) | ||
|
|
||
| // Initial discovery request with wildcard subscription | ||
| value := make(chan cache.DeltaResponse, 1) | ||
| req := &cache.DeltaRequest{ | ||
| TypeUrl: rsrc.ClusterType, | ||
| Node: &core.Node{Id: "node1"}, | ||
| } | ||
|
|
||
| sub := stream.NewDeltaSubscription([]string{"*"}, nil, nil, false) | ||
|
|
||
| _, err = c.CreateDeltaWatch(req, sub, value) | ||
| require.NoError(t, err) | ||
|
|
||
| select { | ||
| case resp := <-value: | ||
| resources := resp.GetReturnedResources() | ||
| assert.Len(t, resources, 2, "initial request should return only wildcard-eligible clusters") | ||
| assert.Contains(t, resources, "cluster1") | ||
| assert.Contains(t, resources, "cluster3") | ||
| assert.NotContains(t, resources, "cluster2") | ||
| assert.NotContains(t, resources, "cluster4") | ||
|
|
||
| case <-time.After(time.Second): | ||
| t.Fatal("timeout waiting for initial wildcard response") | ||
| } | ||
|
|
||
| // Subscribe to the explicit cluster name (ODCDS scenario) | ||
| value2 := make(chan cache.DeltaResponse, 1) | ||
| req2 := &cache.DeltaRequest{ | ||
| TypeUrl: rsrc.ClusterType, | ||
| Node: &core.Node{Id: "node1"}, | ||
| ResourceNamesSubscribe: []string{"cluster2"}, | ||
| } | ||
|
|
||
| sub.UpdateResourceSubscriptions([]string{"cluster2"}, nil) | ||
|
|
||
| _, err = c.CreateDeltaWatch(req2, sub, value2) | ||
| require.NoError(t, err) | ||
|
|
||
| select { | ||
| case resp := <-value2: | ||
| resources := resp.GetReturnedResources() | ||
| assert.Len(t, resources, 3, "should return wildcard clusters + explicitly subscribed on-demand cluster") | ||
| assert.Contains(t, resources, "cluster1") | ||
| assert.Contains(t, resources, "cluster2") | ||
| assert.Contains(t, resources, "cluster3") | ||
| assert.NotContains(t, resources, "cluster4") // Not wildcard and not explicitly subscribed | ||
|
|
||
| case <-time.After(time.Second): | ||
| t.Fatal("timeout waiting for response with explicit subscription") | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,24 @@ type Snapshot struct { | |
| // instantiated by calling ConstructVersionMap(). | ||
| // VersionMap is only to be used with delta xDS. | ||
| VersionMap map[string]map[string]string | ||
|
|
||
| // wildcardMap tracks which resources should be returned for wildcard requests. | ||
| // If nil, all resources are treated as wildcard-eligible (backward compatibility). | ||
| // If not nil, only resources in this map are returned for wildcard requests. | ||
| wildcardMap map[resource.Type]map[string]struct{} | ||
| } | ||
|
|
||
| // SnapshotResource contains a resource with metadata about wildcard eligibility. | ||
| // The Wildcard field indicates whether this resource should be returned as part | ||
| // of wildcard requests. This is particularly useful for ODCDS (On-Demand CDS) | ||
| // where some clusters should only be sent when explicitly requested by name. | ||
| type SnapshotResource struct { | ||
| Resource types.ResourceWithTTL | ||
|
|
||
| // Wildcard indicates if this resource should be returned for wildcard requests. | ||
| // true = include in wildcard responses (proactively pushed) | ||
| // false = only send when explicitly requested by name (on-demand) | ||
| Wildcard bool | ||
| } | ||
|
|
||
| var _ ResourceSnapshot = &Snapshot{} | ||
|
|
@@ -54,6 +72,47 @@ func NewSnapshot(version string, resources map[resource.Type][]types.Resource) ( | |
| return &out, nil | ||
| } | ||
|
|
||
| // NewSnapshotWithExplicitWildcard creates a Snapshot where each resource | ||
| // explicitly specifies whether it should be returned as part of wildcard requests. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you specify that it is mainly for OdCDS currently? |
||
| func NewSnapshotWithExplicitWildcard( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: given the functional implication, I'm wondering if we should rename this "default" instead |
||
| version string, | ||
| resources map[resource.Type][]SnapshotResource, | ||
| ) (*Snapshot, error) { | ||
| out := Snapshot{ | ||
| wildcardMap: make(map[resource.Type]map[string]struct{}), | ||
| } | ||
|
|
||
| for typ, snapshotResources := range resources { | ||
| index := GetResponseType(typ) | ||
| if index == types.UnknownType { | ||
| return nil, errors.New("unknown resource type: " + typ) | ||
| } | ||
|
|
||
| // Extract resources and track wildcard ones | ||
| rawResources := make([]types.ResourceWithTTL, 0, len(snapshotResources)) | ||
| wildcardSet := make(map[string]struct{}) | ||
|
|
||
| for _, sr := range snapshotResources { | ||
| rawResources = append(rawResources, sr.Resource) | ||
|
|
||
| if sr.Wildcard { | ||
| resourceName := GetResourceName(sr.Resource.Resource) | ||
| wildcardSet[resourceName] = struct{}{} | ||
| } | ||
| } | ||
|
|
||
| out.Resources[index] = NewResourcesWithTTL(version, rawResources) | ||
|
|
||
| // Only store wildcard map entry if there are resources of this type | ||
| // An empty set means no resources are wildcard for this type | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the functional interpretation of this? Is it that the type uses all resources for wildcard or none? |
||
| if len(snapshotResources) > 0 { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this check needed? If no resources exist it's fine to set an empty set |
||
| out.wildcardMap[typ] = wildcardSet | ||
| } | ||
| } | ||
|
|
||
| return &out, nil | ||
| } | ||
|
|
||
| // NewSnapshotWithTTLs creates a snapshot of ResourceWithTTLs. | ||
| // The resources map is keyed off the type URL of a resource, followed by the slice of resource objects. | ||
| func NewSnapshotWithTTLs(version string, resources map[resource.Type][]types.ResourceWithTTL) (*Snapshot, error) { | ||
|
|
@@ -204,3 +263,77 @@ func (s *Snapshot) ConstructVersionMap() error { | |
|
|
||
| return nil | ||
| } | ||
|
|
||
| // IsResourceWildcard checks if a specific resource should be returned for wildcard requests. | ||
| // If wildcardMap is nil, all resources are considered wildcard-eligible (backward compatibility). | ||
| // If wildcardMap is not nil, only resources explicitly marked as wildcard are eligible. | ||
| func (s *Snapshot) IsResourceWildcard(typeURL resource.Type, resourceName string) bool { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: maybe |
||
| if s == nil { | ||
| return false | ||
| } | ||
|
|
||
| // Nil wildcardMap means all resources are wildcard (backward compatibility) | ||
| if s.wildcardMap == nil { | ||
| return true | ||
| } | ||
|
|
||
| wildcardSet, ok := s.wildcardMap[typeURL] | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could be simplified to |
||
| if !ok { | ||
| // No entry for this type means no resources are wildcard | ||
| return false | ||
| } | ||
|
|
||
| _, isWildcard := wildcardSet[resourceName] | ||
| return isWildcard | ||
| } | ||
|
|
||
| // GetWildcardResources returns only the resources that should be sent for wildcard requests. | ||
| // If wildcardMap is nil, all resources are returned (backward compatibility). | ||
| // If wildcardMap is not nil, only resources marked as wildcard are returned. | ||
| func (s *Snapshot) GetWildcardResources(typeURL resource.Type) map[string]types.Resource { | ||
| resources := s.GetWildcardResourcesAndTTL(typeURL) | ||
| if resources == nil { | ||
| return nil | ||
| } | ||
|
|
||
| withoutTTL := make(map[string]types.Resource, len(resources)) | ||
|
|
||
| for k, v := range resources { | ||
| withoutTTL[k] = v.Resource | ||
| } | ||
|
|
||
| return withoutTTL | ||
| } | ||
|
|
||
| // GetWildcardResourcesAndTTL returns only the resources with TTL that should be sent for wildcard requests. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naming elsewhere is |
||
| // Similar to GetWildcardResources but includes TTL information. | ||
| func (s *Snapshot) GetWildcardResourcesAndTTL(typeURL resource.Type) map[string]types.ResourceWithTTL { | ||
| if s == nil { | ||
| return nil | ||
| } | ||
|
|
||
| allResources := s.GetResourcesAndTTL(typeURL) | ||
| if allResources == nil { | ||
| return nil | ||
| } | ||
|
|
||
| // Nil wildcardMap means all resources are wildcard (backward compatibility) | ||
| if s.wildcardMap == nil { | ||
| return allResources | ||
| } | ||
|
|
||
| wildcardSet, ok := s.wildcardMap[typeURL] | ||
| if !ok { | ||
| // No entry for this type means no resources are wildcard | ||
| return make(map[string]types.ResourceWithTTL) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: could just return nil here |
||
| } | ||
|
|
||
| // Filter to only wildcard resources | ||
| filtered := make(map[string]types.ResourceWithTTL, len(wildcardSet)) | ||
| for name := range wildcardSet { | ||
| if resource, exists := allResources[name]; exists { | ||
| filtered[name] = resource | ||
| } | ||
| } | ||
| return filtered | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: could reserve to the length of subscribed resources