From b3d0cab137a842590e728c01322f9b7d443f8637 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Tue, 15 Jul 2025 23:08:17 +0000 Subject: [PATCH] feat: Introduce pluggable inter-flow framework This commit introduces the `InterFlowDispatchPolicy` framework, the third and final major component of the new pluggable flow control system. This framework decouples the logic for selecting which flow's queue to service next (fairness between flows) from the core controller. This completes the two-tier policy model, where the `InterFlowDispatchPolicy` makes the strategic decision about which flow to service, and the `IntraFlowDispatchPolicy` makes the tactical decision of which request to select from within that flow's queue. Key components include: - `framework.InterFlowDispatchPolicy`: The core interface that defines the contract for selecting a queue from a priority band. - `framework.PriorityBandAccessor`: A read-only interface that provides policies with safe access to the state of all queues within a priority level. - A factory and registration system for discovering and instantiating policy plugins by name. - A comprehensive functional conformance test suite to validate the contract for all policy plugins. - A `roundrobin` policy for fair, sequential queue selection. - A `besthead` policy for greedy, utilization-focused queue selection. --- pkg/epp/flowcontrol/framework/errors.go | 8 + pkg/epp/flowcontrol/framework/mocks/mocks.go | 39 +++ .../policies/interflow/dispatch/README.md | 57 +++ .../interflow/dispatch/besthead/besthead.go | 99 ++++++ .../dispatch/besthead/besthead_test.go | 326 ++++++++++++++++++ .../policies/interflow/dispatch/factory.go | 63 ++++ .../interflow/dispatch/functional_test.go | 145 ++++++++ .../dispatch/roundrobin/roundrobin.go | 118 +++++++ .../dispatch/roundrobin/roundrobin_test.go | 256 ++++++++++++++ .../policies/intraflow/dispatch/README.md | 8 +- .../framework/plugins/queue/README.md | 2 +- pkg/epp/flowcontrol/framework/policies.go | 52 ++- 12 files changed, 1166 insertions(+), 7 deletions(-) create mode 100644 pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/README.md create mode 100644 pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go create mode 100644 pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go create mode 100644 pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/factory.go create mode 100644 pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go create mode 100644 pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go create mode 100644 pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go diff --git a/pkg/epp/flowcontrol/framework/errors.go b/pkg/epp/flowcontrol/framework/errors.go index e994aafa8..76ace3f43 100644 --- a/pkg/epp/flowcontrol/framework/errors.go +++ b/pkg/epp/flowcontrol/framework/errors.go @@ -42,3 +42,11 @@ var ( // provided, valid `types.QueueItemHandle`. This can occur if the item was removed by a concurrent operation. ErrQueueItemNotFound = errors.New("queue item not found for the given handle") ) + +// Policy Errors +var ( + // ErrIncompatiblePriorityType indicates that an `InterFlowDispatchPolicy` (like "BestHead") attempted to compare + // items from two different flow queues whose `ItemComparator`s have different `ScoreType` values, making a + // meaningful comparison impossible. + ErrIncompatiblePriorityType = errors.New("incompatible priority score type for comparison") +) diff --git a/pkg/epp/flowcontrol/framework/mocks/mocks.go b/pkg/epp/flowcontrol/framework/mocks/mocks.go index 86a818951..3cf4f4f2b 100644 --- a/pkg/epp/flowcontrol/framework/mocks/mocks.go +++ b/pkg/epp/flowcontrol/framework/mocks/mocks.go @@ -63,3 +63,42 @@ func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { return m func (m *MockFlowQueueAccessor) FlowSpec() types.FlowSpecification { return m.FlowSpecV } var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{} + +// MockPriorityBandAccessor is a mock implementation of the `framework.PriorityBandAccessor` interface. +type MockPriorityBandAccessor struct { + PriorityV uint + PriorityNameV string + FlowIDsV []string + QueueV framework.FlowQueueAccessor // Value to return for any Queue(flowID) call + QueueFuncV func(flowID string) framework.FlowQueueAccessor + IterateQueuesV func(callback func(queue framework.FlowQueueAccessor) bool) +} + +func (m *MockPriorityBandAccessor) Priority() uint { return m.PriorityV } +func (m *MockPriorityBandAccessor) PriorityName() string { return m.PriorityNameV } +func (m *MockPriorityBandAccessor) FlowIDs() []string { return m.FlowIDsV } + +func (m *MockPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAccessor { + if m.QueueFuncV != nil { + return m.QueueFuncV(flowID) + } + return m.QueueV +} + +func (m *MockPriorityBandAccessor) IterateQueues(callback func(queue framework.FlowQueueAccessor) bool) { + if m.IterateQueuesV != nil { + m.IterateQueuesV(callback) + } else { + // Default behavior: iterate based on FlowIDsV and QueueV/QueueFuncV + for _, id := range m.FlowIDsV { + q := m.Queue(id) + if q != nil { // Only call callback if queue exists + if !callback(q) { + return + } + } + } + } +} + +var _ framework.PriorityBandAccessor = &MockPriorityBandAccessor{} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/README.md b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/README.md new file mode 100644 index 000000000..4e946dc10 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/README.md @@ -0,0 +1,57 @@ +# Flow Controller Inter-Flow Dispatch Policy Plugins + +This directory contains concrete implementations of the [`framework.InterFlowDispatchPolicy`](../../../policies.go) +interface. These policies are responsible for determining *which flow's queue* gets the next opportunity to dispatch a +request to the scheduler. + +## Overview + +The `controller.FlowController` uses a two-tier policy system to manage requests. `framework.InterFlowDispatchPolicy` +plugins represent the second tier, making strategic decisions about fairness *between* different logical flows (e.g., +between different tenants or models). + +This contrasts with the `framework.IntraFlowDispatchPolicy`, which is responsible for **temporal scheduling**: deciding +the order of requests *within* a single flow's queue after it has been selected by the inter-flow policy. + +Key responsibilities and characteristics of a `framework.InterFlowDispatchPolicy`: + +1. **Queue Selection (`SelectQueue`)**: The primary method, `SelectQueue(band framework.PriorityBandAccessor)`, + inspects a set of queues within a single priority level (a "band") and decides which queue, if any, should be + selected to dispatch a request from next. + +2. **Fairness Across Flows**: The core purpose of this policy is to enforce a fairness doctrine across multiple + competing flows. This could be simple round-robin, or more complex weighted fairness schemes. + +3. **Stateless vs. Stateful**: Policies can be stateless (like `besthead`, which makes a decision based only on the + current state of the queues) or stateful (like `roundrobin`, which needs to remember which queue it selected last). + Any state must be managed in a goroutine-safe manner. + +The `framework.InterFlowDispatchPolicy` is critical for multi-tenancy and preventing any single high-traffic flow from +starving all others. + +## Contributing a New `framework.InterFlowDispatchPolicy` Implementation + +To contribute a new dispatch policy implementation, follow these steps: + +1. **Define Your Implementation** + - Create a new Go package in a subdirectory (e.g., `mycustompolicy/`). + - Implement the `framework.InterFlowDispatchPolicy` interface. + - Ensure all methods are goroutine-safe if your policy maintains any internal state. + +2. **Register Your Policy** + - In an `init()` function within your policy's Go file, call [`MustRegisterPolicy()`](./factory.go) with a unique + name and a constructor function that matches the `PolicyConstructor` signature. + +3. **Add to the Functional Test** + - Add a blank import for your new package to [`functional_test.go`](./functional_test.go). Your policy will then + be automatically included in the functional test suite, which validates the basic + `framework.InterFlowDispatchPolicy` contract (e.g., correct initialization, handling of nil/empty bands). + +4. **Add Policy-Specific Tests** + - The functional test suite only validates the universal contract. You MUST add a separate `_test.go` file within + your package to test the specific logic of your policy. + - For example, your tests should validate that `SelectQueue()` correctly implements your desired selection logic + (e.g., that round-robin correctly cycles through queues). + +5. **Documentation** + - Add a package-level GoDoc comment to your new policy's Go file, explaining its behavior and any trade-offs. \ No newline at end of file diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go new file mode 100644 index 000000000..41033e755 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go @@ -0,0 +1,99 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package besthead provides a `framework.InterFlowDispatchPolicy` that selects the queue containing the single "best" +// item from across all queues in a priority band. +package besthead + +import ( + "fmt" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +// BestHeadPolicyName is the name of the Best Head policy implementation. +const BestHeadPolicyName = "BestHead" + +func init() { + dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(BestHeadPolicyName), + func() (framework.InterFlowDispatchPolicy, error) { + return newBestHead(), nil + }) +} + +type bestHead struct{} + +func newBestHead() *bestHead { + return &bestHead{} +} + +// Name returns the name of the policy. +func (p *bestHead) Name() string { + return BestHeadPolicyName +} + +// SelectQueue implements a greedy strategy that bypasses fairness concerns to select the queue containing the single +// "best" item from across all queues in the priority band. It iterates through all non-empty queues, peeks at their +// head items, and uses the `framework.ItemComparator` from each queue to find the highest-priority item overall. +// +// This policy is useful for maximizing utilization when fairness between flows is not a concern. It requires that all +// queues being compared have a compatible `framework.ScoreType` to ensure the comparison is meaningful. If an +// incompatible comparator is found, the selection fails with an error. +func (p *bestHead) SelectQueue(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error) { + if band == nil { + return nil, nil + } + + var bestQueue framework.FlowQueueAccessor + var bestItem types.QueueItemAccessor + + var iterationErr error + band.IterateQueues(func(queue framework.FlowQueueAccessor) (keepIterating bool) { + if queue == nil || queue.Len() == 0 { + return true + } + + item, err := queue.PeekHead() + if err != nil || item == nil { + return true + } + + if bestQueue == nil { + bestQueue = queue + bestItem = item + return true + } + + if queue.Comparator().ScoreType() != bestQueue.Comparator().ScoreType() { + iterationErr = fmt.Errorf("%w: expected %q, got %q", framework.ErrIncompatiblePriorityType, + bestQueue.Comparator().ScoreType(), queue.Comparator().ScoreType()) + return false + } + + if bestQueue.Comparator().Func()(item, bestItem) { + bestQueue = queue + bestItem = item + } + return true + }) + + if iterationErr != nil { + return nil, iterationErr + } + return bestQueue, nil +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go new file mode 100644 index 000000000..ef7cbb369 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go @@ -0,0 +1,326 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package besthead + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" +) + +const ( + flow1 = "flow1" + flow2 = "flow2" +) + +// enqueueTimeComparatorFunc is a test utility. Lower enqueue time is better. +func enqueueTimeComparatorFunc(a, b types.QueueItemAccessor) bool { + return a.EnqueueTime().Before(b.EnqueueTime()) +} + +const commonScoreType = "enqueue_time_ns_asc" + +func newTestComparator() *frameworkmocks.MockItemComparator { + return &frameworkmocks.MockItemComparator{ + ScoreTypeV: commonScoreType, + FuncV: enqueueTimeComparatorFunc, + } +} + +func TestBestHead_Name(t *testing.T) { + t.Parallel() + policy := newBestHead() + assert.Equal(t, BestHeadPolicyName, policy.Name(), "Name should match the policy's constant") +} + +func TestBestHead_SelectQueue(t *testing.T) { + t.Parallel() + policy := newBestHead() + + t.Run("SelectQueue_Logic", func(t *testing.T) { + t.Parallel() + + t.Run("BasicSelection_TwoQueues", func(t *testing.T) { + t.Parallel() + now := time.Now() + itemBetter := typesmocks.NewMockQueueItemAccessor(10, "itemBetter", flow1) + itemBetter.EnqueueTimeV = now.Add(-10 * time.Second) // Earlier enqueue time = better + queue1 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, + PeekHeadV: itemBetter, + FlowSpecV: types.FlowSpecification{ID: flow1}, + ComparatorV: newTestComparator(), + } + + itemWorse := typesmocks.NewMockQueueItemAccessor(20, "itemWorse", flow2) + itemWorse.EnqueueTimeV = now.Add(-5 * time.Second) // Later enqueue time = worse + queue2 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, + PeekHeadV: itemWorse, + FlowSpecV: types.FlowSpecification{ID: flow2}, + ComparatorV: newTestComparator(), + } + + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{flow1, flow2}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + if id == flow1 { + return queue1 + } + if id == flow2 { + return queue2 + } + return nil + }, + } + + selected, err := policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error with valid inputs") + require.NotNil(t, selected, "SelectQueue should select a queue") + assert.Equal(t, flow1, selected.FlowSpec().ID, "Should select queue1 with the better item") + }) + + t.Run("IgnoresEmptyQueues", func(t *testing.T) { + t.Parallel() + now := time.Now() + itemBetter := typesmocks.NewMockQueueItemAccessor(10, "itemBetter", flow1) + itemBetter.EnqueueTimeV = now.Add(-10 * time.Second) + queue1 := &frameworkmocks.MockFlowQueueAccessor{ // Non-empty + LenV: 1, + PeekHeadV: itemBetter, + FlowSpecV: types.FlowSpecification{ID: flow1}, + ComparatorV: newTestComparator(), + } + queueEmpty := &frameworkmocks.MockFlowQueueAccessor{ // Empty + LenV: 0, + PeekHeadErrV: framework.ErrQueueEmpty, + FlowSpecV: types.FlowSpecification{ID: "flowEmpty"}, + ComparatorV: newTestComparator(), + } + itemWorse := typesmocks.NewMockQueueItemAccessor(20, "itemWorse", flow2) + itemWorse.EnqueueTimeV = now.Add(-5 * time.Second) + queue2 := &frameworkmocks.MockFlowQueueAccessor{ // Non-empty + LenV: 1, + PeekHeadV: itemWorse, + FlowSpecV: types.FlowSpecification{ID: flow2}, + ComparatorV: newTestComparator(), + } + + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{"flowEmpty", flow1, flow2}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + switch id { + case flow1: + return queue1 + case "flowEmpty": + return queueEmpty + case flow2: + return queue2 + } + return nil + }, + } + selected, err := policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error when some queues are empty") + require.NotNil(t, selected, "SelectQueue should select a queue from the non-empty set") + assert.Equal(t, flow1, selected.FlowSpec().ID, "Should select queue1, ignoring empty queue") + }) + + t.Run("SingleNonEmptyQueue", func(t *testing.T) { + t.Parallel() + item := typesmocks.NewMockQueueItemAccessor(10, "item", flow1) + queue1 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, + PeekHeadV: item, + FlowSpecV: types.FlowSpecification{ID: flow1}, + ComparatorV: newTestComparator(), + } + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{flow1}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + if id == flow1 { + return queue1 + } + return nil + }, + } + selected, err := policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error for a single valid queue") + require.NotNil(t, selected, "SelectQueue should select the only available queue") + assert.Equal(t, flow1, selected.FlowSpec().ID, "Should select the only non-empty queue") + }) + }) + + t.Run("SelectQueue_ComparatorCompatibility", func(t *testing.T) { + t.Parallel() + queue1 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, + PeekHeadV: typesmocks.NewMockQueueItemAccessor(10, "item1", flow1), + FlowSpecV: types.FlowSpecification{ID: flow1}, + ComparatorV: &frameworkmocks.MockItemComparator{ + ScoreTypeV: "typeA", + FuncV: enqueueTimeComparatorFunc, + }, + } + queue2 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, + PeekHeadV: typesmocks.NewMockQueueItemAccessor(20, "item2", flow2), + FlowSpecV: types.FlowSpecification{ID: flow2}, + ComparatorV: &frameworkmocks.MockItemComparator{ + ScoreTypeV: "typeB", // Different ScoreType + FuncV: enqueueTimeComparatorFunc, + }, + } + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{flow1, flow2}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + if id == flow1 { + return queue1 + } + if id == flow2 { + return queue2 + } + return nil + }, + } + selected, err := policy.SelectQueue(mockBand) + assert.Nil(t, selected, "SelectQueue should return a nil queue on incompatible comparator error") + require.Error(t, err, "SelectQueue should return an error for incompatible comparators") + assert.ErrorIs(t, err, framework.ErrIncompatiblePriorityType, "Error should be ErrIncompatiblePriorityType") + }) + + t.Run("SelectQueue_EdgeCase", func(t *testing.T) { + // Scenarios like NilBand, NoQueuesInBand, AllQueuesEmpty are covered by functional tests. + // These tests focus on edge cases more specific to BestHead's logic. + t.Parallel() + + t.Run("QueuePeekHeadErrors", func(t *testing.T) { + t.Parallel() + queue1 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, // Non-empty + PeekHeadErrV: errors.New("internal peek error"), + FlowSpecV: types.FlowSpecification{ID: flow1}, + ComparatorV: newTestComparator(), + } + queue2 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, + PeekHeadV: typesmocks.NewMockQueueItemAccessor(10, "item2", flow2), + FlowSpecV: types.FlowSpecification{ID: flow2}, + ComparatorV: newTestComparator(), + } + + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{flow1, flow2}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + if id == flow1 { + return queue1 + } + if id == flow2 { + return queue2 + } + return nil + }, + } + selected, err := policy.SelectQueue(mockBand) + require.NoError(t, err, "Policy should gracefully skip queue with PeekHead error") + require.NotNil(t, selected, "Policy should select the valid queue (flow2)") + assert.Equal(t, flow2, selected.FlowSpec().ID, "Should select the valid queue when the other has a peek error") + }) + + t.Run("QueueComparatorIsNil", func(t *testing.T) { + t.Parallel() + queue1 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, + PeekHeadV: typesmocks.NewMockQueueItemAccessor(10, "item1", flow1), + FlowSpecV: types.FlowSpecification{ID: flow1}, + ComparatorV: nil, // Nil Comparator + } + queue2 := &frameworkmocks.MockFlowQueueAccessor{ // A valid queue + LenV: 1, + PeekHeadV: typesmocks.NewMockQueueItemAccessor(20, "item2", flow2), + FlowSpecV: types.FlowSpecification{ID: flow2}, + ComparatorV: newTestComparator(), + } + + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{flow1, flow2}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + if id == flow1 { + return queue1 + } + if id == flow2 { + return queue2 + } + return nil + }, + } + + require.NotNil(t, queue1, "Setup: queue1 should not be nil") + require.Nil(t, queue1.Comparator(), "Setup: queue1's comparator should be nil for this test") + assert.Panics(t, func() { + _, _ = policy.SelectQueue(mockBand) + }, "Policy should panic if a queue's comparator is nil when it's about to be used.") + }) + + t.Run("ComparatorFuncIsNil", func(t *testing.T) { + t.Parallel() + queue1 := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 1, + PeekHeadV: typesmocks.NewMockQueueItemAccessor(10, "item1", flow1), + FlowSpecV: types.FlowSpecification{ID: flow1}, + ComparatorV: &frameworkmocks.MockItemComparator{ // Comparator exists + ScoreTypeV: commonScoreType, + FuncV: nil, // But its Func is nil + }, + } + queue2 := &frameworkmocks.MockFlowQueueAccessor{ // A valid queue + LenV: 1, + PeekHeadV: typesmocks.NewMockQueueItemAccessor(20, "item2", flow2), + FlowSpecV: types.FlowSpecification{ID: flow2}, + ComparatorV: newTestComparator(), + } + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{flow1, flow2}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + if id == flow1 { + return queue1 + } + if id == flow2 { + return queue2 + } + return nil + }, + } + + // Similar to nil comparator, current bestHead panics if Func is nil. + require.NotNil(t, queue1, "Setup: queue1 should not be nil") + require.NotNil(t, queue1.Comparator(), "Setup: queue1's comparator should NOT be nil for this test") + require.Nil(t, queue1.Comparator().Func(), "Setup: queue1's comparator Func should be nil for this test") + assert.Panics(t, func() { + _, _ = policy.SelectQueue(mockBand) + }, "Policy should panic if a comparator's func is nil when it's about to be used.") + }) + }) +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/factory.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/factory.go new file mode 100644 index 000000000..4cebd6c75 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/factory.go @@ -0,0 +1,63 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package dispatch provides the factory and registration mechanism for all `framework.InterFlowDispatchPolicy` +// implementations. +// It allows new policies to be added to the system and instantiated by name. +package dispatch + +import ( + "fmt" + "sync" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" +) + +// RegisteredPolicyName is the unique name under which a policy is registered. +type RegisteredPolicyName string + +// PolicyConstructor defines the function signature for creating a `framework.InterFlowDispatchPolicy`. +type PolicyConstructor func() (framework.InterFlowDispatchPolicy, error) + +var ( + // mu guards the registration map. + mu sync.RWMutex + // RegisteredPolicies stores the constructors for all registered policies. + RegisteredPolicies = make(map[RegisteredPolicyName]PolicyConstructor) +) + +// MustRegisterPolicy registers a policy constructor, and panics if the name is already registered. +// This is intended to be called from the `init()` function of a policy implementation. +func MustRegisterPolicy(name RegisteredPolicyName, constructor PolicyConstructor) { + mu.Lock() + defer mu.Unlock() + if _, ok := RegisteredPolicies[name]; ok { + panic(fmt.Sprintf("InterFlowDispatchPolicy already registered with name %q", name)) + } + RegisteredPolicies[name] = constructor +} + +// NewPolicyFromName creates a new `InterFlowDispatchPolicy` given its registered name. +// This is called by the `registry.FlowRegistry` when configuring a flow. +func NewPolicyFromName(name RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) { + mu.RLock() + defer mu.RUnlock() + constructor, ok := RegisteredPolicies[name] + if !ok { + return nil, fmt.Errorf("no InterFlowDispatchPolicy registered with name %q", name) + } + return constructor() +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go new file mode 100644 index 000000000..dd5d31a4e --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go @@ -0,0 +1,145 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dispatch_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + + _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead" + _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin" +) + +// TestInterFlowDispatchPolicy_Conformance is the main conformance test suite for `framework.InterFlowDispatchPolicy` +// implementations. +// It iterates over all policy implementations registered via `dispatch.MustRegisterPolicy` and runs a series of +// sub-tests to ensure they adhere to the `framework.InterFlowDispatchPolicy` contract. +func TestInterFlowDispatchPolicyConformance(t *testing.T) { + t.Parallel() + + for policyName, constructor := range dispatch.RegisteredPolicies { + t.Run(string(policyName), func(t *testing.T) { + t.Parallel() + + policy, err := constructor() + require.NoError(t, err, "Policy constructor for %s failed", policyName) + require.NotNil(t, policy, "Constructor for %s should return a non-nil policy instance", policyName) + + t.Run("Initialization", func(t *testing.T) { + t.Parallel() + assert.NotEmpty(t, policy.Name(), "Name() for %s should return a non-empty string", policyName) + }) + + t.Run("SelectQueue", func(t *testing.T) { + t.Parallel() + runSelectQueueConformanceTests(t, policy) + }) + }) + } +} + +func runSelectQueueConformanceTests(t *testing.T, policy framework.InterFlowDispatchPolicy) { + t.Helper() + + flowIDEmpty := "flow-empty" + mockQueueEmpty := &frameworkmocks.MockFlowQueueAccessor{ + LenV: 0, + PeekHeadErrV: framework.ErrQueueEmpty, + FlowSpecV: types.FlowSpecification{ID: flowIDEmpty}, + } + + testCases := []struct { + name string + band framework.PriorityBandAccessor + expectErr bool + expectNil bool + expectedQueue framework.FlowQueueAccessor + }{ + { + name: "With a nil priority band accessor", + band: nil, + expectErr: false, + expectNil: true, + }, + { + name: "With an empty priority band accessor", + band: &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{}, + IterateQueuesV: func(callback func(queue framework.FlowQueueAccessor) bool) { /* no-op */ }, + }, + expectErr: false, + expectNil: true, + }, + { + name: "With a band that has one empty queue", + band: &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{flowIDEmpty}, + QueueFuncV: func(fID string) framework.FlowQueueAccessor { + if fID == flowIDEmpty { + return mockQueueEmpty + } + return nil + }, + }, + expectErr: false, + expectNil: true, + }, + { + name: "With a band that has multiple empty queues", + band: &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{flowIDEmpty, "flow-empty-2"}, + QueueFuncV: func(fID string) framework.FlowQueueAccessor { + return mockQueueEmpty + }, + }, + expectErr: false, + expectNil: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + selectedQueue, err := policy.SelectQueue(tc.band) + + if tc.expectErr { + require.Error(t, err, "SelectQueue for policy %s should return an error", policy.Name()) + } else { + require.NoError(t, err, "SelectQueue for policy %s should not return an error", policy.Name()) + } + + if tc.expectNil { + assert.Nil(t, selectedQueue, "SelectQueue for policy %s should return a nil queue", policy.Name()) + } else { + assert.NotNil(t, selectedQueue, "SelectQueue for policy %s should not return a nil queue", policy.Name()) + } + + if tc.expectedQueue != nil { + assert.Equal(t, tc.expectedQueue.FlowSpec().ID, selectedQueue.FlowSpec().ID, + "SelectQueue for policy %s returned an unexpected queue", policy.Name()) + } + }) + } +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go new file mode 100644 index 000000000..9906dc11f --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go @@ -0,0 +1,118 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package roundrobin provides a `framework.InterFlowDispatchPolicy` that selects a queue from a priority band using a +// simple round-robin strategy. +package roundrobin + +import ( + "slices" + "sync" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" +) + +// RoundRobinPolicyName is the name of the Round Robin policy implementation. +const RoundRobinPolicyName = "RoundRobin" + +func init() { + dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(RoundRobinPolicyName), + func() (framework.InterFlowDispatchPolicy, error) { + return newRoundRobin(), nil + }) +} + +// roundRobin implements the `framework.InterFlowDispatchPolicy` interface using a round-robin strategy. +type roundRobin struct { + iterator *iterator +} + +func newRoundRobin() framework.InterFlowDispatchPolicy { + return &roundRobin{ + iterator: newIterator(), + } +} + +// Name returns the name of the policy. +func (p *roundRobin) Name() string { + return RoundRobinPolicyName +} + +// SelectQueue selects the next flow queue in a round-robin fashion from the given priority band. +// It returns nil if all queues in the band are empty or if an error occurs. +func (p *roundRobin) SelectQueue(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error) { + if band == nil { + return nil, nil + } + selectedQueue := p.iterator.selectNextQueue(band) + return selectedQueue, nil +} + +// iterator implements a thread-safe round-robin selection logic. It maintains the ID of the last selected flow to +// ensure the selection order is correct even when the set of available flows changes dynamically. +// +// This is kept as a private, nested type as its logic is specific to this policy. This structure is a deliberate +// choice for future refactoring; the iterator logic can be easily extracted into a shared internal package if a +// "RoundRobin" displacement policy is introduced, while keeping the dispatch policy's public API stable. +type iterator struct { + mu sync.Mutex + lastSelected string +} + +// newIterator creates a new round-robin Iterator. +func newIterator() *iterator { + return &iterator{} +} + +// selectNextQueue iterates through the flow queues in a round-robin fashion, starting from the flow after the one +// selected in the previous call. It sorts the flow IDs to ensure a deterministic ordering. If no non-empty queue is +// found after a full cycle, it returns nil. +func (r *iterator) selectNextQueue(band framework.PriorityBandAccessor) framework.FlowQueueAccessor { + r.mu.Lock() + defer r.mu.Unlock() + + flowIDs := band.FlowIDs() + if len(flowIDs) == 0 { + r.lastSelected = "" // Reset state if no flows are present + return nil + } + slices.Sort(flowIDs) + + startIndex := 0 + if r.lastSelected != "" { + // Find the index of the last selected flow. + // If it's not found (e.g., the flow was removed), we'll start from the beginning. + if idx := slices.Index(flowIDs, r.lastSelected); idx != -1 { + startIndex = (idx + 1) % len(flowIDs) + } + } + + numFlows := len(flowIDs) + for i := 0; i < numFlows; i++ { + currentIdx := (startIndex + i) % numFlows + currentFlowID := flowIDs[currentIdx] + queue := band.Queue(currentFlowID) + if queue != nil && queue.Len() > 0 { + r.lastSelected = currentFlowID + return queue + } + } + + // No non-empty queue was found. + r.lastSelected = "" + return nil +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go new file mode 100644 index 000000000..804a4c8c9 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go @@ -0,0 +1,256 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package roundrobin + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +func TestRoundRobin_Name(t *testing.T) { + t.Parallel() + policy := newRoundRobin() + assert.Equal(t, RoundRobinPolicyName, policy.Name(), "Name should match the policy's constant") +} + +func TestRoundRobin_SelectQueue_Logic(t *testing.T) { + t.Parallel() + policy := newRoundRobin() + + // Setup: Three non-empty queues + queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow1"}} + queue2 := &frameworkmocks.MockFlowQueueAccessor{LenV: 2, FlowSpecV: types.FlowSpecification{ID: "flow2"}} + queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 3, FlowSpecV: types.FlowSpecification{ID: "flow3"}} + + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{"flow3", "flow1", "flow2"}, // Unsorted to test sorting + QueueFuncV: func(id string) framework.FlowQueueAccessor { + switch id { + case "flow1": + return queue1 + case "flow2": + return queue2 + case "flow3": + return queue3 + } + return nil + }, + } + + // Expected order is based on sorted FlowIDs: flow1, flow2, flow3 + expectedOrder := []string{"flow1", "flow2", "flow3"} + + // First cycle + for i := range expectedOrder { + selected, err := policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error on a valid band") + require.NotNil(t, selected, "SelectQueue should have selected a queue") + assert.Equal(t, expectedOrder[i], selected.FlowSpec().ID, "Cycle 1, selection %d should be %s", i+1, expectedOrder[i]) + } + + // Second cycle (wraps around) + for i := range expectedOrder { + selected, err := policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error on a valid band") + require.NotNil(t, selected, "SelectQueue should have selected a queue") + assert.Equal(t, expectedOrder[i], selected.FlowSpec().ID, "Cycle 2, selection %d should be %s", i+1, expectedOrder[i]) + } +} + +func TestRoundRobin_SelectQueue_SkipsEmptyQueues(t *testing.T) { + t.Parallel() + policy := newRoundRobin() + + // Setup: Two non-empty queues and one empty queue + queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow1"}} + queueEmpty := &frameworkmocks.MockFlowQueueAccessor{LenV: 0, FlowSpecV: types.FlowSpecification{ID: "flowEmpty"}} + queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 3, FlowSpecV: types.FlowSpecification{ID: "flow3"}} + + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{"flow1", "flowEmpty", "flow3"}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + switch id { + case "flow1": + return queue1 + case "flowEmpty": + return queueEmpty + case "flow3": + return queue3 + } + return nil + }, + } + + // Expected order: flow1, flow3, flow1, flow3, ... + selected, err := policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error when skipping queues") + require.NotNil(t, selected, "SelectQueue should select the first non-empty queue") + assert.Equal(t, "flow1", selected.FlowSpec().ID, "First selection should be flow1") + + selected, err = policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error when skipping queues") + require.NotNil(t, selected, "SelectQueue should select the second non-empty queue") + assert.Equal(t, "flow3", selected.FlowSpec().ID, "Second selection should be flow3, skipping flowEmpty") + + selected, err = policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error when wrapping around") + require.NotNil(t, selected, "SelectQueue should wrap around and select a queue") + assert.Equal(t, "flow1", selected.FlowSpec().ID, "Should wrap around and select flow1 again") +} + +func TestRoundRobin_SelectQueue_HandlesDynamicFlows(t *testing.T) { + t.Parallel() + policy := newRoundRobin() + + // Initial setup + queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow1"}} + queue2 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow2"}} + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{"flow1", "flow2"}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + if id == "flow1" { + return queue1 + } + return queue2 + }, + } + + // First selection + selected, err := policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error on initial selection") + require.NotNil(t, selected, "SelectQueue should select a queue initially") + assert.Equal(t, "flow1", selected.FlowSpec().ID, "First selection should be flow1") + + // --- Simulate adding a flow --- + queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow3"}} + mockBand.FlowIDsV = []string{"flow1", "flow2", "flow3"} + mockBand.QueueFuncV = func(id string) framework.FlowQueueAccessor { + switch id { + case "flow1": + return queue1 + case "flow2": + return queue2 + case "flow3": + return queue3 + } + return nil + } + + // Next selection should be flow2 (continues from last index) + selected, err = policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error after adding a flow") + require.NotNil(t, selected, "SelectQueue should select a queue after adding a flow") + assert.Equal(t, "flow2", selected.FlowSpec().ID, "Next selection should be flow2") + + // Next selection should be flow3 + selected, err = policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error on the third selection") + require.NotNil(t, selected, "SelectQueue should select the new flow") + assert.Equal(t, "flow3", selected.FlowSpec().ID, "Next selection should be the new flow3") + + // --- Simulate removing a flow --- + mockBand.FlowIDsV = []string{"flow1", "flow3"} // flow2 is removed + + // Next selection should wrap around and pick flow1 + selected, err = policy.SelectQueue(mockBand) + require.NoError(t, err, "SelectQueue should not error after removing a flow") + require.NotNil(t, selected, "SelectQueue should select a queue after removing a flow") + assert.Equal(t, "flow1", selected.FlowSpec().ID, "Next selection should wrap around to flow1 after a removal") +} + +func TestRoundRobin_SelectQueue_Concurrency(t *testing.T) { + t.Parallel() + // Run this test multiple times to increase the chance of catching race conditions. + for i := range 5 { + t.Run(fmt.Sprintf("Iteration%d", i), func(t *testing.T) { + t.Parallel() + policy := newRoundRobin() + + // Setup: Three non-empty queues + queues := []*frameworkmocks.MockFlowQueueAccessor{ + {LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow1"}}, + {LenV: 2, FlowSpecV: types.FlowSpecification{ID: "flow2"}}, + {LenV: 3, FlowSpecV: types.FlowSpecification{ID: "flow3"}}, + } + numQueues := int64(len(queues)) + + mockBand := &frameworkmocks.MockPriorityBandAccessor{ + FlowIDsV: []string{"flow1", "flow2", "flow3"}, + QueueFuncV: func(id string) framework.FlowQueueAccessor { + for _, q := range queues { + if q.FlowSpec().ID == id { + return q + } + } + return nil + }, + } + + var wg sync.WaitGroup + numGoroutines := 10 + selectionsPerGoroutine := 30 + totalSelections := int64(numGoroutines * selectionsPerGoroutine) + + var selectionCounts sync.Map // Used like a concurrent map[string]*atomic.Int64 + + wg.Add(numGoroutines) + for range numGoroutines { + go func() { + defer wg.Done() + for range selectionsPerGoroutine { + selected, err := policy.SelectQueue(mockBand) + if err == nil && selected != nil { + val, _ := selectionCounts.LoadOrStore(selected.FlowSpec().ID, new(atomic.Int64)) + val.(*atomic.Int64).Add(1) + } + } + }() + } + wg.Wait() + + var finalCount int64 + countsStr := "" + selectionCounts.Range(func(key, value any) bool { + count := value.(*atomic.Int64).Load() + finalCount += count + countsStr += fmt.Sprintf("%s: %d, ", key, count) + + // Check for reasonable distribution. + // In a perfect world, each queue gets totalSelections / numQueues. + // We allow for some variance due to scheduling, but it shouldn't be wildly off. + // A simple check is that each queue gets at least a certain fraction of its expected share. + expectedCount := totalSelections / numQueues + minExpectedCount := expectedCount / 2 // Expect at least half of the ideal distribution + assert.True(t, count > minExpectedCount, + "Queue %s was selected only %d times, expected at least %d", key, count, minExpectedCount) + return true + }) + + assert.Equal(t, totalSelections, finalCount, "Total selections should match the expected number") + t.Logf("Selection distribution: %s", countsStr) + }) + } +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md index 1432db0b7..71cfb09d7 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md @@ -10,9 +10,9 @@ The `controller.FlowController` uses a two-tier policy system to manage requests plugins represent the first tier, making tactical decisions about the ordering of requests *within* a single logical flow (e.g., for a specific model or tenant). -This contrasts with the `framework.InterFlowDispatchPolicy` (not yet implemented), which is responsible for -**spatial fairness**: deciding *which flow's queue* gets the next opportunity to dispatch a request. The -`framework.IntraFlowDispatchPolicy` only operates *after* the inter-flow policy has selected a specific queue. +This contrasts with the `framework.InterFlowDispatchPolicy`, which is responsible for deciding *which flow's queue* +gets the next opportunity to dispatch a request. The `framework.IntraFlowDispatchPolicy` only operates *after* the +inter-flow policy has selected a specific queue. Key responsibilities and characteristics of a `framework.IntraFlowDispatchPolicy`: @@ -60,4 +60,4 @@ To contribute a new dispatch policy implementation, follow these steps: correctly implements your desired selection logic for a non-empty queue. 5. **Documentation** - - Add a package-level GoDoc comment to your new policy's Go file, explaining its behavior and any trade-offs. + - Add a package-level GoDoc comment to your new policy's Go file, explaining its behavior and any trade-offs. \ No newline at end of file diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/README.md b/pkg/epp/flowcontrol/framework/plugins/queue/README.md index 3632487fc..6b39221e5 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/README.md +++ b/pkg/epp/flowcontrol/framework/plugins/queue/README.md @@ -1,4 +1,4 @@ -# Flow Controller Queue Plugins (`plugins/queue/`) +# Flow Controller Queue Plugins This directory contains concrete implementations of the [`framework.SafeQueue`](../../queue.go) interface. This contract defines core, self-contained queue data structures used by the `controller.FlowController`. diff --git a/pkg/epp/flowcontrol/framework/policies.go b/pkg/epp/flowcontrol/framework/policies.go index 14cbc5df4..8211d6091 100644 --- a/pkg/epp/flowcontrol/framework/policies.go +++ b/pkg/epp/flowcontrol/framework/policies.go @@ -119,6 +119,28 @@ type IntraFlowDispatchPolicy interface { RequiredQueueCapabilities() []QueueCapability } +// InterFlowDispatchPolicy selects which flow's queue to service next from a given priority band. +// Implementations define the fairness or dispatch ordering logic between different flows that share the same priority +// level. +type InterFlowDispatchPolicy interface { + // Name returns a string identifier for the concrete policy implementation type (e.g., "RoundRobin"). + Name() string + + // SelectQueue inspects the flow queues within the provided `PriorityBandAccessor` and returns the `FlowQueueAccessor` + // of the queue chosen for the next dispatch attempt. + // + // Returns: + // - `FlowQueueAccessor`: The selected queue, or nil if no queue is chosen. + // - error: Non-nil if an unrecoverable error occurs. A nil error is returned if no queue is selected (e.g., all + // queues in the band are empty or the policy logic determines a pause is appropriate). + // + // Policies should be resilient to transient issues (like a queue becoming empty during inspection) and select from + // other available queues if possible, rather than returning an error for such conditions. + // + // Conformance: Implementations MUST be goroutine-safe if they maintain internal state. + SelectQueue(band PriorityBandAccessor) (selectedQueue FlowQueueAccessor, err error) +} + // FlowQueueAccessor provides a policy-facing, read-only view of a single flow's queue. // It combines general queue inspection methods (embedded via `QueueInspectionMethods`) with flow-specific metadata. // @@ -136,7 +158,33 @@ type FlowQueueAccessor interface { // FlowSpec returns the `types.FlowSpecification` of the flow this queue accessor is associated with. // This provides essential context (like `FlowID`) to policies. - // - // Conformance: Implementations MUST return a valid `types.FlowSpecification`. FlowSpec() types.FlowSpecification } + +// PriorityBandAccessor provides a read-only view into a specific priority band within the `ports.FlowRegistry`. +// It allows the `controller.FlowController` and inter-flow policies to inspect the state of all flow queues within that +// band. +// +// Conformance: Implementations MUST ensure all methods are goroutine-safe for concurrent access. +type PriorityBandAccessor interface { + // Priority returns the numerical priority level of this band. + Priority() uint + + // PriorityName returns the human-readable name of this priority band. + PriorityName() string + + // FlowIDs returns a slice of all flow IDs within this priority band. + // The order of items in the slice is not guaranteed unless specified by the implementations (e.g., for deterministic + // testing scenarios). + FlowIDs() []string + + // Queue returns a `FlowQueueAccessor` for the specified `flowID` within this priority band. + // + // Conformance: Implementations MUST return nil if the `flowID` is not found in this band. + Queue(flowID string) FlowQueueAccessor + + // IterateQueues executes the given `callback` for each `FlowQueueAccessor` in this priority band. + // Iteration stops if the `callback` returns false. The order of iteration is not guaranteed unless specified by the + // implementation (e.g., for deterministic testing scenarios). + IterateQueues(callback func(queue FlowQueueAccessor) (keepIterating bool)) +}