Skip to content

feat: Introduce pluggable inter-flow dispatch policy framework #1167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/epp/flowcontrol/framework/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ var (
// provided, valid `types.QueueItemHandle`. This can occur if the item was removed by a concurrent operation.
ErrQueueItemNotFound = errors.New("queue item not found for the given handle")
)

// Policy Errors
var (
// ErrIncompatiblePriorityType indicates that an `InterFlowDispatchPolicy` (like "BestHead") attempted to compare
// items from two different flow queues whose `ItemComparator`s have different `ScoreType` values, making a
// meaningful comparison impossible.
ErrIncompatiblePriorityType = errors.New("incompatible priority score type for comparison")
)
39 changes: 39 additions & 0 deletions pkg/epp/flowcontrol/framework/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,42 @@ func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { return m
func (m *MockFlowQueueAccessor) FlowSpec() types.FlowSpecification { return m.FlowSpecV }

var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{}

// MockPriorityBandAccessor is a mock implementation of the `framework.PriorityBandAccessor` interface.
type MockPriorityBandAccessor struct {
PriorityV uint
PriorityNameV string
FlowIDsV []string
QueueV framework.FlowQueueAccessor // Value to return for any Queue(flowID) call
QueueFuncV func(flowID string) framework.FlowQueueAccessor
IterateQueuesV func(callback func(queue framework.FlowQueueAccessor) bool)
}

func (m *MockPriorityBandAccessor) Priority() uint { return m.PriorityV }
func (m *MockPriorityBandAccessor) PriorityName() string { return m.PriorityNameV }
func (m *MockPriorityBandAccessor) FlowIDs() []string { return m.FlowIDsV }

func (m *MockPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAccessor {
if m.QueueFuncV != nil {
return m.QueueFuncV(flowID)
}
return m.QueueV
}

func (m *MockPriorityBandAccessor) IterateQueues(callback func(queue framework.FlowQueueAccessor) bool) {
if m.IterateQueuesV != nil {
m.IterateQueuesV(callback)
} else {
// Default behavior: iterate based on FlowIDsV and QueueV/QueueFuncV
for _, id := range m.FlowIDsV {
q := m.Queue(id)
if q != nil { // Only call callback if queue exists
if !callback(q) {
return
}
}
}
}
}

var _ framework.PriorityBandAccessor = &MockPriorityBandAccessor{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Flow Controller Inter-Flow Dispatch Policy Plugins

This directory contains concrete implementations of the [`framework.InterFlowDispatchPolicy`](../../../policies.go)
interface. These policies are responsible for determining *which flow's queue* gets the next opportunity to dispatch a
request to the scheduler.

## Overview

The `controller.FlowController` uses a two-tier policy system to manage requests. `framework.InterFlowDispatchPolicy`
plugins represent the second tier, making strategic decisions about fairness *between* different logical flows (e.g.,
between different tenants or models).

This contrasts with the `framework.IntraFlowDispatchPolicy`, which is responsible for **temporal scheduling**: deciding
the order of requests *within* a single flow's queue after it has been selected by the inter-flow policy.

Key responsibilities and characteristics of a `framework.InterFlowDispatchPolicy`:

1. **Queue Selection (`SelectQueue`)**: The primary method, `SelectQueue(band framework.PriorityBandAccessor)`,
inspects a set of queues within a single priority level (a "band") and decides which queue, if any, should be
selected to dispatch a request from next.

2. **Fairness Across Flows**: The core purpose of this policy is to enforce a fairness doctrine across multiple
competing flows. This could be simple round-robin, or more complex weighted fairness schemes.

3. **Stateless vs. Stateful**: Policies can be stateless (like `besthead`, which makes a decision based only on the
current state of the queues) or stateful (like `roundrobin`, which needs to remember which queue it selected last).
Any state must be managed in a goroutine-safe manner.

The `framework.InterFlowDispatchPolicy` is critical for multi-tenancy and preventing any single high-traffic flow from
starving all others.

## Contributing a New `framework.InterFlowDispatchPolicy` Implementation

To contribute a new dispatch policy implementation, follow these steps:

1. **Define Your Implementation**
- Create a new Go package in a subdirectory (e.g., `mycustompolicy/`).
- Implement the `framework.InterFlowDispatchPolicy` interface.
- Ensure all methods are goroutine-safe if your policy maintains any internal state.

2. **Register Your Policy**
- In an `init()` function within your policy's Go file, call [`MustRegisterPolicy()`](./factory.go) with a unique
name and a constructor function that matches the `PolicyConstructor` signature.

3. **Add to the Functional Test**
- Add a blank import for your new package to [`functional_test.go`](./functional_test.go). Your policy will then
be automatically included in the functional test suite, which validates the basic
`framework.InterFlowDispatchPolicy` contract (e.g., correct initialization, handling of nil/empty bands).

4. **Add Policy-Specific Tests**
- The functional test suite only validates the universal contract. You MUST add a separate `_test.go` file within
your package to test the specific logic of your policy.
- For example, your tests should validate that `SelectQueue()` correctly implements your desired selection logic
(e.g., that round-robin correctly cycles through queues).

5. **Documentation**
- Add a package-level GoDoc comment to your new policy's Go file, explaining its behavior and any trade-offs.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package besthead provides a `framework.InterFlowDispatchPolicy` that selects the queue containing the single "best"
// item from across all queues in a priority band.
package besthead

import (
"fmt"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// BestHeadPolicyName is the name of the Best Head policy implementation.
const BestHeadPolicyName = "BestHead"

func init() {
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(BestHeadPolicyName),
func() (framework.InterFlowDispatchPolicy, error) {
return newBestHead(), nil
})
}

type bestHead struct{}

func newBestHead() *bestHead {
return &bestHead{}
}

// Name returns the name of the policy.
func (p *bestHead) Name() string {
return BestHeadPolicyName
}

// SelectQueue implements a greedy strategy that bypasses fairness concerns to select the queue containing the single
// "best" item from across all queues in the priority band. It iterates through all non-empty queues, peeks at their
// head items, and uses the `framework.ItemComparator` from each queue to find the highest-priority item overall.
//
// This policy is useful for maximizing utilization when fairness between flows is not a concern. It requires that all
// queues being compared have a compatible `framework.ScoreType` to ensure the comparison is meaningful. If an
// incompatible comparator is found, the selection fails with an error.
func (p *bestHead) SelectQueue(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error) {
if band == nil {
return nil, nil
}

var bestQueue framework.FlowQueueAccessor
var bestItem types.QueueItemAccessor

var iterationErr error
band.IterateQueues(func(queue framework.FlowQueueAccessor) (keepIterating bool) {
if queue == nil || queue.Len() == 0 {
return true
}

item, err := queue.PeekHead()
if err != nil || item == nil {
return true
}

if bestQueue == nil {
bestQueue = queue
bestItem = item
return true
}

if queue.Comparator().ScoreType() != bestQueue.Comparator().ScoreType() {
iterationErr = fmt.Errorf("%w: expected %q, got %q", framework.ErrIncompatiblePriorityType,
bestQueue.Comparator().ScoreType(), queue.Comparator().ScoreType())
return false
}

if bestQueue.Comparator().Func()(item, bestItem) {
bestQueue = queue
bestItem = item
}
return true
})

if iterationErr != nil {
return nil, iterationErr
}
return bestQueue, nil
}
Loading