From b3be747bf11283e1d329060b98b270b2bd9f7697 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Wed, 16 Jul 2025 20:56:05 +0000 Subject: [PATCH] feat: Introduce ManagedQueue and Contracts This commit introduces the foundational layer for the Flow Registry, which will act as the stateful control plane for managing queues and policies. It defines the `ManagedQueue` decorator, refines the system's core service contracts, and simplifies the queue plugin interface. The key changes are: 1. **`ports` -> `contracts` Rename**: The `ports` package is renamed to `contracts` to more accurately reflect the "Ports and Adapters" architectural pattern and avoid confusion with network ports. 2. **`ManagedQueue` Introduction**: A new `registry.managedQueue` component is introduced. It acts as a stateful decorator around a `framework.SafeQueue`, adding critical registry-level functionality: - **Atomic Statistics**: It maintains its own atomic `len` and `byteSize` counters. - **State Reconciliation**: It uses a callback to propagate statistical deltas to its parent (the future `RegistryShard`), ensuring aggregated stats remain consistent without locks. 3. **`SafeQueue` Interface Simplification**: The `framework.SafeQueue` interface is simplified. The `Add` and `Remove` methods no longer return the new queue state, as this responsibility is now correctly handled by the `ManagedQueue` decorator. This makes the `SafeQueue` contract cleaner and more focused on its core queuing logic. --- pkg/epp/flowcontrol/README.md | 12 +- pkg/epp/flowcontrol/contracts/registry.go | 45 ++ pkg/epp/flowcontrol/framework/errors.go | 2 +- pkg/epp/flowcontrol/framework/mocks/mocks.go | 81 +++ .../policies/intraflow/dispatch/README.md | 2 +- .../framework/plugins/queue/README.md | 35 +- .../framework/plugins/queue/benchmark_test.go | 79 +-- .../framework/plugins/queue/factory.go | 2 +- .../plugins/queue/functional_test.go | 70 ++- .../plugins/queue/listqueue/listqueue.go | 21 +- .../plugins/queue/maxminheap/maxminheap.go | 21 +- .../queue/maxminheap/maxminheap_test.go | 6 +- pkg/epp/flowcontrol/framework/policies.go | 4 +- pkg/epp/flowcontrol/framework/queue.go | 10 +- pkg/epp/flowcontrol/registry/managedqueue.go | 185 +++++++ .../flowcontrol/registry/managedqueue_test.go | 465 ++++++++++++++++++ pkg/epp/flowcontrol/types/flow.go | 5 +- pkg/epp/flowcontrol/types/request.go | 4 +- 18 files changed, 905 insertions(+), 144 deletions(-) create mode 100644 pkg/epp/flowcontrol/contracts/registry.go create mode 100644 pkg/epp/flowcontrol/registry/managedqueue.go create mode 100644 pkg/epp/flowcontrol/registry/managedqueue_test.go diff --git a/pkg/epp/flowcontrol/README.md b/pkg/epp/flowcontrol/README.md index 5db86f446..877ca4af5 100644 --- a/pkg/epp/flowcontrol/README.md +++ b/pkg/epp/flowcontrol/README.md @@ -59,7 +59,7 @@ graph LR subgraph Internal Interactions direction LR - D(Ports) -- "abstracts state" --> E(Flow Registry); + D(Contracts) -- "abstracts state" --> E(Flow Registry); D -- "abstracts load" --> SD(Saturation Detector); E -- "configures" --> F(Framework); F -- "defines" --> P(Plugins: Queues & Policies); @@ -107,10 +107,10 @@ their justifications, please refer to the detailed documentation within the rele concurrent-safe request storage. It uses a `QueueCapability` system that allows for diverse and extensible queue implementations (e.g., FIFO, Priority Heap) while maintaining a stable interface. -4. **The `FlowRegistry` (`./registry`, `./ports`)**: This is the stateful control plane of the system. 
It manages the
-   configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the
+4. **The `FlowRegistry` (`./registry`, `./contracts`)**: This is the stateful control plane of the system. It manages
+   the configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the
    `FlowController` workers to enable parallel operation with minimal lock contention.
-5. **Core Types and Service Ports (`./types`, `./ports`)**: These packages define the foundational data structures
-   (e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its dependencies,
-   following a "Ports and Adapters" architectural style.
+5. **Core Types and Service Contracts (`./types`, `./contracts`)**: These packages define the foundational data
+   structures (e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its
+   dependencies, following a "Ports and Adapters" architectural style.
diff --git a/pkg/epp/flowcontrol/contracts/registry.go b/pkg/epp/flowcontrol/contracts/registry.go
new file mode 100644
index 000000000..dbc855bef
--- /dev/null
+++ b/pkg/epp/flowcontrol/contracts/registry.go
@@ -0,0 +1,45 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
+// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
+// interfaces represent the "ports" through which the engine communicates.
+//
+// This package contains the primary service contracts for the Flow Registry and Saturation Detector.
+package contracts
+
+import (
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
+)
+
+// ManagedQueue defines the interface for a flow's queue instance on a specific shard.
+// It wraps an underlying `framework.SafeQueue`, augmenting it with lifecycle validation against the `FlowRegistry` and
+// integrating atomic statistics updates.
+//
+// Conformance:
+//   - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
+//   - Mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST ensure the flow instance still exists
+//     and is valid within the `FlowRegistry` before proceeding. They MUST also atomically update relevant statistics
+//     (e.g., queue length, byte size) at both the queue and priority-band levels.
+type ManagedQueue interface {
+	framework.SafeQueue
+
+	// FlowQueueAccessor returns a read-only, flow-aware accessor for this queue.
+	// This accessor is primarily used by policy plugins to inspect the queue's state in a structured way.
+	//
+	// Conformance: This method MUST NOT return nil.
+ FlowQueueAccessor() framework.FlowQueueAccessor +} diff --git a/pkg/epp/flowcontrol/framework/errors.go b/pkg/epp/flowcontrol/framework/errors.go index 76ace3f43..fb19c4aab 100644 --- a/pkg/epp/flowcontrol/framework/errors.go +++ b/pkg/epp/flowcontrol/framework/errors.go @@ -23,7 +23,7 @@ import ( // `SafeQueue` Errors // // These errors relate to operations directly on a `SafeQueue` implementation. They are returned by `SafeQueue` methods -// and might be handled or wrapped by the `ports.FlowRegistry`'s `ports.ManagedQueue` or the +// and might be handled or wrapped by the `contracts.FlowRegistry`'s `contracts.ManagedQueue` or the // `controller.FlowController`. var ( // ErrNilQueueItem indicates that a nil `types.QueueItemAccessor` was passed to `SafeQueue.Add()`. diff --git a/pkg/epp/flowcontrol/framework/mocks/mocks.go b/pkg/epp/flowcontrol/framework/mocks/mocks.go index 3cf4f4f2b..141b73524 100644 --- a/pkg/epp/flowcontrol/framework/mocks/mocks.go +++ b/pkg/epp/flowcontrol/framework/mocks/mocks.go @@ -102,3 +102,84 @@ func (m *MockPriorityBandAccessor) IterateQueues(callback func(queue framework.F } var _ framework.PriorityBandAccessor = &MockPriorityBandAccessor{} + +// MockSafeQueue is a mock implementation of the `framework.SafeQueue` interface. +type MockSafeQueue struct { + NameV string + CapabilitiesV []framework.QueueCapability + LenV int + ByteSizeV uint64 + PeekHeadV types.QueueItemAccessor + PeekHeadErrV error + PeekTailV types.QueueItemAccessor + PeekTailErrV error + AddFunc func(item types.QueueItemAccessor) error + RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error) + CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) + DrainFunc func() ([]types.QueueItemAccessor, error) +} + +func (m *MockSafeQueue) Name() string { return m.NameV } +func (m *MockSafeQueue) Capabilities() []framework.QueueCapability { return m.CapabilitiesV } +func (m *MockSafeQueue) Len() int { return m.LenV } +func (m *MockSafeQueue) ByteSize() uint64 { return m.ByteSizeV } + +func (m *MockSafeQueue) PeekHead() (types.QueueItemAccessor, error) { + return m.PeekHeadV, m.PeekHeadErrV +} + +func (m *MockSafeQueue) PeekTail() (types.QueueItemAccessor, error) { + return m.PeekTailV, m.PeekTailErrV +} + +func (m *MockSafeQueue) Add(item types.QueueItemAccessor) error { + if m.AddFunc != nil { + return m.AddFunc(item) + } + return nil +} + +func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { + if m.RemoveFunc != nil { + return m.RemoveFunc(handle) + } + return nil, nil +} + +func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) { + if m.CleanupFunc != nil { + return m.CleanupFunc(predicate) + } + return nil, nil +} + +func (m *MockSafeQueue) Drain() ([]types.QueueItemAccessor, error) { + if m.DrainFunc != nil { + return m.DrainFunc() + } + return nil, nil +} + +var _ framework.SafeQueue = &MockSafeQueue{} + +// MockIntraFlowDispatchPolicy is a mock implementation of the `framework.IntraFlowDispatchPolicy` interface. 
+type MockIntraFlowDispatchPolicy struct { + NameV string + SelectItemV types.QueueItemAccessor + SelectItemErrV error + ComparatorV framework.ItemComparator + RequiredQueueCapabilitiesV []framework.QueueCapability +} + +func (m *MockIntraFlowDispatchPolicy) Name() string { return m.NameV } +func (m *MockIntraFlowDispatchPolicy) Comparator() framework.ItemComparator { return m.ComparatorV } + +func (m *MockIntraFlowDispatchPolicy) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAccessor, error) { + return m.SelectItemV, m.SelectItemErrV +} + +func (m *MockIntraFlowDispatchPolicy) RequiredQueueCapabilities() []framework.QueueCapability { + return m.RequiredQueueCapabilitiesV +} + +var _ framework.IntraFlowDispatchPolicy = &MockIntraFlowDispatchPolicy{} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md index 71cfb09d7..8aedcb716 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md @@ -29,7 +29,7 @@ Key responsibilities and characteristics of a `framework.IntraFlowDispatchPolicy 3. **Queue Compatibility (`RequiredQueueCapabilities`)**: The policy specifies the capabilities its associated [`framework.SafeQueue`](../../../queue.go) must support for it to function correctly. For example, a simple FCFS policy would require `framework.CapabilityFIFO`, while a more complex, priority-based policy would require - `framework.CapabilityPriorityConfigurable`. The `ports.FlowRegistry` uses this information to pair policies with + `framework.CapabilityPriorityConfigurable`. The `contracts.FlowRegistry` uses this information to pair policies with compatible queues. The `framework.IntraFlowDispatchPolicy` allows for fine-grained control over how individual requests within a single flow are diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/README.md b/pkg/epp/flowcontrol/framework/plugins/queue/README.md index 6b39221e5..ce0a9c951 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/README.md +++ b/pkg/epp/flowcontrol/framework/plugins/queue/README.md @@ -6,7 +6,7 @@ defines core, self-contained queue data structures used by the `controller.FlowC ## Overview The `controller.FlowController` manages requests by organizing them into queues. Each logical "flow" within a given -priority band has its own `ports.ManagedQueue` instance, which wraps a `framework.SafeQueue`. This design allows the +priority band has its own `contracts.ManagedQueue` instance, which wraps a `framework.SafeQueue`. This design allows the `controller.FlowController` to apply policies at both the inter-flow (across different flows) and intra-flow (within a single flow's queue) levels. @@ -16,12 +16,12 @@ design allows for: - **Different Queuing Disciplines**: A basic FIFO queue ([`listqueue`](./listqueue/)) is provided, but other disciplines like priority queues ([`maxminheap`](./maxminheap/)) can be used for more complex ordering requirements. - **Specialized Capabilities**: Policies can declare `RequiredQueueCapabilities()` (e.g., `framework.CapabilityFIFO` or - `framework.CapabilityPriorityConfigurable`). The `ports.FlowRegistry` pairs the policy with a queue that provides the - necessary capabilities. + `framework.CapabilityPriorityConfigurable`). The `contracts.FlowRegistry` pairs the policy with a queue that provides + the necessary capabilities. 
- **Performance Optimization**: Different queue implementations offer varying performance characteristics, which can be compared using the centralized benchmark suite to select the best fit for a given workload. -## Contributing a New `SafeQueue` Implementation +## Contributing a New `framework.SafeQueue` Implementation To contribute a new queue implementation, follow these steps: @@ -73,21 +73,21 @@ The suite includes the following scenarios: ### Latest Results -*Last Updated: 2025-07-10* +*Last Updated: Commit `35a9d6c`* *(CPU: AMD EPYC 7B12)* | Benchmark | Implementation | Iterations | ns/op | B/op | allocs/op | | --------------------------- | -------------- | ---------- | ------- | ----- | --------- | -| **AddRemove** | `ListQueue` | 1,889,844 | 609.0 | 224 | 5 | -| | `MaxMinHeap` | 1,660,987 | 696.7 | 184 | 4 | -| **AddPeekRemove** | `ListQueue` | 3,884,938 | 298.0 | 224 | 5 | -| | `MaxMinHeap` | 1,857,448 | 615.9 | 184 | 4 | -| **AddPeekTailRemove** | `ListQueue` | 3,576,487 | 308.4 | 224 | 5 | -| | `MaxMinHeap` | 2,113,134 | 535.3 | 184 | 4 | -| **BulkAddThenBulkRemove** | `ListQueue` | 24,032 | 49,861 | 24801 | 698 | -| | `MaxMinHeap` | 10,000 | 108,868 | 20787 | 597 | -| **HighContention** | `ListQueue` | 484,574 | 2,328 | 896 | 20 | -| | `MaxMinHeap` | 84,806 | 18,679 | 783 | 16 | +| **AddRemove** | `ListQueue` | 1,906,153 | 595.5 | 224 | 5 | +| | `MaxMinHeap` | 1,763,473 | 668.9 | 184 | 4 | +| **AddPeekRemove** | `ListQueue` | 3,547,653 | 298.5 | 224 | 5 | +| | `MaxMinHeap` | 1,986,780 | 751.5 | 184 | 4 | +| **AddPeekTailRemove** | `ListQueue` | 3,732,302 | 303.3 | 224 | 5 | +| | `MaxMinHeap` | 2,006,383 | 551.6 | 184 | 4 | +| **BulkAddThenBulkRemove** | `ListQueue` | 24,046 | 47,240 | 24800 | 698 | +| | `MaxMinHeap` | 9,410 | 110,929 | 20786 | 597 | +| **HighContention** | `ListQueue` | 21,283,537 | 47.53 | 11 | 0 | +| | `MaxMinHeap` | 16,953,121 | 74.09 | 4 | 0 | ### Interpretation of Results @@ -95,8 +95,9 @@ The benchmark results highlight the trade-offs between the different queue imple data structures: - **`ListQueue`**: As a linked list, it excels in scenarios involving frequent additions or removals from either end of - the queue (`AddPeekRemove`, `AddPeekTailRemove`), which are O(1) operations. Its performance is less competitive in high-contention and bulk scenarios, which reflects the necessary per-item memory allocation and pointer manipulation - overhead. + the queue (`AddPeekRemove`, `AddPeekTailRemove`), which are O(1) operations. The `HighContention` benchmark shows that + its simple, low-overhead operations are also extremely performant for consumer throughput even under heavy concurrent + load. - **`MaxMinHeap`**: As a slice-based heap, it has a lower allocation overhead per operation, making it efficient for high-throughput `AddRemove` cycles. Peeking and removing items involves maintaining the heap property, which has an O(log n) cost, making individual peek operations slower than `ListQueue`. 
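For contributors adapting a queue or policy to the simplified contract, the shape of the new `Add`/`Remove` signatures is easiest to see in a short usage sketch. The following is illustrative only: it reuses `queue.NewQueueFromName`, `listqueue.ListQueueName`, and the mock item accessor that this patch's own tests rely on; the `main` wrapper, item size, and flow ID are arbitrary choices for the example.

package main

import (
	"fmt"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
	typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
)

func main() {
	// A nil comparator is acceptable for a FIFO queue; this mirrors how the managedqueue tests construct a listqueue.
	q, err := queue.NewQueueFromName(listqueue.ListQueueName, nil)
	if err != nil {
		panic(err)
	}

	item := typesmocks.NewMockQueueItemAccessor(100, "req-1", "example-flow")

	// Add now returns only an error; callers read queue state via Len() and ByteSize().
	if err := q.Add(item); err != nil {
		panic(err)
	}
	fmt.Println("len:", q.Len(), "bytes:", q.ByteSize())

	// Remove is handle-based and likewise returns only the removed item and an error.
	head, err := q.PeekHead()
	if err != nil {
		panic(err)
	}
	removed, err := q.Remove(head.Handle())
	if err != nil {
		panic(err)
	}
	fmt.Println("removed:", removed.OriginalRequest().ID(), "len:", q.Len())
}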
diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go index e1c9354b2..916cacdd9 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go @@ -69,11 +69,11 @@ func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") - _, _, err := q.Add(item) + err := q.Add(item) if err != nil { b.Fatalf("Add failed: %v", err) } - _, _, _, err = q.Remove(item.Handle()) + _, err = q.Remove(item.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -86,15 +86,15 @@ func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) { // Pre-add one item so PeekHead doesn't fail on the first iteration. initialItem := mocks.NewMockQueueItemAccessor(1, "initial", "benchmark-flow") - if _, _, err := q.Add(initialItem); err != nil { + if err := q.Add(initialItem); err != nil { b.Fatalf("Failed to add initial item: %v", err) } b.ReportAllocs() - for b.Loop() { + for i := 0; i < b.N; i++ { item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") - _, _, err := q.Add(item) + err := q.Add(item) if err != nil { b.Fatalf("Add failed: %v", err) } @@ -106,7 +106,7 @@ func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) { b.Fatalf("PeekHead failed: %v", err) } - _, _, _, err = q.Remove(peeked.Handle()) + _, err = q.Remove(peeked.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -118,13 +118,13 @@ func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) { func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { b.ReportAllocs() - for i := 0; b.Loop(); i++ { + for i := 0; i < b.N; i++ { // Add a batch of items items := make([]types.QueueItemAccessor, 100) for j := range items { item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("bulk-%d-%d", i, j), "benchmark-flow") items[j] = item - if _, _, err := q.Add(item); err != nil { + if err := q.Add(item); err != nil { b.Fatalf("Add failed: %v", err) } } @@ -135,7 +135,7 @@ func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { if err != nil { b.Fatalf("PeekHead failed: %v", err) } - if _, _, _, err := q.Remove(peeked.Handle()); err != nil { + if _, err := q.Remove(peeked.Handle()); err != nil { b.Fatalf("Remove failed: %v", err) } } @@ -147,15 +147,15 @@ func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) { // Pre-add one item so PeekTail doesn't fail on the first iteration. 
initialItem := mocks.NewMockQueueItemAccessor(1, "initial", "benchmark-flow") - if _, _, err := q.Add(initialItem); err != nil { + if err := q.Add(initialItem); err != nil { b.Fatalf("Failed to add initial item: %v", err) } b.ReportAllocs() - for b.Loop() { + for i := 0; i < b.N; i++ { item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") - _, _, err := q.Add(item) + err := q.Add(item) if err != nil { b.Fatalf("Add failed: %v", err) } @@ -165,7 +165,7 @@ func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) { b.Fatalf("PeekTail failed: %v", err) } - _, _, _, err = q.Remove(peeked.Handle()) + _, err = q.Remove(peeked.Handle()) if err != nil { b.Fatalf("Remove failed: %v", err) } @@ -178,42 +178,45 @@ func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { // Pre-fill the queue to ensure consumers have work to do immediately. for i := range 1000 { item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("prefill-%d", i), "benchmark-flow") - if _, _, err := q.Add(item); err != nil { + if err := q.Add(item); err != nil { b.Fatalf("Failed to pre-fill queue: %v", err) } } - b.ReportAllocs() - b.ResetTimer() + stopCh := make(chan struct{}) + var wgProducers sync.WaitGroup - var wg sync.WaitGroup - // Producers + // Start producer goroutines to run in the background. for range 4 { - wg.Add(1) + wgProducers.Add(1) go func() { - defer wg.Done() - for b.Loop() { - item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") - _, _, _ = q.Add(item) + defer wgProducers.Done() + for { + select { + case <-stopCh: + return + default: + item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + _ = q.Add(item) + } } }() } - // Consumers - for range 4 { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < b.N; j++ { - peeked, err := q.PeekHead() - if err == nil { - _, _, _, _ = q.Remove(peeked.Handle()) - } - // Also peek tail to add more read contention - _, _ = q.PeekTail() + b.ReportAllocs() + b.ResetTimer() + + // Consumers drive the benchmark. + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + peeked, err := q.PeekHead() + if err == nil { + _, _ = q.Remove(peeked.Handle()) } - }() - } + } + }) - wg.Wait() + b.StopTimer() + close(stopCh) // Signal producers to stop. + wgProducers.Wait() } diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/factory.go b/pkg/epp/flowcontrol/framework/plugins/queue/factory.go index f0e9befee..66a8f6b90 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/factory.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/factory.go @@ -51,7 +51,7 @@ func MustRegisterQueue(name RegisteredQueueName, constructor QueueConstructor) { // NewQueueFromName creates a new SafeQueue given its registered name and the `framework.ItemComparator` that will be // optionally used to configure the queue (provided it declares `framework.CapabilityPriorityConfigurable`). -// This is called by the `registry.FlowRegistry` during initialization of a flow's `ports.ManagedQueue`. +// This is called by the `registry.FlowRegistry` during initialization of a flow's `contracts.ManagedQueue`. 
func NewQueueFromName(name RegisteredQueueName, comparator framework.ItemComparator) (framework.SafeQueue, error) { mu.RLock() defer mu.RUnlock() diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go index dfdf2864e..7e33961e8 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go @@ -90,7 +90,7 @@ func testLifecycleAndOrdering( currentExpectedLen := 0 var currentExpectedByteSize uint64 for i, item := range itemsInOrder { - newLen, newByteSize, addErr := q.Add(item) + addErr := q.Add(item) require.NoError(t, addErr, "[%s] Add should not fail for a valid item (item %d, ID: %s)", comparatorName, i, item.OriginalRequest().ID()) require.NotNil(t, item.Handle(), "[%s] Add must assign a non-nil handle to the item (item %d, ID: %s)", @@ -101,10 +101,10 @@ func testLifecycleAndOrdering( currentExpectedLen++ currentExpectedByteSize += item.OriginalRequest().ByteSize() - assert.Equal(t, uint64(currentExpectedLen), newLen, "[%s] Add must return the correct new length (item %d, ID: %s)", + assert.Equal(t, currentExpectedLen, q.Len(), "[%s] Len() must be correct after Add (item %d, ID: %s)", comparatorName, i, item.OriginalRequest().ID()) - assert.Equal(t, currentExpectedByteSize, newByteSize, - "[%s] Add must return the correct new byte size (item %d, ID: %s)", + assert.Equal(t, currentExpectedByteSize, q.ByteSize(), + "[%s] ByteSize() must be correct after Add (item %d, ID: %s)", comparatorName, i, item.OriginalRequest().ID()) } @@ -149,7 +149,7 @@ func testLifecycleAndOrdering( "[%s] PeekTail must return the item with the lowest priority (iteration %d)", comparatorName, i) // Remove the head item - removed, newLen, newByteSize, removeErr := q.Remove(peekedHandle) + removed, removeErr := q.Remove(peekedHandle) require.NoError(t, removeErr, "[%s] Remove with a valid handle should not fail (iteration %d, item ID: %s)", comparatorName, i, expectedItem.OriginalRequest().ID()) require.NotNil(t, removed, "[%s] Remove should return the removed item (iteration %d)", comparatorName, i) @@ -160,10 +160,6 @@ func testLifecycleAndOrdering( expectedLen-- expectedByteSize -= removed.OriginalRequest().ByteSize() - assert.Equal(t, uint64(expectedLen), newLen, "[%s] Remove must return the correct new length (iteration %d)", - comparatorName, i) - assert.Equal(t, expectedByteSize, newByteSize, - "[%s] Remove must return the correct new byte size (iteration %d)", comparatorName, i) assert.Equal(t, expectedLen, q.Len(), "[%s] Len() should be correctly updated after Remove (iteration %d)", comparatorName, i) assert.Equal(t, expectedByteSize, q.ByteSize(), @@ -261,10 +257,8 @@ func TestQueueConformance(t *testing.T) { currentLen := q.Len() currentByteSize := q.ByteSize() - newLen, newByteSize, err := q.Add(nil) + err = q.Add(nil) assert.ErrorIs(t, err, framework.ErrNilQueueItem, "Add(nil) must return ErrNilQueueItem") - assert.Equal(t, uint64(currentLen), newLen, "Add(nil) must not change the length returned") - assert.Equal(t, currentByteSize, newByteSize, "Add(nil) must not change the byte size returned") assert.Equal(t, currentLen, q.Len(), "The queue's length must not change after a failed Add") assert.Equal(t, currentByteSize, q.ByteSize(), "The queue's byte size must not change after a failed Add") }) @@ -275,13 +269,13 @@ func TestQueueConformance(t *testing.T) { require.NoError(t, err, "Setup: creating queue for test should not fail") item 
:= typesmocks.NewMockQueueItemAccessor(100, "item", flowSpec.ID) - _, _, err = q.Add(item) + err = q.Add(item) require.NoError(t, err, "Setup: adding an item should succeed") otherQ, err := constructor(enqueueTimeComparator) // A different queue instance require.NoError(t, err, "Setup: creating otherQ should succeed") otherItem := typesmocks.NewMockQueueItemAccessor(10, "other_item", "other_flow") - _, _, err = otherQ.Add(otherItem) + err = otherQ.Add(otherItem) require.NoError(t, err, "Setup: adding item to otherQ should succeed") alienHandle := otherItem.Handle() require.NotNil(t, alienHandle, "Setup: alien handle should not be nil") @@ -308,11 +302,8 @@ func TestQueueConformance(t *testing.T) { currentLen := q.Len() currentByteSize := q.ByteSize() - _, newLen, newByteSize, removeErr := q.Remove(tc.handle) + _, removeErr := q.Remove(tc.handle) assert.ErrorIs(t, removeErr, tc.expectErr, "Remove with %s should produce %v", tc.name, tc.expectErr) - assert.Equal(t, uint64(currentLen), newLen, "Remove with %s must not change the length returned", tc.name) - assert.Equal(t, currentByteSize, newByteSize, "Remove with %s must not change the byte size returned", - tc.name) assert.Equal(t, currentLen, q.Len(), "The queue's length must not change after a failed Remove with %s", tc.name) assert.Equal(t, currentByteSize, q.ByteSize(), @@ -334,25 +325,24 @@ func TestQueueConformance(t *testing.T) { item3 := typesmocks.NewMockQueueItemAccessor(30, "item3_nonhead", flowSpec.ID) item3.EnqueueTimeV = now.Add(-1 * time.Second) - _, _, _ = q.Add(item1) - _, _, _ = q.Add(item2) - _, _, _ = q.Add(item3) + _ = q.Add(item1) + _ = q.Add(item2) + _ = q.Add(item3) require.Equal(t, 3, q.Len(), "Queue should have 3 items before removing non-head") handleNonHead := item2.Handle() - removed, newLen, newByteSize, err := q.Remove(handleNonHead) + removed, err := q.Remove(handleNonHead) require.NoError(t, err, "It should be possible to remove an item that is not the head") require.NotNil(t, removed, "Remove should return the removed item") assert.Equal(t, item2.OriginalRequest().ID(), removed.OriginalRequest().ID(), "Remove should return the correct item (item2)") assert.True(t, handleNonHead.IsInvalidated(), "Remove must invalidate the handle of the removed item") - assert.Equal(t, uint64(2), newLen, "Remove must return the correct new length (2)") - assert.Equal(t, item1.OriginalRequest().ByteSize()+item3.OriginalRequest().ByteSize(), newByteSize, - "Remove must return the correct new byte size") assert.Equal(t, 2, q.Len(), "Queue length should be 2 after removing non-head") + assert.Equal(t, item1.OriginalRequest().ByteSize()+item3.OriginalRequest().ByteSize(), q.ByteSize(), + "Byte size should be correct after removing non-head") // Attempt to remove again with the now-stale handle - _, _, _, errStaleNonHead := q.Remove(handleNonHead) + _, errStaleNonHead := q.Remove(handleNonHead) assert.ErrorIs(t, errStaleNonHead, framework.ErrInvalidQueueItemHandle, "Removing with a stale handle must fail with ErrInvalidQueueItemHandle") }) @@ -376,8 +366,8 @@ func TestQueueConformance(t *testing.T) { q, _ := constructor(enqueueTimeComparator) itemK1 := typesmocks.NewMockQueueItemAccessor(10, "k1_matchNone", flowSpec.ID) itemK2 := typesmocks.NewMockQueueItemAccessor(12, "k2_matchNone", flowSpec.ID) - _, _, _ = q.Add(itemK1) - _, _, _ = q.Add(itemK2) + _ = q.Add(itemK1) + _ = q.Add(itemK2) initialLen := q.Len() initialBs := q.ByteSize() @@ -396,8 +386,8 @@ func TestQueueConformance(t *testing.T) { q, _ := 
constructor(enqueueTimeComparator) itemR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_matchAll", flowSpec.ID) itemR2 := typesmocks.NewMockQueueItemAccessor(13, "r2_matchAll", flowSpec.ID) - _, _, _ = q.Add(itemR1) - _, _, _ = q.Add(itemR2) + _ = q.Add(itemR1) + _ = q.Add(itemR2) cleanedItems, err := q.Cleanup(func(item types.QueueItemAccessor) bool { return true }) require.NoError(t, err, "Cleanup should not return an error") @@ -415,10 +405,10 @@ func TestQueueConformance(t *testing.T) { iR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_subset", flowSpec.ID) iK2 := typesmocks.NewMockQueueItemAccessor(22, "k2_subset", flowSpec.ID) iR2 := typesmocks.NewMockQueueItemAccessor(33, "r2_subset", flowSpec.ID) - _, _, _ = q.Add(iK1) - _, _, _ = q.Add(iR1) - _, _, _ = q.Add(iK2) - _, _, _ = q.Add(iR2) + _ = q.Add(iK1) + _ = q.Add(iR1) + _ = q.Add(iK2) + _ = q.Add(iR2) expectedKeptByteSize := iK1.OriginalRequest().ByteSize() + iK2.OriginalRequest().ByteSize() @@ -449,7 +439,7 @@ func TestQueueConformance(t *testing.T) { var remainingIDs []string for q.Len() > 0 { peeked, _ := q.PeekHead() - item, _, _, _ := q.Remove(peeked.Handle()) + item, _ := q.Remove(peeked.Handle()) remainingIDs = append(remainingIDs, item.OriginalRequest().ID()) } sort.Strings(remainingIDs) // Sort for stable comparison @@ -465,8 +455,8 @@ func TestQueueConformance(t *testing.T) { itemD1 := typesmocks.NewMockQueueItemAccessor(10, "ditem1", flowSpec.ID) itemD2 := typesmocks.NewMockQueueItemAccessor(20, "ditem2", flowSpec.ID) - _, _, _ = q.Add(itemD1) - _, _, _ = q.Add(itemD2) + _ = q.Add(itemD1) + _ = q.Add(itemD2) drainedItems, err := q.Drain() require.NoError(t, err, "Drain on a non-empty queue should not fail") @@ -523,7 +513,7 @@ func TestQueueConformance(t *testing.T) { // Pre-populate the queue with an initial set of items. for i := 0; i < initialItems; i++ { item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d", flowSpec.ID, i), flowSpec.ID) - _, _, err := q.Add(item) + err := q.Add(item) require.NoError(t, err, "Setup: pre-populating the queue should not fail") handleChan <- item.Handle() } @@ -542,7 +532,7 @@ func TestQueueConformance(t *testing.T) { case 0: // Add item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d_%d", flowSpec.ID, routineID, j), flowSpec.ID) - _, _, err := q.Add(item) + err := q.Add(item) if assert.NoError(t, err, "Add must be goroutine-safe") { successfulAdds.Add(1) handleChan <- item.Handle() @@ -551,7 +541,7 @@ func TestQueueConformance(t *testing.T) { select { case handle := <-handleChan: if handle != nil && !handle.IsInvalidated() { // Check before trying to prevent known-to-fail calls - _, _, _, removeErr := q.Remove(handle) + _, removeErr := q.Remove(handle) if removeErr == nil { successfulRemoves.Add(1) } else { diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/listqueue/listqueue.go b/pkg/epp/flowcontrol/framework/plugins/queue/listqueue/listqueue.go index 6a1e2207f..8e123b631 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/listqueue/listqueue.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/listqueue/listqueue.go @@ -82,48 +82,43 @@ func newListQueue() *listQueue { // --- `framework.SafeQueue` Interface Implementation --- // Add enqueues an item to the back of the list. 
-func (lq *listQueue) Add(item types.QueueItemAccessor) (newLen, newByteSize uint64, err error) { +func (lq *listQueue) Add(item types.QueueItemAccessor) error { lq.mu.Lock() defer lq.mu.Unlock() if item == nil { - return uint64(lq.requests.Len()), lq.byteSize.Load(), framework.ErrNilQueueItem + return framework.ErrNilQueueItem } element := lq.requests.PushBack(item) lq.byteSize.Add(item.OriginalRequest().ByteSize()) item.SetHandle(&listItemHandle{element: element, owner: lq}) - return uint64(lq.requests.Len()), lq.byteSize.Load(), nil + return nil } // Remove removes an item identified by the given handle from the queue. -func (lq *listQueue) Remove( - handle types.QueueItemHandle, -) (removedItem types.QueueItemAccessor, newLen, newByteSize uint64, err error) { +func (lq *listQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { lq.mu.Lock() defer lq.mu.Unlock() - currentLen := uint64(lq.requests.Len()) - currentByteSize := lq.byteSize.Load() - if handle == nil || handle.IsInvalidated() { - return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle + return nil, framework.ErrInvalidQueueItemHandle } lh, ok := handle.(*listItemHandle) if !ok { - return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle + return nil, framework.ErrInvalidQueueItemHandle } if lh.owner != lq { - return nil, currentLen, currentByteSize, framework.ErrQueueItemNotFound + return nil, framework.ErrQueueItemNotFound } item := lh.element.Value.(types.QueueItemAccessor) lq.requests.Remove(lh.element) lq.byteSize.Add(^item.OriginalRequest().ByteSize() + 1) // Atomic subtraction handle.Invalidate() - return item, uint64(lq.requests.Len()), lq.byteSize.Load(), nil + return item, nil } // Cleanup removes items from the queue that satisfy the predicate. diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap.go index 3d3dae727..81db8496e 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap.go @@ -155,17 +155,17 @@ func (h *maxMinHeap) PeekTail() (types.QueueItemAccessor, error) { // Add adds an item to the queue. // Time complexity: O(log n). -func (h *maxMinHeap) Add(item types.QueueItemAccessor) (uint64, uint64, error) { +func (h *maxMinHeap) Add(item types.QueueItemAccessor) error { h.mu.Lock() defer h.mu.Unlock() if item == nil { - return uint64(len(h.items)), h.byteSize.Load(), framework.ErrNilQueueItem + return framework.ErrNilQueueItem } h.push(item) h.byteSize.Add(item.OriginalRequest().ByteSize()) - return uint64(len(h.items)), h.byteSize.Load(), nil + return nil } // push adds an item to the heap and restores the heap property. @@ -256,30 +256,27 @@ func (h *maxMinHeap) swap(i, j int) { // Remove removes an item from the queue. // Time complexity: O(log n). 
-func (h *maxMinHeap) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, uint64, uint64, error) { +func (h *maxMinHeap) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { h.mu.Lock() defer h.mu.Unlock() - currentLen := uint64(len(h.items)) - currentByteSize := h.byteSize.Load() - if handle == nil { - return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle + return nil, framework.ErrInvalidQueueItemHandle } if handle.IsInvalidated() { - return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle + return nil, framework.ErrInvalidQueueItemHandle } heapItem, ok := handle.(*heapItem) if !ok { - return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle + return nil, framework.ErrInvalidQueueItemHandle } // Now we can check if the handle is in the map _, ok = h.handles[handle] if !ok { - return nil, currentLen, currentByteSize, framework.ErrQueueItemNotFound + return nil, framework.ErrQueueItemNotFound } i := heapItem.index @@ -305,7 +302,7 @@ func (h *maxMinHeap) Remove(handle types.QueueItemHandle) (types.QueueItemAccess } handle.Invalidate() - return item, uint64(len(h.items)), h.byteSize.Load(), nil + return item, nil } // down moves the item at index i down the heap to its correct position. diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go index defbf0f40..165d48f55 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go @@ -47,7 +47,7 @@ func TestMaxMinHeap_InternalProperty(t *testing.T) { // Add items in a somewhat random order of enqueue times items[i] = typesmocks.NewMockQueueItemAccessor(10, "item", "flow") items[i].EnqueueTimeV = now.Add(time.Duration((i%5-2)*10) * time.Second) - _, _, err := q.Add(items[i]) + err := q.Add(items[i]) require.NoError(t, err, "Add should not fail") assertHeapProperty(t, q, "after adding item %d", i) } @@ -55,7 +55,7 @@ func TestMaxMinHeap_InternalProperty(t *testing.T) { // Remove a few items from the middle and validate the heap property for _, i := range []int{15, 7, 11} { handle := items[i].Handle() - _, _, _, err := q.Remove(handle) + _, err := q.Remove(handle) require.NoError(t, err, "Remove should not fail for item %d", i) assertHeapProperty(t, q, "after removing item %d", i) } @@ -64,7 +64,7 @@ func TestMaxMinHeap_InternalProperty(t *testing.T) { for q.Len() > 0 { head, err := q.PeekHead() require.NoError(t, err) - _, _, _, err = q.Remove(head.Handle()) + _, err = q.Remove(head.Handle()) require.NoError(t, err) assertHeapProperty(t, q, "after removing head item") } diff --git a/pkg/epp/flowcontrol/framework/policies.go b/pkg/epp/flowcontrol/framework/policies.go index 8211d6091..da6d5c445 100644 --- a/pkg/epp/flowcontrol/framework/policies.go +++ b/pkg/epp/flowcontrol/framework/policies.go @@ -91,7 +91,7 @@ type IntraFlowDispatchPolicy interface { // For queues that inherently order items by dispatch preference, this method will typically just call // `queue.PeekHead()`. // - // The `controller.FlowController` uses the handle from the returned item to instruct the `ports.ManagedQueue` to + // The `controller.FlowController` uses the handle from the returned item to instruct the `contracts.ManagedQueue` to // remove it. 
// // Returns: @@ -144,7 +144,7 @@ type InterFlowDispatchPolicy interface { // FlowQueueAccessor provides a policy-facing, read-only view of a single flow's queue. // It combines general queue inspection methods (embedded via `QueueInspectionMethods`) with flow-specific metadata. // -// Instances of `FlowQueueAccessor` are vended by a `ports.ManagedQueue` and are the primary means by which policies +// Instances of `FlowQueueAccessor` are vended by a `contracts.ManagedQueue` and are the primary means by which policies // inspect individual queue state. // // Conformance: Implementations MUST ensure all methods (including those embedded from `QueueInspectionMethods`) are diff --git a/pkg/epp/flowcontrol/framework/queue.go b/pkg/epp/flowcontrol/framework/queue.go index c4829f027..d00cccfaf 100644 --- a/pkg/epp/flowcontrol/framework/queue.go +++ b/pkg/epp/flowcontrol/framework/queue.go @@ -25,7 +25,7 @@ import ( // ensuring that a policy is always paired with a compatible queue. // // For example, a policy that requires a priority-ordered queue would declare `CapabilityPriorityConfigurable`, and the -// `ports.FlowRegistry` would ensure it is paired with a queue implementation (like a heap) that provides this +// `contracts.FlowRegistry` would ensure it is paired with a queue implementation (like a heap) that provides this // capability. // // While a simpler boolean method (e.g., `IsPriorityConfigurable()`) could satisfy current needs, this slice-based @@ -82,18 +82,16 @@ type SafeQueue interface { // Add attempts to enqueue an item. On success, it must associate a new, unique `types.QueueItemHandle` with the item // by calling `item.SetHandle()`. - // - // Returns the new length and byte size of the queue. - Add(item types.QueueItemAccessor) (newLen, newByteSize uint64, err error) + Add(item types.QueueItemAccessor) error // Remove atomically finds and removes the item identified by the given handle. // // On success, implementations MUST invalidate the provided handle by calling `handle.Invalidate()`. // - // Returns the removed item and the new length and byte size of the queue. + // Returns the removed item. // Returns `ErrInvalidQueueItemHandle` if the handle is invalid (e.g., nil, wrong type, already invalidated). // Returns `ErrQueueItemNotFound` if the handle is valid but the item is not in the queue. - Remove(handle types.QueueItemHandle) (removedItem types.QueueItemAccessor, newLen, newByteSize uint64, err error) + Remove(handle types.QueueItemHandle) (removedItem types.QueueItemAccessor, err error) // Cleanup iterates through the queue and atomically removes all items for which the predicate returns true, returning // them in a slice. diff --git a/pkg/epp/flowcontrol/registry/managedqueue.go b/pkg/epp/flowcontrol/registry/managedqueue.go new file mode 100644 index 000000000..b8bc1022f --- /dev/null +++ b/pkg/epp/flowcontrol/registry/managedqueue.go @@ -0,0 +1,185 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// Package registry provides the concrete implementation of the Flow Registry, which is the stateful control plane for +// the Flow Control system. It implements the service interfaces defined in the `contracts` package. +// +// This initial version includes the implementation of the `contracts.ManagedQueue`, a stateful wrapper that adds atomic +// statistics tracking to a `framework.SafeQueue`. +package registry + +import ( + "sync/atomic" + + "github.com/go-logr/logr" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +type parentStatsReconciler func(lenDelta, byteSizeDelta int64) + +// managedQueue implements `contracts.ManagedQueue`. It wraps a `framework.SafeQueue` and is responsible for maintaining +// accurate, atomically-updated statistics that are aggregated at the shard level. +// +// # Statistical Integrity +// +// For performance, `managedQueue` maintains its own `len` and `byteSize` fields using atomic operations. This provides +// O(1) access for the parent `contracts.RegistryShard`'s aggregated statistics without needing to lock the underlying +// queue. +// +// This design is predicated on two critical assumptions: +// 1. Exclusive Access: All mutating operations on the underlying `framework.SafeQueue` MUST be performed exclusively +// through this `managedQueue` wrapper. Direct access to the underlying queue will cause statistical drift. +// 2. In-Process Queue: The `framework.SafeQueue` implementation is an in-process data structure (e.g., a list or +// heap). Its state MUST NOT change through external mechanisms. For example, a queue implementation backed by a +// distributed cache like Redis with its own TTL-based eviction policy would violate this assumption and lead to +// state inconsistency, as items could be removed without notifying the `managedQueue`. +// +// This approach avoids the need for the `framework.SafeQueue` interface to return state deltas on each operation, +// keeping its contract simpler. +type managedQueue struct { + // Note: There is no mutex here. Concurrency control is delegated to the underlying `framework.SafeQueue` and the + // atomic operations on the stats fields. + queue framework.SafeQueue + dispatchPolicy framework.IntraFlowDispatchPolicy + flowSpec types.FlowSpecification + byteSize atomic.Uint64 + len atomic.Uint64 + reconcileShardStats parentStatsReconciler + logger logr.Logger +} + +// newManagedQueue creates a new instance of a `managedQueue`. +func newManagedQueue( + queue framework.SafeQueue, + dispatchPolicy framework.IntraFlowDispatchPolicy, + flowSpec types.FlowSpecification, + logger logr.Logger, + reconcileShardStats func(lenDelta, byteSizeDelta int64), +) *managedQueue { + mqLogger := logger.WithName("managed-queue").WithValues( + "flowID", flowSpec.ID, + "priority", flowSpec.Priority, + "queueType", queue.Name(), + ) + return &managedQueue{ + queue: queue, + dispatchPolicy: dispatchPolicy, + flowSpec: flowSpec, + reconcileShardStats: reconcileShardStats, + logger: mqLogger, + } +} + +// FlowQueueAccessor returns a new `flowQueueAccessor` instance. 
+func (mq *managedQueue) FlowQueueAccessor() framework.FlowQueueAccessor { + return &flowQueueAccessor{mq: mq} +} + +func (mq *managedQueue) Add(item types.QueueItemAccessor) error { + if err := mq.queue.Add(item); err != nil { + return err + } + mq.reconcileStats(1, int64(item.OriginalRequest().ByteSize())) + return nil +} + +func (mq *managedQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { + removedItem, err := mq.queue.Remove(handle) + if err != nil { + return nil, err + } + mq.reconcileStats(-1, -int64(removedItem.OriginalRequest().ByteSize())) + // TODO: If mq.len.Load() == 0, signal shard for optimistic instance cleanup. + return removedItem, nil +} + +func (mq *managedQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) { + cleanedItems, err = mq.queue.Cleanup(predicate) + if err != nil || len(cleanedItems) == 0 { + return cleanedItems, err + } + + var lenDelta int64 + var byteSizeDelta int64 + for _, item := range cleanedItems { + lenDelta-- + byteSizeDelta -= int64(item.OriginalRequest().ByteSize()) + } + mq.reconcileStats(lenDelta, byteSizeDelta) + // TODO: If mq.len.Load() == 0, signal shard for optimistic instance cleanup. + return cleanedItems, nil +} + +func (mq *managedQueue) Drain() ([]types.QueueItemAccessor, error) { + drainedItems, err := mq.queue.Drain() + if err != nil || len(drainedItems) == 0 { + return drainedItems, err + } + + var lenDelta int64 + var byteSizeDelta int64 + for _, item := range drainedItems { + lenDelta-- + byteSizeDelta -= int64(item.OriginalRequest().ByteSize()) + } + mq.reconcileStats(lenDelta, byteSizeDelta) + // TODO: If mq.len.Load() == 0, signal shard for optimistic instance cleanup. + return drainedItems, nil +} + +// reconcileStats atomically updates the queue's own statistics and calls the parent shard's reconciler to ensure +// aggregated stats remain consistent. +func (mq *managedQueue) reconcileStats(lenDelta, byteSizeDelta int64) { + // The use of Add with a negative number on a Uint64 is the standard Go atomic way to perform subtraction, leveraging + // two's complement arithmetic. + mq.len.Add(uint64(lenDelta)) + mq.byteSize.Add(uint64(byteSizeDelta)) + mq.reconcileShardStats(lenDelta, byteSizeDelta) +} + +// --- Pass-through and accessor methods --- + +func (mq *managedQueue) Name() string { return mq.queue.Name() } +func (mq *managedQueue) Capabilities() []framework.QueueCapability { return mq.queue.Capabilities() } +func (mq *managedQueue) Len() int { return int(mq.len.Load()) } +func (mq *managedQueue) ByteSize() uint64 { return mq.byteSize.Load() } +func (mq *managedQueue) PeekHead() (types.QueueItemAccessor, error) { return mq.queue.PeekHead() } +func (mq *managedQueue) PeekTail() (types.QueueItemAccessor, error) { return mq.queue.PeekTail() } +func (mq *managedQueue) Comparator() framework.ItemComparator { return mq.dispatchPolicy.Comparator() } + +var _ contracts.ManagedQueue = &managedQueue{} + +// --- flowQueueAccessor --- + +// flowQueueAccessor implements `framework.FlowQueueAccessor`. It provides a read-only, policy-facing view of a +// `managedQueue`. 
+type flowQueueAccessor struct { + mq *managedQueue +} + +func (a *flowQueueAccessor) Name() string { return a.mq.Name() } +func (a *flowQueueAccessor) Capabilities() []framework.QueueCapability { return a.mq.Capabilities() } +func (a *flowQueueAccessor) Len() int { return a.mq.Len() } +func (a *flowQueueAccessor) ByteSize() uint64 { return a.mq.ByteSize() } +func (a *flowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) { return a.mq.PeekHead() } +func (a *flowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) { return a.mq.PeekTail() } +func (a *flowQueueAccessor) Comparator() framework.ItemComparator { return a.mq.Comparator() } +func (a *flowQueueAccessor) FlowSpec() types.FlowSpecification { return a.mq.flowSpec } + +var _ framework.FlowQueueAccessor = &flowQueueAccessor{} diff --git a/pkg/epp/flowcontrol/registry/managedqueue_test.go b/pkg/epp/flowcontrol/registry/managedqueue_test.go new file mode 100644 index 000000000..b94c055a3 --- /dev/null +++ b/pkg/epp/flowcontrol/registry/managedqueue_test.go @@ -0,0 +1,465 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" +) + +// testStatsReconciler is a mock implementation of the `parentStatsReconciler` function. +// It captures the deltas it's called with, allowing tests to assert on them. +type testStatsReconciler struct { + mu sync.Mutex + lenDelta int64 + byteSizeDelta int64 + invocationCount int +} + +func (r *testStatsReconciler) reconcile(lenDelta, byteSizeDelta int64) { + r.mu.Lock() + defer r.mu.Unlock() + r.lenDelta += lenDelta + r.byteSizeDelta += byteSizeDelta + r.invocationCount++ +} + +func (r *testStatsReconciler) getStats() (lenDelta, byteSizeDelta int64, count int) { + r.mu.Lock() + defer r.mu.Unlock() + return r.lenDelta, r.byteSizeDelta, r.invocationCount +} + +// testFixture holds the components needed for a `managedQueue` test. +type testFixture struct { + mq *managedQueue + mockQueue *frameworkmocks.MockSafeQueue + mockPolicy *frameworkmocks.MockIntraFlowDispatchPolicy + reconciler *testStatsReconciler + flowSpec types.FlowSpecification + mockComparator *frameworkmocks.MockItemComparator +} + +// setupTestManagedQueue creates a new test fixture for testing the `managedQueue`. 
+func setupTestManagedQueue(t *testing.T) *testFixture { + t.Helper() + + mockQueue := &frameworkmocks.MockSafeQueue{} + reconciler := &testStatsReconciler{} + flowSpec := types.FlowSpecification{ID: "test-flow", Priority: 1} + mockComparator := &frameworkmocks.MockItemComparator{} + mockPolicy := &frameworkmocks.MockIntraFlowDispatchPolicy{ + ComparatorV: mockComparator, + } + + mq := newManagedQueue( + mockQueue, + mockPolicy, + flowSpec, + logr.Discard(), + reconciler.reconcile, + ) + require.NotNil(t, mq, "newManagedQueue should not return nil") + + return &testFixture{ + mq: mq, + mockQueue: mockQueue, + mockPolicy: mockPolicy, + reconciler: reconciler, + flowSpec: flowSpec, + mockComparator: mockComparator, + } +} + +func TestManagedQueue_New(t *testing.T) { + t.Parallel() + f := setupTestManagedQueue(t) + + assert.Zero(t, f.mq.Len(), "A new managedQueue should have a length of 0") + assert.Zero(t, f.mq.ByteSize(), "A new managedQueue should have a byte size of 0") +} + +func TestManagedQueue_Add(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + itemByteSize uint64 + mockAddError error + expectError bool + expectedLen int + expectedByteSize uint64 + expectedLenDelta int64 + expectedByteSizeDelta int64 + expectedReconcile bool + }{ + { + name: "Success", + itemByteSize: 100, + mockAddError: nil, + expectError: false, + expectedLen: 1, + expectedByteSize: 100, + expectedLenDelta: 1, + expectedByteSizeDelta: 100, + expectedReconcile: true, + }, + { + name: "Error from underlying queue", + itemByteSize: 100, + mockAddError: errors.New("queue full"), + expectError: true, + expectedLen: 0, + expectedByteSize: 0, + expectedLenDelta: 0, + expectedByteSizeDelta: 0, + expectedReconcile: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + f := setupTestManagedQueue(t) + + // Configure mock + f.mockQueue.AddFunc = func(item types.QueueItemAccessor) error { + return tc.mockAddError + } + + item := typesmocks.NewMockQueueItemAccessor(tc.itemByteSize, "req-1", "test-flow") + err := f.mq.Add(item) + + if tc.expectError { + require.Error(t, err, "Add should have returned an error") + } else { + require.NoError(t, err, "Add should not have returned an error") + } + + // Assert final state + assert.Equal(t, tc.expectedLen, f.mq.Len(), "Final length should be as expected") + assert.Equal(t, tc.expectedByteSize, f.mq.ByteSize(), "Final byte size should be as expected") + + // Assert reconciler state + lenDelta, byteSizeDelta, count := f.reconciler.getStats() + assert.Equal(t, tc.expectedLenDelta, lenDelta, "Reconciler length delta should be as expected") + assert.Equal(t, tc.expectedByteSizeDelta, byteSizeDelta, "Reconciler byte size delta should be as expected") + if tc.expectedReconcile { + assert.Equal(t, 1, count, "Reconciler should have been called once") + } else { + assert.Zero(t, count, "Reconciler should not have been called") + } + }) + } +} + +func TestManagedQueue_Remove(t *testing.T) { + t.Parallel() + f := setupTestManagedQueue(t) + + // Setup initial state + initialItem := typesmocks.NewMockQueueItemAccessor(100, "req-1", "test-flow") + f.mockQueue.AddFunc = func(item types.QueueItemAccessor) error { return nil } + err := f.mq.Add(initialItem) + require.NoError(t, err, "Setup: Adding an item should not fail") + require.Equal(t, 1, f.mq.Len(), "Setup: Length should be 1 after adding an item") + + // --- Test Success --- + t.Run("Success", func(t *testing.T) { + // Configure mock for Remove + 
f.mockQueue.RemoveFunc = func(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { + return initialItem, nil + } + + // Perform Remove + removedItem, err := f.mq.Remove(initialItem.Handle()) + require.NoError(t, err, "Remove should not return an error") + assert.Equal(t, initialItem, removedItem, "Remove should return the correct item") + + // Assert final state + assert.Zero(t, f.mq.Len(), "Length should be 0 after removing the only item") + assert.Zero(t, f.mq.ByteSize(), "ByteSize should be 0 after removing the only item") + + // Assert reconciler state + lenDelta, byteSizeDelta, count := f.reconciler.getStats() + assert.Equal(t, int64(0), lenDelta, "Net length delta should be 0 after add and remove") + assert.Equal(t, int64(0), byteSizeDelta, "Net byte size delta should be 0 after add and remove") + assert.Equal(t, 2, count, "Reconciler should have been called for both Add and Remove") + }) + + // --- Test Error --- + t.Run("Error", func(t *testing.T) { + f := setupTestManagedQueue(t) + require.NoError(t, f.mq.Add(initialItem), "Setup: Adding an item should not fail") + + // Configure mock to return an error + expectedErr := errors.New("item not found") + f.mockQueue.RemoveFunc = func(handle types.QueueItemHandle) (types.QueueItemAccessor, error) { + return nil, expectedErr + } + + _, err := f.mq.Remove(initialItem.Handle()) + require.ErrorIs(t, err, expectedErr, "Remove should propagate the error") + + // Assert state did not change + assert.Equal(t, 1, f.mq.Len(), "Length should not change on a failed remove") + assert.Equal(t, uint64(100), f.mq.ByteSize(), "ByteSize should not change on a failed remove") + + // Assert reconciler was not called for the remove + _, _, count := f.reconciler.getStats() + assert.Equal(t, 1, count, "Reconciler should only have been called for the initial Add") + }) +} + +func TestManagedQueue_CleanupAndDrain(t *testing.T) { + t.Parallel() + item1 := typesmocks.NewMockQueueItemAccessor(10, "req-1", "test-flow") + item2 := typesmocks.NewMockQueueItemAccessor(20, "req-2", "test-flow") + item3 := typesmocks.NewMockQueueItemAccessor(30, "req-3", "test-flow") + + // --- Test Cleanup --- + t.Run("Cleanup", func(t *testing.T) { + t.Parallel() + f := setupTestManagedQueue(t) + // Add initial items + require.NoError(t, f.mq.Add(item1), "Setup: Add item1 should not fail") + require.NoError(t, f.mq.Add(item2), "Setup: Add item2 should not fail") + require.NoError(t, f.mq.Add(item3), "Setup: Add item3 should not fail") + require.Equal(t, 3, f.mq.Len(), "Setup: Initial length should be 3") + require.Equal(t, uint64(60), f.mq.ByteSize(), "Setup: Initial byte size should be 60") + + // Configure mock to clean up item2 + f.mockQueue.CleanupFunc = func(p framework.PredicateFunc) ([]types.QueueItemAccessor, error) { + return []types.QueueItemAccessor{item2}, nil + } + + cleaned, err := f.mq.Cleanup(func(i types.QueueItemAccessor) bool { return true }) + require.NoError(t, err, "Cleanup should not return an error") + require.Len(t, cleaned, 1, "Cleanup should return one item") + + // Assert final state + assert.Equal(t, 2, f.mq.Len(), "Length should be 2 after cleanup") + assert.Equal(t, uint64(40), f.mq.ByteSize(), "ByteSize should be 40 after cleanup") + + // Assert reconciler state (3 adds, 1 cleanup) + lenDelta, byteSizeDelta, count := f.reconciler.getStats() + assert.Equal(t, int64(2), lenDelta, "Net length delta should be 2") + assert.Equal(t, int64(40), byteSizeDelta, "Net byte size delta should be 40") + assert.Equal(t, 4, count, "Reconciler should have 
+	})
+
+	// --- Test Drain ---
+	t.Run("Drain", func(t *testing.T) {
+		t.Parallel()
+		f := setupTestManagedQueue(t)
+		// Add initial items
+		require.NoError(t, f.mq.Add(item1), "Setup: Add item1 should not fail")
+		require.NoError(t, f.mq.Add(item2), "Setup: Add item2 should not fail")
+		require.Equal(t, 2, f.mq.Len(), "Setup: Initial length should be 2")
+		require.Equal(t, uint64(30), f.mq.ByteSize(), "Setup: Initial byte size should be 30")
+
+		// Configure mock to drain both items
+		f.mockQueue.DrainFunc = func() ([]types.QueueItemAccessor, error) {
+			return []types.QueueItemAccessor{item1, item2}, nil
+		}
+
+		drained, err := f.mq.Drain()
+		require.NoError(t, err, "Drain should not return an error")
+		require.Len(t, drained, 2, "Drain should return two items")
+
+		// Assert final state
+		assert.Zero(t, f.mq.Len(), "Length should be 0 after drain")
+		assert.Zero(t, f.mq.ByteSize(), "ByteSize should be 0 after drain")
+
+		// Assert reconciler state (2 adds, 1 drain)
+		lenDelta, byteSizeDelta, count := f.reconciler.getStats()
+		assert.Equal(t, int64(0), lenDelta, "Net length delta should be 0")
+		assert.Equal(t, int64(0), byteSizeDelta, "Net byte size delta should be 0")
+		assert.Equal(t, 3, count, "Reconciler should have been called 3 times")
+	})
+
+	// --- Test Error Paths ---
+	t.Run("ErrorPaths", func(t *testing.T) {
+		f := setupTestManagedQueue(t)
+		require.NoError(t, f.mq.Add(item1))
+		initialLen, initialByteSize := f.mq.Len(), f.mq.ByteSize()
+
+		expectedErr := errors.New("internal error")
+
+		// Cleanup error
+		f.mockQueue.CleanupFunc = func(p framework.PredicateFunc) ([]types.QueueItemAccessor, error) {
+			return nil, expectedErr
+		}
+		_, err := f.mq.Cleanup(func(i types.QueueItemAccessor) bool { return true })
+		require.ErrorIs(t, err, expectedErr, "Cleanup should propagate error")
+		assert.Equal(t, initialLen, f.mq.Len(), "Len should not change on Cleanup error")
+		assert.Equal(t, initialByteSize, f.mq.ByteSize(), "ByteSize should not change on Cleanup error")
+
+		// Drain error
+		f.mockQueue.DrainFunc = func() ([]types.QueueItemAccessor, error) {
+			return nil, expectedErr
+		}
+		_, err = f.mq.Drain()
+		require.ErrorIs(t, err, expectedErr, "Drain should propagate error")
+		assert.Equal(t, initialLen, f.mq.Len(), "Len should not change on Drain error")
+		assert.Equal(t, initialByteSize, f.mq.ByteSize(), "ByteSize should not change on Drain error")
+	})
+}
+
+func TestManagedQueue_FlowQueueAccessor(t *testing.T) {
+	t.Parallel()
+	f := setupTestManagedQueue(t)
+	item := typesmocks.NewMockQueueItemAccessor(100, "req-1", "test-flow")
+
+	// Setup underlying queue state
+	f.mockQueue.PeekHeadV = item
+	f.mockQueue.PeekTailV = item
+	f.mockQueue.NameV = "MockQueue"
+	f.mockQueue.CapabilitiesV = []framework.QueueCapability{framework.CapabilityFIFO}
+
+	// Add an item to populate the managed queue's stats
+	require.NoError(t, f.mq.Add(item), "Setup: Adding an item should not fail")
+
+	// Get the accessor
+	accessor := f.mq.FlowQueueAccessor()
+	require.NotNil(t, accessor, "FlowQueueAccessor should not be nil")
+
+	// Assert that the accessor methods reflect the underlying state
+	assert.Equal(t, f.mq.Name(), accessor.Name(), "Accessor Name() should match managed queue")
+	assert.Equal(t, f.mq.Capabilities(), accessor.Capabilities(), "Accessor Capabilities() should match managed queue")
+	assert.Equal(t, f.mq.Len(), accessor.Len(), "Accessor Len() should match managed queue")
+	assert.Equal(t, f.mq.ByteSize(), accessor.ByteSize(), "Accessor ByteSize() should match managed queue")
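+	// The accessor should also expose the flow's identity and the comparator provided by the intra-flow
+	// dispatch policy, as asserted below.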
+	assert.Equal(t, f.flowSpec, accessor.FlowSpec(), "Accessor FlowSpec() should match managed queue")
+	assert.Equal(t, f.mockComparator, accessor.Comparator(), "Accessor Comparator() should match the one from the policy")
+	assert.Equal(t, f.mockComparator, f.mq.Comparator(), "ManagedQueue Comparator() should match the one from the policy")
+
+	peekedHead, err := accessor.PeekHead()
+	require.NoError(t, err, "Accessor PeekHead() should not return an error")
+	assert.Equal(t, item, peekedHead, "Accessor PeekHead() should return the correct item")
+
+	peekedTail, err := accessor.PeekTail()
+	require.NoError(t, err, "Accessor PeekTail() should not return an error")
+	assert.Equal(t, item, peekedTail, "Accessor PeekTail() should return the correct item")
+}
+
+func TestManagedQueue_Concurrency(t *testing.T) {
+	t.Parallel()
+
+	// Use a real `listqueue` since it's concurrent-safe.
+	lq, err := queue.NewQueueFromName(listqueue.ListQueueName, nil)
+	require.NoError(t, err, "Setup: creating a real listqueue should not fail")
+
+	reconciler := &testStatsReconciler{}
+	flowSpec := types.FlowSpecification{ID: "conc-test-flow", Priority: 1}
+	mq := newManagedQueue(lq, nil, flowSpec, logr.Discard(), reconciler.reconcile)
+
+	const (
+		numGoroutines   = 20
+		opsPerGoroutine = 200
+		itemByteSize    = 10
+		initialItems    = 500
+	)
+
+	var wg sync.WaitGroup
+	var successfulAdds, successfulRemoves atomic.Int64
+
+	// Use a channel as a concurrent-safe pool of handles for removal.
+	handles := make(chan types.QueueItemHandle, initialItems+(numGoroutines*opsPerGoroutine))
+
+	// Pre-fill the queue to give removers something to do immediately.
+	for range initialItems {
+		item := typesmocks.NewMockQueueItemAccessor(uint64(itemByteSize), "initial", "flow")
+		require.NoError(t, mq.Add(item), "Setup: pre-filling queue should not fail")
+		handles <- item.Handle()
+	}
+	// Reset reconciler stats after setup so we only measure the concurrent phase.
+	*reconciler = testStatsReconciler{}
+
+	// Launch goroutines to perform a mix of concurrent operations.
+	wg.Add(numGoroutines)
+	for i := range numGoroutines {
+		go func(routineID int) {
+			defer wg.Done()
+			for j := range opsPerGoroutine {
+				// Mix up operations between adding and removing.
+				if (routineID+j)%2 == 0 {
+					// Add operation
+					item := typesmocks.NewMockQueueItemAccessor(uint64(itemByteSize), "req", "flow")
+					if err := mq.Add(item); err == nil {
+						successfulAdds.Add(1)
+						handles <- item.Handle()
+					}
+				} else {
+					// Remove operation
+					select {
+					case handle := <-handles:
+						if _, err := mq.Remove(handle); err == nil {
+							successfulRemoves.Add(1)
+						}
+					default:
+						// No handles available, do nothing. This can happen if removers are faster than adders.
+					}
+				}
+			}
+		}(i)
+	}
+	wg.Wait()
+
+	// All concurrent operations are complete. Drain any remaining items to get the final count.
+	drainedItems, err := mq.Drain()
+	require.NoError(t, err, "Draining the queue at the end should not fail")
+
+	// Final consistency checks.
+	finalItemCount := len(drainedItems)
+	finalByteSize := mq.ByteSize()
+
+	// The number of items left in the queue should match our tracking.
+	expectedFinalItemCount := initialItems + int(successfulAdds.Load()) - int(successfulRemoves.Load())
+	assert.Equal(t, expectedFinalItemCount, finalItemCount, "Final item count should match initial + adds - removes")
+
+	// The queue's internal stats must be zero after draining.
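+	// (Drain() also reports the corresponding negative deltas to the reconciler, which the expected-delta
+	// calculations below account for by subtracting the drained item count and bytes.)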
+ assert.Zero(t, mq.Len(), "Managed queue length must be zero after drain") + assert.Zero(t, finalByteSize, "Managed queue byte size must be zero after drain") + + // The net change reported to the reconciler must match the net change from the concurrent phase, + // plus the final drain. + netLenChangeDuringConcurrentPhase := successfulAdds.Load() - successfulRemoves.Load() + netByteSizeChangeDuringConcurrentPhase := netLenChangeDuringConcurrentPhase * itemByteSize + + lenDelta, byteSizeDelta, _ := reconciler.getStats() + + expectedLenDelta := netLenChangeDuringConcurrentPhase - int64(finalItemCount) + expectedByteSizeDelta := netByteSizeChangeDuringConcurrentPhase - int64(uint64(finalItemCount)*itemByteSize) + + assert.Equal(t, expectedLenDelta, lenDelta, + "Net length delta in reconciler should match the net change from all operations") + assert.Equal(t, expectedByteSizeDelta, byteSizeDelta, + "Net byte size delta in reconciler should match the net change from all operations") +} diff --git a/pkg/epp/flowcontrol/types/flow.go b/pkg/epp/flowcontrol/types/flow.go index 6c73f90f0..9bf16022e 100644 --- a/pkg/epp/flowcontrol/types/flow.go +++ b/pkg/epp/flowcontrol/types/flow.go @@ -18,13 +18,14 @@ package types // FlowSpecification defines the configuration of a logical flow, encapsulating its identity and registered priority. // -// It acts as the registration key for a flow within the `ports.FlowRegistry`. +// It acts as the registration key for a flow within the `contracts.FlowRegistry`. type FlowSpecification struct { // ID returns the unique name or identifier for this logical flow, corresponding to the value from // `FlowControlRequest.FlowID()`. ID string - // Priority returns the numerical priority level currently associated with this flow within the `ports.FlowRegistry`. + // Priority returns the numerical priority level currently associated with this flow within the + // `contracts.FlowRegistry`. // // Convention: Lower numerical values indicate higher priority. Priority uint diff --git a/pkg/epp/flowcontrol/types/request.go b/pkg/epp/flowcontrol/types/request.go index 4b6ae011b..c4dd996f4 100644 --- a/pkg/epp/flowcontrol/types/request.go +++ b/pkg/epp/flowcontrol/types/request.go @@ -35,11 +35,11 @@ type FlowControlRequest interface { // FlowID returns the unique identifier for the flow this request belongs to (e.g., model name, tenant ID). The // `controller.FlowController` uses this ID, in conjunction with the flow's registered priority, to look up the - // active `ports.ManagedQueue` from the `ports.FlowRegistry`'s `ports.RegistryShard`. + // active `contracts.ManagedQueue` from the `contracts.FlowRegistry`'s `contracts.RegistryShard`. FlowID() string // ByteSize returns the request's size in bytes (e.g., prompt size). This is used by the `controller.FlowController` - // and for managing byte-based capacity limits and for `ports.FlowRegistry` statistics. + // and for managing byte-based capacity limits and for `contracts.FlowRegistry` statistics. ByteSize() uint64 // InitialEffectiveTTL returns the suggested Time-To-Live for this request.