Skip to content

feat(flowcontrol): Introduce ManagedQueue and Service Contracts #1174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/epp/flowcontrol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ graph LR

subgraph Internal Interactions
direction LR
D(Ports) -- "abstracts state" --> E(Flow Registry);
D(Contracts) -- "abstracts state" --> E(Flow Registry);
D -- "abstracts load" --> SD(Saturation Detector);
E -- "configures" --> F(Framework);
F -- "defines" --> P(Plugins: Queues & Policies);
Expand Down Expand Up @@ -107,10 +107,10 @@ their justifications, please refer to the detailed documentation within the rele
concurrent-safe request storage. It uses a `QueueCapability` system that allows for diverse and extensible queue
implementations (e.g., FIFO, Priority Heap) while maintaining a stable interface.

4. **The `FlowRegistry` (`./registry`, `./ports`)**: This is the stateful control plane of the system. It manages the
configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the
4. **The `FlowRegistry` (`./registry`, `./contracts`)**: This is the stateful control plane of the system. It manages
the configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the
`FlowController` workers to enable parallel operation with minimal lock contention.

5. **Core Types and Service Ports (`./types`, `./ports`)**: These packages define the foundational data structures
(e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its dependencies,
following a "Ports and Adapters" architectural style.
5. **Core Types and Service Contracts (`./types`, `./contracts`)**: These packages define the foundational data
structures (e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its
dependencies, following a "Ports and Adapters" architectural style.
45 changes: 45 additions & 0 deletions pkg/epp/flowcontrol/contracts/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package contracts defines the service interfaces that decouple the core `controller.FlowController` engine from its
// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
// interfaces represent the "ports" through which the engine communicates.
//
// This package contains the primary service contracts for the Flow Registry and Saturation Detector.
package contracts

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
)

// ManagedQueue defines the interface for a flow's queue instance on a specific shard.
// It wraps an underlying `framework.SafeQueue`, augmenting it with lifecycle validation against the `FlowRegistry` and
// integrating atomic statistics updates.
//
// Conformance:
// - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
// - Mutating methods (`Add()`, `Remove()`, `CleanupExpired()`, `Drain()`) MUST ensure the flow instance still exists
// and is valid within the `FlowRegistry` before proceeding. They MUST also atomically update relevant statistics
// (e.g., queue length, byte size) at both the queue and priority-band levels.
type ManagedQueue interface {
framework.SafeQueue

// FlowQueueAccessor returns a read-only, flow-aware accessor for this queue.
// This accessor is primarily used by policy plugins to inspect the queue's state in a structured way.
//
// Conformance: This method MUST NOT return nil.
FlowQueueAccessor() framework.FlowQueueAccessor
}
2 changes: 1 addition & 1 deletion pkg/epp/flowcontrol/framework/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// `SafeQueue` Errors
//
// These errors relate to operations directly on a `SafeQueue` implementation. They are returned by `SafeQueue` methods
// and might be handled or wrapped by the `ports.FlowRegistry`'s `ports.ManagedQueue` or the
// and might be handled or wrapped by the `contracts.FlowRegistry`'s `contracts.ManagedQueue` or the
// `controller.FlowController`.
var (
// ErrNilQueueItem indicates that a nil `types.QueueItemAccessor` was passed to `SafeQueue.Add()`.
Expand Down
81 changes: 81 additions & 0 deletions pkg/epp/flowcontrol/framework/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,84 @@ func (m *MockPriorityBandAccessor) IterateQueues(callback func(queue framework.F
}

var _ framework.PriorityBandAccessor = &MockPriorityBandAccessor{}

// MockSafeQueue is a mock implementation of the `framework.SafeQueue` interface.
type MockSafeQueue struct {
NameV string
CapabilitiesV []framework.QueueCapability
LenV int
ByteSizeV uint64
PeekHeadV types.QueueItemAccessor
PeekHeadErrV error
PeekTailV types.QueueItemAccessor
PeekTailErrV error
AddFunc func(item types.QueueItemAccessor) error
RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error)
CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error)
DrainFunc func() ([]types.QueueItemAccessor, error)
}

func (m *MockSafeQueue) Name() string { return m.NameV }
func (m *MockSafeQueue) Capabilities() []framework.QueueCapability { return m.CapabilitiesV }
func (m *MockSafeQueue) Len() int { return m.LenV }
func (m *MockSafeQueue) ByteSize() uint64 { return m.ByteSizeV }

func (m *MockSafeQueue) PeekHead() (types.QueueItemAccessor, error) {
return m.PeekHeadV, m.PeekHeadErrV
}

func (m *MockSafeQueue) PeekTail() (types.QueueItemAccessor, error) {
return m.PeekTailV, m.PeekTailErrV
}

func (m *MockSafeQueue) Add(item types.QueueItemAccessor) error {
if m.AddFunc != nil {
return m.AddFunc(item)
}
return nil
}

func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) {
if m.RemoveFunc != nil {
return m.RemoveFunc(handle)
}
return nil, nil
}

func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) {
if m.CleanupFunc != nil {
return m.CleanupFunc(predicate)
}
return nil, nil
}

func (m *MockSafeQueue) Drain() ([]types.QueueItemAccessor, error) {
if m.DrainFunc != nil {
return m.DrainFunc()
}
return nil, nil
}

var _ framework.SafeQueue = &MockSafeQueue{}

// MockIntraFlowDispatchPolicy is a mock implementation of the `framework.IntraFlowDispatchPolicy` interface.
type MockIntraFlowDispatchPolicy struct {
NameV string
SelectItemV types.QueueItemAccessor
SelectItemErrV error
ComparatorV framework.ItemComparator
RequiredQueueCapabilitiesV []framework.QueueCapability
}

func (m *MockIntraFlowDispatchPolicy) Name() string { return m.NameV }
func (m *MockIntraFlowDispatchPolicy) Comparator() framework.ItemComparator { return m.ComparatorV }

func (m *MockIntraFlowDispatchPolicy) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAccessor, error) {
return m.SelectItemV, m.SelectItemErrV
}

func (m *MockIntraFlowDispatchPolicy) RequiredQueueCapabilities() []framework.QueueCapability {
return m.RequiredQueueCapabilitiesV
}

var _ framework.IntraFlowDispatchPolicy = &MockIntraFlowDispatchPolicy{}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Key responsibilities and characteristics of a `framework.IntraFlowDispatchPolicy
3. **Queue Compatibility (`RequiredQueueCapabilities`)**: The policy specifies the capabilities its associated
[`framework.SafeQueue`](../../../queue.go) must support for it to function correctly. For example, a simple FCFS
policy would require `framework.CapabilityFIFO`, while a more complex, priority-based policy would require
`framework.CapabilityPriorityConfigurable`. The `ports.FlowRegistry` uses this information to pair policies with
`framework.CapabilityPriorityConfigurable`. The `contracts.FlowRegistry` uses this information to pair policies with
compatible queues.

The `framework.IntraFlowDispatchPolicy` allows for fine-grained control over how individual requests within a single flow are
Expand Down
35 changes: 18 additions & 17 deletions pkg/epp/flowcontrol/framework/plugins/queue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defines core, self-contained queue data structures used by the `controller.FlowC
## Overview

The `controller.FlowController` manages requests by organizing them into queues. Each logical "flow" within a given
priority band has its own `ports.ManagedQueue` instance, which wraps a `framework.SafeQueue`. This design allows the
priority band has its own `contracts.ManagedQueue` instance, which wraps a `framework.SafeQueue`. This design allows the
`controller.FlowController` to apply policies at both the inter-flow (across different flows) and intra-flow (within a
single flow's queue) levels.

Expand All @@ -16,12 +16,12 @@ design allows for:
- **Different Queuing Disciplines**: A basic FIFO queue ([`listqueue`](./listqueue/)) is provided, but other disciplines
like priority queues ([`maxminheap`](./maxminheap/)) can be used for more complex ordering requirements.
- **Specialized Capabilities**: Policies can declare `RequiredQueueCapabilities()` (e.g., `framework.CapabilityFIFO` or
`framework.CapabilityPriorityConfigurable`). The `ports.FlowRegistry` pairs the policy with a queue that provides the
necessary capabilities.
`framework.CapabilityPriorityConfigurable`). The `contracts.FlowRegistry` pairs the policy with a queue that provides
the necessary capabilities.
- **Performance Optimization**: Different queue implementations offer varying performance characteristics, which can be
compared using the centralized benchmark suite to select the best fit for a given workload.

## Contributing a New `SafeQueue` Implementation
## Contributing a New `framework.SafeQueue` Implementation

To contribute a new queue implementation, follow these steps:

Expand Down Expand Up @@ -73,30 +73,31 @@ The suite includes the following scenarios:

### Latest Results

*Last Updated: 2025-07-10*
*Last Updated: Commit `35a9d6c`*
*(CPU: AMD EPYC 7B12)*

| Benchmark | Implementation | Iterations | ns/op | B/op | allocs/op |
| --------------------------- | -------------- | ---------- | ------- | ----- | --------- |
| **AddRemove** | `ListQueue` | 1,889,844 | 609.0 | 224 | 5 |
| | `MaxMinHeap` | 1,660,987 | 696.7 | 184 | 4 |
| **AddPeekRemove** | `ListQueue` | 3,884,938 | 298.0 | 224 | 5 |
| | `MaxMinHeap` | 1,857,448 | 615.9 | 184 | 4 |
| **AddPeekTailRemove** | `ListQueue` | 3,576,487 | 308.4 | 224 | 5 |
| | `MaxMinHeap` | 2,113,134 | 535.3 | 184 | 4 |
| **BulkAddThenBulkRemove** | `ListQueue` | 24,032 | 49,861 | 24801 | 698 |
| | `MaxMinHeap` | 10,000 | 108,868 | 20787 | 597 |
| **HighContention** | `ListQueue` | 484,574 | 2,328 | 896 | 20 |
| | `MaxMinHeap` | 84,806 | 18,679 | 783 | 16 |
| **AddRemove** | `ListQueue` | 1,906,153 | 595.5 | 224 | 5 |
| | `MaxMinHeap` | 1,763,473 | 668.9 | 184 | 4 |
| **AddPeekRemove** | `ListQueue` | 3,547,653 | 298.5 | 224 | 5 |
| | `MaxMinHeap` | 1,986,780 | 751.5 | 184 | 4 |
| **AddPeekTailRemove** | `ListQueue` | 3,732,302 | 303.3 | 224 | 5 |
| | `MaxMinHeap` | 2,006,383 | 551.6 | 184 | 4 |
| **BulkAddThenBulkRemove** | `ListQueue` | 24,046 | 47,240 | 24800 | 698 |
| | `MaxMinHeap` | 9,410 | 110,929 | 20786 | 597 |
| **HighContention** | `ListQueue` | 21,283,537 | 47.53 | 11 | 0 |
| | `MaxMinHeap` | 16,953,121 | 74.09 | 4 | 0 |

### Interpretation of Results

The benchmark results highlight the trade-offs between the different queue implementations based on their underlying
data structures:

- **`ListQueue`**: As a linked list, it excels in scenarios involving frequent additions or removals from either end of
the queue (`AddPeekRemove`, `AddPeekTailRemove`), which are O(1) operations. Its performance is less competitive in high-contention and bulk scenarios, which reflects the necessary per-item memory allocation and pointer manipulation
overhead.
the queue (`AddPeekRemove`, `AddPeekTailRemove`), which are O(1) operations. The `HighContention` benchmark shows that
its simple, low-overhead operations are also extremely performant for consumer throughput even under heavy concurrent
load.
- **`MaxMinHeap`**: As a slice-based heap, it has a lower allocation overhead per operation, making it efficient for
high-throughput `AddRemove` cycles. Peeking and removing items involves maintaining the heap property, which has an
O(log n) cost, making individual peek operations slower than `ListQueue`.
Expand Down
Loading