diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 210ef1a46..9927f7809 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -54,6 +54,12 @@ type ResourceSnapshot interface { // GetVersionMap returns a map of resource name to resource version for // all the resources of type indicated by typeURL. GetVersionMap(typeURL string) map[string]string + + // GetWildcardResources returns only the resources that should be sent for wildcard requests. + GetWildcardResources(typeURL string) map[string]types.Resource + + // GetWildcardResourcesAndTTL returns only the resources with TTL that should be sent for wildcard requests. + GetWildcardResourcesAndTTL(typeURL string) map[string]types.ResourceWithTTL } // SnapshotCache is a snapshot-based cache that maintains a single versioned @@ -147,6 +153,53 @@ func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache return cache } +func getResourcesForSubscription(snapshot ResourceSnapshot, typeURL string, subscribedResources map[string]struct{}, isWildcard bool) map[string]types.Resource { + resourcesWithTTL := getResourcesAndTTLForSubscription(snapshot, typeURL, subscribedResources, isWildcard) + if resourcesWithTTL == nil { + return nil + } + + withoutTTL := make(map[string]types.Resource, len(resourcesWithTTL)) + for k, v := range resourcesWithTTL { + withoutTTL[k] = v.Resource + } + + return withoutTTL +} + +// Returns resources with TTL for a given subscription. +// This is needed because Envoy can have both a wildcard subscription AND explicit subscriptions +// to on-demand resources in the same stream. +func getResourcesAndTTLForSubscription(snapshot ResourceSnapshot, typeURL string, subscribedResources map[string]struct{}, isWildcard bool) map[string]types.ResourceWithTTL { + var resources map[string]types.ResourceWithTTL + + if isWildcard { + resources = snapshot.GetWildcardResourcesAndTTL(typeURL) + if resources == nil { + resources = make(map[string]types.ResourceWithTTL) + } + } else { + resources = make(map[string]types.ResourceWithTTL) + } + + // Handle the case where a wildcard subscription also has explicit on-demand resources + if len(subscribedResources) > 0 { + allResources := snapshot.GetResourcesAndTTL(typeURL) + for name := range subscribedResources { + if resource, exists := allResources[name]; exists { + resources[name] = resource + } + } + } + + // For non-wildcard subscriptions, if no resources matched, return nil to indicate no response should be sent + if !isWildcard && len(resources) == 0 { + return nil + } + + return resources +} + // NewSnapshotCacheWithHeartbeating initializes a simple cache that sends periodic heartbeat // responses for resources with a TTL. // @@ -195,7 +248,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { for id, watch := range info.watches { // Respond with the current version regardless of whether the version has changed. version := snapshot.GetVersion(watch.Request.GetTypeUrl()) - resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) + resources := getResourcesAndTTLForSubscription(snapshot, watch.Request.GetTypeUrl(), watch.subscription.SubscribedResources(), watch.subscription.IsWildcard()) // TODO(snowp): Construct this once per type instead of once per watch. resourcesWithTTL := map[string]types.ResourceWithTTL{} @@ -253,7 +306,7 @@ func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *status version := snapshot.GetVersion(watch.Request.GetTypeUrl()) if version != watch.Request.GetVersionInfo() { cache.log.Debugf("respond open watch %d %s %v with new version %q", id, watch.Request.GetTypeUrl(), watch.Request.GetResourceNames(), version) - resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) + resources := getResourcesAndTTLForSubscription(snapshot, watch.Request.GetTypeUrl(), watch.subscription.SubscribedResources(), watch.subscription.IsWildcard()) err := cache.respond(ctx, watch, resources, version, false) if err != nil { return err @@ -421,7 +474,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, valu } version := snapshot.GetVersion(request.GetTypeUrl()) - resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) + resources := getResourcesAndTTLForSubscription(snapshot, request.GetTypeUrl(), sub.SubscribedResources(), sub.IsWildcard()) if request.GetVersionInfo() == version { // Retrieve whether there are resources in the cache requested and currently unknown to the client. @@ -439,9 +492,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, valu } } } else { - // Check if a resource present in the snapshot is currently not returned, + // Check if a wildcard-eligible resource present in the snapshot is currently not returned, // for instance if the subscription is newly wildcard. - for r := range snapshot.GetResources(request.GetTypeUrl()) { + for r := range snapshot.GetWildcardResources(request.GetTypeUrl()) { if _, ok := knownResourceNames[r]; !ok { shouldRespond = true break @@ -485,6 +538,10 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { // Respond to a watch with the snapshot value. The value channel should have capacity not to block. // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 func (cache *snapshotCache) respond(ctx context.Context, watch ResponseWatch, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) error { + if resources == nil { + return nil + } + request := watch.Request // for ADS, the request names must match the snapshot names // if they do not, then the watch is never responded, and it is expected that envoy makes another request @@ -595,7 +652,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, sub Subscrip // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, sub Subscription) (*RawDeltaResponse, error) { resp := createDeltaResponse(ctx, request, sub, resourceContainer{ - resourceMap: snapshot.GetResources(request.GetTypeUrl()), + resourceMap: getResourcesForSubscription(snapshot, request.GetTypeUrl(), sub.SubscribedResources(), sub.IsWildcard()), versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), }, snapshot.GetVersion(request.GetTypeUrl())) diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index c39869fbb..6e0b62640 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -542,6 +542,16 @@ func (s *singleResourceSnapshot) GetVersionMap(typeURL string) map[string]string } } +func (s *singleResourceSnapshot) GetWildcardResources(typeURL string) map[string]types.Resource { + // For this simple mock, treat all resources as wildcard (backward compatible behavior) + return s.GetResources(typeURL) +} + +func (s *singleResourceSnapshot) GetWildcardResourcesAndTTL(typeURL string) map[string]types.ResourceWithTTL { + // For this simple mock, treat all resources as wildcard (backward compatible behavior) + return s.GetResourcesAndTTL(typeURL) +} + // TestSnapshotSingleResourceFetch is a basic test to verify that simple // cache functions work with a type that is not `Snapshot`. func TestSnapshotSingleResourceFetch(t *testing.T) { @@ -598,10 +608,10 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { // Create watch. req := &cache.Request{ Node: &core.Node{Id: "test"}, - ResourceNames: []string{"rtds"}, + ResourceNames: []string{"one-second"}, TypeUrl: rsrc.RuntimeType, } - ss := stream.NewSotwSubscription([]string{"rtds"}, true) + ss := stream.NewSotwSubscription([]string{"one-second"}, true) ss.SetReturnedResources(map[string]string{"cluster": "abcdef"}) responder := make(chan cache.Response) _, err := c.CreateWatch(req, ss, responder) @@ -622,3 +632,74 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { <-responder } + +func TestSnapshotDeltaWithWildcardAndExplicitSubscriptions(t *testing.T) { + c := cache.NewSnapshotCache(false, cache.IDHash{}, log.NewTestLogger(t)) + + cluster1 := resource.MakeCluster(resource.Ads, "cluster1") + cluster2 := resource.MakeCluster(resource.Ads, "cluster2") + cluster3 := resource.MakeCluster(resource.Ads, "cluster3") + cluster4 := resource.MakeCluster(resource.Ads, "cluster4") + + snapshot, err := cache.NewSnapshotWithExplicitWildcard("v1", map[rsrc.Type][]cache.SnapshotResource{ + rsrc.ClusterType: { + {Resource: types.ResourceWithTTL{Resource: cluster1}, Wildcard: true}, + {Resource: types.ResourceWithTTL{Resource: cluster2}, Wildcard: false}, + {Resource: types.ResourceWithTTL{Resource: cluster3}, Wildcard: true}, + {Resource: types.ResourceWithTTL{Resource: cluster4}, Wildcard: false}, + }, + }) + require.NoError(t, err) + require.NoError(t, c.SetSnapshot(context.Background(), "node1", snapshot)) + + // Initial discovery request with wildcard subscription + value := make(chan cache.DeltaResponse, 1) + req := &cache.DeltaRequest{ + TypeUrl: rsrc.ClusterType, + Node: &core.Node{Id: "node1"}, + } + + sub := stream.NewDeltaSubscription([]string{"*"}, nil, nil, false) + + _, err = c.CreateDeltaWatch(req, sub, value) + require.NoError(t, err) + + select { + case resp := <-value: + resources := resp.GetReturnedResources() + assert.Len(t, resources, 2, "initial request should return only wildcard-eligible clusters") + assert.Contains(t, resources, "cluster1") + assert.Contains(t, resources, "cluster3") + assert.NotContains(t, resources, "cluster2") + assert.NotContains(t, resources, "cluster4") + + case <-time.After(time.Second): + t.Fatal("timeout waiting for initial wildcard response") + } + + // Subscribe to the explicit cluster name (ODCDS scenario) + value2 := make(chan cache.DeltaResponse, 1) + req2 := &cache.DeltaRequest{ + TypeUrl: rsrc.ClusterType, + Node: &core.Node{Id: "node1"}, + ResourceNamesSubscribe: []string{"cluster2"}, + } + + sub.UpdateResourceSubscriptions([]string{"cluster2"}, nil) + + _, err = c.CreateDeltaWatch(req2, sub, value2) + require.NoError(t, err) + + select { + case resp := <-value2: + resources := resp.GetReturnedResources() + assert.Len(t, resources, 3, "should return wildcard clusters + explicitly subscribed on-demand cluster") + assert.Contains(t, resources, "cluster1") + assert.Contains(t, resources, "cluster2") + assert.Contains(t, resources, "cluster3") + assert.NotContains(t, resources, "cluster4") // Not wildcard and not explicitly subscribed + + case <-time.After(time.Second): + t.Fatal("timeout waiting for response with explicit subscription") + } +} diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index a80f61d40..0a80e3c02 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -33,6 +33,24 @@ type Snapshot struct { // instantiated by calling ConstructVersionMap(). // VersionMap is only to be used with delta xDS. VersionMap map[string]map[string]string + + // wildcardMap tracks which resources should be returned for wildcard requests. + // If nil, all resources are treated as wildcard-eligible (backward compatibility). + // If not nil, only resources in this map are returned for wildcard requests. + wildcardMap map[resource.Type]map[string]struct{} +} + +// SnapshotResource contains a resource with metadata about wildcard eligibility. +// The Wildcard field indicates whether this resource should be returned as part +// of wildcard requests. This is particularly useful for ODCDS (On-Demand CDS) +// where some clusters should only be sent when explicitly requested by name. +type SnapshotResource struct { + Resource types.ResourceWithTTL + + // Wildcard indicates if this resource should be returned for wildcard requests. + // true = include in wildcard responses (proactively pushed) + // false = only send when explicitly requested by name (on-demand) + Wildcard bool } var _ ResourceSnapshot = &Snapshot{} @@ -54,6 +72,47 @@ func NewSnapshot(version string, resources map[resource.Type][]types.Resource) ( return &out, nil } +// NewSnapshotWithExplicitWildcard creates a Snapshot where each resource +// explicitly specifies whether it should be returned as part of wildcard requests. +func NewSnapshotWithExplicitWildcard( + version string, + resources map[resource.Type][]SnapshotResource, +) (*Snapshot, error) { + out := Snapshot{ + wildcardMap: make(map[resource.Type]map[string]struct{}), + } + + for typ, snapshotResources := range resources { + index := GetResponseType(typ) + if index == types.UnknownType { + return nil, errors.New("unknown resource type: " + typ) + } + + // Extract resources and track wildcard ones + rawResources := make([]types.ResourceWithTTL, 0, len(snapshotResources)) + wildcardSet := make(map[string]struct{}) + + for _, sr := range snapshotResources { + rawResources = append(rawResources, sr.Resource) + + if sr.Wildcard { + resourceName := GetResourceName(sr.Resource.Resource) + wildcardSet[resourceName] = struct{}{} + } + } + + out.Resources[index] = NewResourcesWithTTL(version, rawResources) + + // Only store wildcard map entry if there are resources of this type + // An empty set means no resources are wildcard for this type + if len(snapshotResources) > 0 { + out.wildcardMap[typ] = wildcardSet + } + } + + return &out, nil +} + // NewSnapshotWithTTLs creates a snapshot of ResourceWithTTLs. // The resources map is keyed off the type URL of a resource, followed by the slice of resource objects. func NewSnapshotWithTTLs(version string, resources map[resource.Type][]types.ResourceWithTTL) (*Snapshot, error) { @@ -204,3 +263,77 @@ func (s *Snapshot) ConstructVersionMap() error { return nil } + +// IsResourceWildcard checks if a specific resource should be returned for wildcard requests. +// If wildcardMap is nil, all resources are considered wildcard-eligible (backward compatibility). +// If wildcardMap is not nil, only resources explicitly marked as wildcard are eligible. +func (s *Snapshot) IsResourceWildcard(typeURL resource.Type, resourceName string) bool { + if s == nil { + return false + } + + // Nil wildcardMap means all resources are wildcard (backward compatibility) + if s.wildcardMap == nil { + return true + } + + wildcardSet, ok := s.wildcardMap[typeURL] + if !ok { + // No entry for this type means no resources are wildcard + return false + } + + _, isWildcard := wildcardSet[resourceName] + return isWildcard +} + +// GetWildcardResources returns only the resources that should be sent for wildcard requests. +// If wildcardMap is nil, all resources are returned (backward compatibility). +// If wildcardMap is not nil, only resources marked as wildcard are returned. +func (s *Snapshot) GetWildcardResources(typeURL resource.Type) map[string]types.Resource { + resources := s.GetWildcardResourcesAndTTL(typeURL) + if resources == nil { + return nil + } + + withoutTTL := make(map[string]types.Resource, len(resources)) + + for k, v := range resources { + withoutTTL[k] = v.Resource + } + + return withoutTTL +} + +// GetWildcardResourcesAndTTL returns only the resources with TTL that should be sent for wildcard requests. +// Similar to GetWildcardResources but includes TTL information. +func (s *Snapshot) GetWildcardResourcesAndTTL(typeURL resource.Type) map[string]types.ResourceWithTTL { + if s == nil { + return nil + } + + allResources := s.GetResourcesAndTTL(typeURL) + if allResources == nil { + return nil + } + + // Nil wildcardMap means all resources are wildcard (backward compatibility) + if s.wildcardMap == nil { + return allResources + } + + wildcardSet, ok := s.wildcardMap[typeURL] + if !ok { + // No entry for this type means no resources are wildcard + return make(map[string]types.ResourceWithTTL) + } + + // Filter to only wildcard resources + filtered := make(map[string]types.ResourceWithTTL, len(wildcardSet)) + for name := range wildcardSet { + if resource, exists := allResources[name]; exists { + filtered[name] = resource + } + } + return filtered +} diff --git a/pkg/cache/v3/snapshot_test.go b/pkg/cache/v3/snapshot_test.go index c32f49207..7727bc3ed 100644 --- a/pkg/cache/v3/snapshot_test.go +++ b/pkg/cache/v3/snapshot_test.go @@ -191,3 +191,46 @@ func TestNewSnapshotBadType(t *testing.T) { require.Error(t, err) assert.Nil(t, snap) } + +func TestSnapshotWithExplicitWildcard(t *testing.T) { + cluster1 := resource.MakeCluster(resource.Ads, "cluster1") + cluster2 := resource.MakeCluster(resource.Ads, "cluster2") + cluster3 := resource.MakeCluster(resource.Ads, "cluster3") + + snapshot, err := cache.NewSnapshotWithExplicitWildcard("v1", map[rsrc.Type][]cache.SnapshotResource{ + rsrc.ClusterType: { + {Resource: types.ResourceWithTTL{Resource: cluster1}, Wildcard: true}, + {Resource: types.ResourceWithTTL{Resource: cluster2}, Wildcard: false}, + {Resource: types.ResourceWithTTL{Resource: cluster3}, Wildcard: true}, + }, + }) + require.NoError(t, err) + + allResources := snapshot.GetResources(rsrc.ClusterType) + assert.Len(t, allResources, 3) + + wildcardResources := snapshot.GetWildcardResources(rsrc.ClusterType) + assert.Len(t, wildcardResources, 2) + assert.Contains(t, wildcardResources, "cluster1") + assert.Contains(t, wildcardResources, "cluster3") + + assert.True(t, snapshot.IsResourceWildcard(rsrc.ClusterType, "cluster1")) + assert.False(t, snapshot.IsResourceWildcard(rsrc.ClusterType, "cluster2")) + assert.True(t, snapshot.IsResourceWildcard(rsrc.ClusterType, "cluster3")) +} + +func TestSnapshotWildcardBackwardCompatibility(t *testing.T) { + cluster1 := resource.MakeCluster(resource.Ads, "cluster1") + cluster2 := resource.MakeCluster(resource.Ads, "cluster2") + + snapshot, err := cache.NewSnapshot("v1", map[rsrc.Type][]types.Resource{ + rsrc.ClusterType: {cluster1, cluster2}, + }) + require.NoError(t, err) + + wildcardResources := snapshot.GetWildcardResources(rsrc.ClusterType) + assert.Len(t, wildcardResources, 2) + + assert.True(t, snapshot.IsResourceWildcard(rsrc.ClusterType, "cluster1")) + assert.True(t, snapshot.IsResourceWildcard(rsrc.ClusterType, "cluster2")) +}