Skip to content

Commit b7f210d

Browse files
committed
Add flow controller.
1 parent 7c830cb commit b7f210d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+8362
-200
lines changed

cmd/epp/main.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4141
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4242
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
43+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4445
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
4546
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
@@ -151,14 +152,17 @@ func run() error {
151152
})
152153
setupLog.Info("Flags processed", "flags", flags)
153154

154-
// Init runtime.
155+
// --- Load Configurations from Environment Variables ---
156+
sdConfig := saturationdetector.LoadConfigFromEnv()
157+
158+
// --- Get Kubernetes Config ---
155159
cfg, err := ctrl.GetConfig()
156160
if err != nil {
157-
setupLog.Error(err, "Failed to get rest config")
161+
setupLog.Error(err, "Failed to get Kubernetes rest config")
158162
return err
159163
}
160164

161-
// Set up mapper for metric scraping.
165+
// --- Setup Datastore ---
162166
mapping, err := backendmetrics.NewMetricMapping(
163167
*totalQueuedRequestsMetric,
164168
*kvCacheUsagePercentageMetric,
@@ -169,13 +173,11 @@ func run() error {
169173
return err
170174
}
171175
verifyMetricMapping(*mapping, setupLog)
172-
173176
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
174-
// Setup runner.
175177
ctx := ctrl.SetupSignalHandler()
176-
177178
datastore := datastore.NewDatastore(ctx, pmf)
178179

180+
// --- Setup Metrics Server ---
179181
customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
180182
metrics.Register(customCollectors...)
181183
metrics.RecordInferenceExtensionInfo()
@@ -199,6 +201,7 @@ func run() error {
199201
return err
200202
}
201203

204+
// --- Initialize Core EPP Components ---
202205
scheduler := scheduling.NewScheduler(datastore)
203206
if schedulerV2 == "true" {
204207
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
@@ -221,6 +224,18 @@ func run() error {
221224
schedulerConfig := scheduling.NewSchedulerConfig(profilepicker.NewAllProfilesPicker(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
222225
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
223226
}
227+
228+
saturationDetector, err := saturationdetector.NewDetector(
229+
*sdConfig,
230+
datastore,
231+
ctrl.Log.WithName("saturation-detector"),
232+
)
233+
if err != nil {
234+
setupLog.Error(err, "Failed to create SaturationDetector")
235+
return err
236+
}
237+
238+
// --- Setup ExtProc Server Runner ---
224239
serverRunner := &runserver.ExtProcServerRunner{
225240
GrpcPort: *grpcPort,
226241
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
@@ -231,24 +246,26 @@ func run() error {
231246
CertPath: *certPath,
232247
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
233248
Scheduler: scheduler,
249+
SaturationDetector: saturationDetector,
234250
}
235251
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
236-
setupLog.Error(err, "Failed to setup ext-proc controllers")
252+
setupLog.Error(err, "Failed to setup EPP controllers")
237253
return err
238254
}
239255

256+
// --- Add Runnables to Manager ---
240257
// Register health server.
241258
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
242259
return err
243260
}
244261

245262
// Register ext-proc server.
246-
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
247-
setupLog.Error(err, "Failed to register ext-proc gRPC server")
263+
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
248264
return err
249265
}
250266

251-
// Start the manager. This blocks until a signal is received.
267+
// --- Start Manager ---
268+
// This blocks until a signal is received.
252269
setupLog.Info("Controller manager starting")
253270
if err := mgr.Start(ctx); err != nil {
254271
setupLog.Error(err, "Error starting controller manager")
@@ -276,6 +293,16 @@ func initLogging(opts *zap.Options) {
276293
ctrl.SetLogger(logger)
277294
}
278295

296+
// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the manager.
297+
func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
298+
if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
299+
setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
300+
return err
301+
}
302+
setupLog.Info("ExtProc server runner added to manager.")
303+
return nil
304+
}
305+
279306
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
280307
func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
281308
srv := grpc.NewServer()
@@ -309,5 +336,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
309336
if mapping.LoraRequestInfo == nil {
310337
logger.Info("Not scraping metric: LoraRequestInfo")
311338
}
312-
313339
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package config defines configuration structures with defaulting and validation logic for the Flow Controller and the
18+
// Flow Registry.
19+
package config
20+
21+
import (
22+
"fmt"
23+
"time"
24+
25+
"github.com/go-logr/logr"
26+
interd "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/dispatch/interflow"
27+
intrad "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/dispatch/intraflow"
28+
interp "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/preemption/interflow"
29+
intrap "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/preemption/intraflow"
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/queue"
31+
)
32+
33+
// Default values for FlowControllerConfig
34+
const (
35+
// DefaultFCQueueTTL is the default Time-To-Live for requests in the FlowController.
36+
// Used if a request's InitialEffectiveTTL() is zero or if overridden by controller policy.
37+
DefaultFCQueueTTL = 30 * time.Second
38+
// DefaultFCExpiryCleanupInterval is the default frequency for the FlowController's background routine to check for
39+
// and remove expired items.
40+
DefaultFCExpiryCleanupInterval = 1 * time.Second
41+
)
42+
43+
// Default values for PriorityBandConfig
44+
const (
45+
// DefaultPriorityBandMaxBytes is the default maximum byte capacity for a priority band if not explicitly configured.
46+
// Set to a generous value suitable for LLM serving.
47+
DefaultPriorityBandMaxBytes uint64 = 1 * 1024 * 1024 * 1024 // 1 GB
48+
)
49+
50+
// FlowControllerConfig groups configuration parameters for the FlowController engine.
51+
// Note: Default values are applied by the FlowController during its initialization if specific fields are not set or
52+
// are invalid.
53+
type FlowControllerConfig struct {
54+
// DefaultQueueTTL is the default Time-To-Live applied to requests within queues if not otherwise specified by the
55+
// incoming request's InitialEffectiveTTL() or overridden by more specific configurations.
56+
// Optional: If not set or set to a non-positive value, a system default (e.g., 30 seconds) will be used.
57+
// Example: "30s".
58+
DefaultQueueTTL time.Duration
59+
// ExpiryCleanupInterval is the frequency at which the FlowController's background routine checks for and removes
60+
// expired items from all managed queues.
61+
// Optional: If not set or set to a non-positive value, a system default (e.g., 1 second) will be used.
62+
// Example: "1s".
63+
ExpiryCleanupInterval time.Duration
64+
// MaxGlobalBytes defines an optional overall limit on the total byte size of requests across all queues in all
65+
// priority bands. If set to a positive value, this is enforced by the FlowController's capacity checking logic in
66+
// addition to per-priority limits (which are sourced from the FlowRegistry).
67+
// Optional: A value of 0 means no global byte limit is enforced by the FlowController.
68+
// Defaults to 0.
69+
MaxGlobalBytes uint64
70+
// TODO: Consider adding MaxFlowBytes (per-flow capacity limit within a priority band) as a future enhancement for
71+
// finer-grained fairness and resource isolation. This would likely involve changes to FlowSpecification or a new
72+
// per-flow policy, and the FlowController's capacity checks.
73+
}
74+
75+
// ValidateAndApplyDefaults validates the FlowControllerConfig and applies default values if necessary.
76+
func (fcc *FlowControllerConfig) ValidateAndApplyDefaults(logger logr.Logger) error {
77+
if fcc.DefaultQueueTTL <= 0 {
78+
logger.V(1).Info("FlowControllerConfig.DefaultQueueTTL is not set or invalid, using default.",
79+
"default", DefaultFCQueueTTL)
80+
fcc.DefaultQueueTTL = DefaultFCQueueTTL
81+
}
82+
if fcc.ExpiryCleanupInterval <= 0 {
83+
logger.V(1).Info("FlowControllerConfig.ExpiryCleanupInterval is not set or invalid, using default.",
84+
"default", DefaultFCExpiryCleanupInterval)
85+
fcc.ExpiryCleanupInterval = DefaultFCExpiryCleanupInterval
86+
}
87+
// MaxGlobalBytes can be 0 (meaning no global limit), so no default needed if 0.
88+
return nil
89+
}
90+
91+
// FlowRegistryConfig holds the configuration for the FlowRegistry, primarily defining the priority bands.
92+
// Note: Default values for sub-configurations (like PriorityBandConfig) are applied by the FlowRegistry during its
93+
// initialization if specific fields are not set or are invalid.
94+
type FlowRegistryConfig struct {
95+
// PriorityBands defines the set of priority bands managed by the FlowRegistry.
96+
// Required: At least one PriorityBandConfig should typically be provided for a functional registry.
97+
PriorityBands []PriorityBandConfig
98+
}
99+
100+
// validateAndApplyDefaults validates the FlowRegistryConfig by validating each of its PriorityBandConfigs.
101+
func (frc *FlowRegistryConfig) ValidateAndApplyDefaults(logger logr.Logger) error {
102+
for i := range frc.PriorityBands {
103+
if err := frc.PriorityBands[i].validateAndApplyDefaults(logger); err != nil {
104+
return fmt.Errorf("invalid config for priority band (priority %d, name %s): %w",
105+
frc.PriorityBands[i].Priority, frc.PriorityBands[i].PriorityName, err)
106+
}
107+
}
108+
return nil
109+
}
110+
111+
// PriorityBandConfig defines the configuration for a single priority band within a FlowRegistry.
112+
// Note: Default values are applied by the FlowRegistry during its initialization if specific fields are not set or are
113+
// invalid.
114+
type PriorityBandConfig struct {
115+
// Priority is the numerical priority level for this band.
116+
// Convention: Lower numerical values indicate higher priority (e.g., 0 is highest).
117+
// Required.
118+
Priority uint
119+
// PriorityName is a human-readable name for this priority band (e.g., "Critical", "Standard". "Sheddable").
120+
// Required.
121+
PriorityName string
122+
// InterFlowDispatchPolicy specifies the name of the registered policy used to select which flow's queue to service
123+
// next from this band.
124+
// Optional: If empty, a system default (e.g., "BestHeadPriorityScore") will be used.
125+
InterFlowDispatchPolicy interd.RegisteredInterFlowDispatchPolicyName
126+
// InterFlowPreemptionPolicy specifies the name of the registered policy used to select a victim flow's queue from
127+
// this band if preemption is triggered from a higher priority band targeting this one.
128+
// Optional: If empty, a system default (e.g., "RoundRobin") will be used.
129+
InterFlowPreemptionPolicy interp.RegisteredInterFlowPreemptionPolicyName
130+
// IntraFlowDispatchPolicy specifies the name of the registered policy used to select a specific request to dispatch
131+
// next from within a single flow's queue in this band.
132+
// Optional: If empty, a system default (e.g., "FCFS") will be used.
133+
IntraFlowDispatchPolicy intrad.RegisteredIntraFlowDispatchPolicyName
134+
// IntraFlowPreemptionPolicy specifies the name of the registered policy used to select a victim item from within a
135+
// specific flow's queue in this band if preemption is triggered.
136+
// Optional: If empty, a system default (e.g., "Tail") will be used.
137+
IntraFlowPreemptionPolicy intrap.RegisteredIntraFlowPreemptionPolicyName
138+
// QueueType specifies the name of the registered SafeQueue implementation to be used for flow queues within this
139+
// band.
140+
// Optional: If empty, a system default (e.g., "ListQueue") will be used.
141+
QueueType queue.RegisteredQueueName
142+
// MaxBytes defines the maximum total byte size for this specific priority band. The FlowController uses this limit
143+
// in its capacity checking logic.
144+
// Optional: If not set or set to a non-positive value, a system default (e.g., 1 GB) will be used.
145+
MaxBytes uint64
146+
}
147+
148+
// validateAndApplyDefaults validates and applies defaults for a single PriorityBandConfig.
149+
func (pbc *PriorityBandConfig) validateAndApplyDefaults(logger logr.Logger) error {
150+
if pbc.PriorityName == "" {
151+
return fmt.Errorf("PriorityName cannot be empty for priority level %d", pbc.Priority)
152+
}
153+
bandLogger := logger.WithValues("priority", pbc.Priority, "priorityName", pbc.PriorityName)
154+
155+
if pbc.InterFlowDispatchPolicy == "" {
156+
bandLogger.V(1).Info("InterFlowDispatchPolicy is empty, using default", "defaultPolicy",
157+
interd.BestHeadPriorityScoreDispatchPolicyName)
158+
pbc.InterFlowDispatchPolicy = interd.BestHeadPriorityScoreDispatchPolicyName
159+
}
160+
if pbc.InterFlowPreemptionPolicy == "" {
161+
bandLogger.V(1).Info("InterFlowPreemptionPolicy is empty, using default", "defaultPolicy",
162+
interp.RoundRobinPreemptionPolicyName)
163+
pbc.InterFlowPreemptionPolicy = interp.RoundRobinPreemptionPolicyName
164+
}
165+
if pbc.IntraFlowDispatchPolicy == "" {
166+
bandLogger.V(1).Info("IntraFlowDispatchPolicy is empty, using default", "defaultPolicy",
167+
intrad.FCFSDispatchPolicyName)
168+
pbc.IntraFlowDispatchPolicy = intrad.FCFSDispatchPolicyName
169+
}
170+
if pbc.IntraFlowPreemptionPolicy == "" {
171+
bandLogger.V(1).Info("IntraFlowPreemptionPolicy is empty, using default", "defaultPolicy",
172+
intrap.TailPreemptionPolicyName)
173+
pbc.IntraFlowPreemptionPolicy = intrap.TailPreemptionPolicyName
174+
}
175+
if pbc.QueueType == "" {
176+
bandLogger.V(1).Info("QueueType is empty, using default", "defaultQueue", queue.ListQueueName)
177+
pbc.QueueType = queue.ListQueueName
178+
}
179+
if pbc.MaxBytes <= 0 {
180+
bandLogger.V(1).Info("PriorityBandConfig.MaxBytes is not set or invalid, using default",
181+
"default", DefaultPriorityBandMaxBytes)
182+
pbc.MaxBytes = DefaultPriorityBandMaxBytes
183+
}
184+
return nil
185+
}

0 commit comments

Comments
 (0)