Skip to content

Commit 2387722

Browse files
committed
Add flow controller.
1 parent 7c63c0d commit 2387722

33 files changed

+8727
-231
lines changed

cmd/epp/main.go

Lines changed: 95 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"flag"
2122
"fmt"
2223
"net"
@@ -41,8 +42,10 @@ import (
4142
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4243
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
45+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller"
4446
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4547
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
48+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4649
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4750
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
4851
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
@@ -157,13 +160,20 @@ func run() error {
157160
})
158161
setupLog.Info("Flags processed", "flags", flags)
159162

160-
// Init runtime.
163+
// --- Load Configurations from Environment Variables ---
164+
// Note: Scheduler config is loaded via its package init currently. We may
165+
// want to load it here explicitly:
166+
fcConfig, flowControllerEnabled := flowcontroller.LoadConfigFromEnv()
167+
sdConfig := saturationdetector.LoadConfigFromEnv()
168+
169+
// --- Get Kubernetes Config ---
161170
cfg, err := ctrl.GetConfig()
162171
if err != nil {
163-
setupLog.Error(err, "Failed to get rest config")
172+
setupLog.Error(err, "Failed to get Kubernetes rest config")
164173
return err
165174
}
166175

176+
// --- Setup Manager ---
167177
poolNamespacedName := types.NamespacedName{
168178
Name: *poolName,
169179
Namespace: *poolNamespace,
@@ -174,7 +184,7 @@ func run() error {
174184
return err
175185
}
176186

177-
// Set up mapper for metric scraping.
187+
// --- Setup Datastore ---
178188
mapping, err := backendmetrics.NewMetricMapping(
179189
*totalQueuedRequestsMetric,
180190
*kvCacheUsagePercentageMetric,
@@ -185,14 +195,12 @@ func run() error {
185195
return err
186196
}
187197
verifyMetricMapping(*mapping, setupLog)
188-
189198
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
190-
// Setup runner.
191199
ctx := ctrl.SetupSignalHandler()
200+
appDatastore := datastore.NewDatastore(ctx, pmf)
192201

193-
datastore := datastore.NewDatastore(ctx, pmf)
194-
195-
scheduler := scheduling.NewScheduler(datastore)
202+
// --- Initialize EPP Components ---
203+
appScheduler := scheduling.NewScheduler(appDatastore)
196204
if schedulerV2 == "true" {
197205
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
198206
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
@@ -213,41 +221,78 @@ func run() error {
213221
[]plugins.PostSchedule{},
214222
[]plugins.PostResponse{},
215223
schedConfigOpts...)
216-
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
224+
appScheduler = scheduling.NewSchedulerWithConfig(appDatastore, schedulerConfig)
225+
}
226+
227+
appSaturationDetector, err := saturationdetector.NewDetector(
228+
*sdConfig,
229+
appDatastore,
230+
ctrl.Log.WithName("saturation-detector"),
231+
)
232+
if err != nil {
233+
setupLog.Error(err, "Failed to create SaturationDetector")
234+
return err
235+
}
236+
237+
var appFlowController *flowcontroller.FlowController
238+
if flowControllerEnabled {
239+
appFlowController, err = flowcontroller.NewFlowController(
240+
appSaturationDetector,
241+
fcConfig,
242+
)
243+
if err != nil {
244+
setupLog.Error(err, "Failed to create FlowController")
245+
return err
246+
}
247+
setupLog.Info("FlowController enabled and initialized.")
248+
} else {
249+
setupLog.Info("FlowController is disabled via configuration.")
217250
}
251+
252+
// --- Setup ExtProc Server Runner ---
218253
serverRunner := &runserver.ExtProcServerRunner{
219254
GrpcPort: *grpcPort,
220255
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
221256
DestinationEndpointHintKey: *destinationEndpointHintKey,
222257
PoolNamespacedName: poolNamespacedName,
223-
Datastore: datastore,
258+
Datastore: appDatastore,
224259
SecureServing: *secureServing,
225260
CertPath: *certPath,
226261
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
227-
Scheduler: scheduler,
262+
Scheduler: appScheduler,
263+
FlowController: appFlowController, // Pass instance (can be nil)
264+
SaturationDetector: appSaturationDetector,
265+
FlowControllerEnabled: flowControllerEnabled,
228266
}
229267
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
230-
setupLog.Error(err, "Failed to setup ext-proc controllers")
268+
setupLog.Error(err, "Failed to setup EPP controllers")
269+
return err
270+
}
271+
272+
// --- Add Runnables to Manager ---
273+
274+
// Register FlowController Run loop.
275+
if err := registerFlowController(mgr, appFlowController); err != nil {
231276
return err
232277
}
233278

234279
// Register health server.
235-
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
280+
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), appDatastore, *grpcHealthPort); err != nil {
236281
return err
237282
}
238283

239284
// Register ext-proc server.
240-
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
241-
setupLog.Error(err, "Failed to register ext-proc gRPC server")
285+
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
242286
return err
243287
}
244288

245289
// Register metrics handler.
246-
if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil {
290+
if err := registerMetricsHandler(mgr, *metricsPort, cfg, appDatastore); err != nil {
247291
return err
248292
}
249293

250-
// Start the manager. This blocks until a signal is received.
294+
// --- Start Manager ---
295+
// This blocks until a signal is received.
251296
setupLog.Info("Controller manager starting")
252297
if err := mgr.Start(ctx); err != nil {
253298
setupLog.Error(err, "Error starting controller manager")
@@ -275,6 +320,39 @@ func initLogging(opts *zap.Options) {
275320
ctrl.SetLogger(logger)
276321
}
277322

323+
// registerFlowController adds the FlowController Run loop as a Runnable to the
324+
// manager.
325+
func registerFlowController(mgr manager.Manager, fc *flowcontroller.FlowController) error {
326+
if fc == nil {
327+
setupLog.Info("FlowController is nil (disabled), skipping registration.")
328+
return nil // Not an error if it's intentionally disabled
329+
}
330+
if err := mgr.Add(manager.RunnableFunc(func(runCtx context.Context) error {
331+
fcLog := ctrl.Log.WithName("flowcontroller-runnable")
332+
fcLog.Info("Starting FlowController Run loop")
333+
// Run the FlowController; it handles context cancellation internally.
334+
fc.Run(runCtx)
335+
fcLog.Info("FlowController Run loop stopped")
336+
return nil
337+
})); err != nil {
338+
setupLog.Error(err, "Failed to add FlowController runnable to manager")
339+
return err
340+
}
341+
setupLog.Info("FlowController Run loop added to manager.")
342+
return nil
343+
}
344+
345+
// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the
346+
// manager.
347+
func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
348+
if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
349+
setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
350+
return err
351+
}
352+
setupLog.Info("ExtProc server runner added to manager.")
353+
return nil
354+
}
355+
278356
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
279357
func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
280358
srv := grpc.NewServer()
@@ -364,5 +442,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
364442
if mapping.LoraRequestInfo == nil {
365443
logger.Info("Not scraping metric: LoraRequestInfo")
366444
}
367-
368445
}

pkg/epp/flowcontroller/config.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package flowcontroller
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
"sigs.k8s.io/controller-runtime/pkg/log"
24+
v1alpha2 "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/types"
26+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
27+
)
28+
29+
// PolicyType strings
30+
const (
31+
PolicyFCFS = "FCFS"
32+
PolicyRoundRobin = "RoundRobin"
33+
PolicyEvictOldest = "EvictOldest"
34+
PolicyNone = "None"
35+
)
36+
37+
// Default byte capacities per criticality and other settings
38+
const (
39+
DefaultCriticalCapacityBytes uint64 = 1000 * 1024 * 1024 // 1000MB
40+
DefaultStandardCapacityBytes uint64 = 500 * 1024 * 1024 // 500MB
41+
DefaultSheddableCapacityBytes uint64 = 500 * 1024 * 1024 // 500MB
42+
DefaultQueueTTL = 10 * time.Second
43+
DefaultExpiryCleanupInterval = time.Second
44+
DefaultFcEnabled = false // Default to disabled
45+
)
46+
47+
// Environment variable names for FlowController configuration
48+
const (
49+
EnvFcMaxBytesCritical = "FC_MAX_BYTES_CRITICAL"
50+
EnvFcMaxBytesStandard = "FC_MAX_BYTES_STANDARD"
51+
EnvFcMaxBytesSheddable = "FC_MAX_BYTES_SHEDDABLE"
52+
EnvFcMaxGlobalBytes = "FC_MAX_GLOBAL_BYTES"
53+
EnvFcDefaultQueueTTL = "FC_DEFAULT_QUEUE_TTL"
54+
EnvFcExpiryCleanupInterval = "FC_EXPIRY_CLEANUP_INTERVAL"
55+
EnvFcFairnessPolicy = "FC_FAIRNESS_POLICY"
56+
EnvFcPreemptionStrategy = "FC_PREEMPTION_STRATEGY"
57+
EnvFcEnabled = "FC_ENABLED"
58+
)
59+
60+
// Supported policy values
61+
var (
62+
SupportedInterModelFairnessPolicies = map[string]struct{}{
63+
PolicyFCFS: {},
64+
PolicyRoundRobin: {},
65+
}
66+
SupportedPreemptionStrategies = map[string]struct{}{
67+
PolicyEvictOldest: {},
68+
PolicyNone: {},
69+
}
70+
)
71+
72+
// FlowControllerConfig holds the configuration for the FlowController.
73+
type FlowControllerConfig struct {
74+
// MaxBytesPerCriticality maps criticality levels to their maximum allowed
75+
// byte size in the queue.
76+
MaxBytesPerCriticality map[v1alpha2.Criticality]uint64
77+
// MaxGlobalBytes defines the overall limit on the total byte size across
78+
// all criticality bands. 0 means no global limit (beyond the sum of
79+
// per-criticality limits).
80+
MaxGlobalBytes uint64
81+
// DefaultQueueTTL is the TTL applied to requests if not specified elsewhere
82+
// (eventually this comes from InferenceModel). Defaults to 10s.
83+
DefaultQueueTTL time.Duration
84+
// ExpiryCleanupInterval is the interval for cleaning up expired requests.
85+
// Defaults to 1s.
86+
ExpiryCleanupInterval time.Duration
87+
// InterModelFairnessPolicy defines the fairness algorithm ("FCFS",
88+
// "RoundRobin") by string. This is used if CustomFairnessPolicies are not
89+
// provided for a criticality.
90+
// Defaults to "FCFS".
91+
InterModelFairnessPolicy string
92+
// PreemptionStrategy defines the preemption algorithm ("None", ...).
93+
// Defaults to "None".
94+
PreemptionStrategy string
95+
// CustomFairnessPolicies allows injecting specific fairness policy instances
96+
// for each criticality band. If a policy is provided here for a given
97+
// criticality, it will be used instead of the one specified by
98+
// InterModelFairnessPolicy string.
99+
CustomFairnessPolicies map[v1alpha2.Criticality]types.FairnessPolicy
100+
// TODO: Add CustomPreemptionStrategies map[v1alpha2.Criticality]types.PreemptionStrategy
101+
// if similar custom injection is desired for preemption strategies.
102+
}
103+
104+
// validateAndApplyDefaults validates the provided config and assigns default
105+
// values. Called by NewFlowController.
106+
func (cfg *FlowControllerConfig) validateAndApplyDefaults() error {
107+
if cfg.MaxBytesPerCriticality == nil {
108+
cfg.MaxBytesPerCriticality = make(map[v1alpha2.Criticality]uint64)
109+
}
110+
111+
// Apply default capacities only if not specified.
112+
if _, ok := cfg.MaxBytesPerCriticality[v1alpha2.Critical]; !ok {
113+
cfg.MaxBytesPerCriticality[v1alpha2.Critical] = DefaultCriticalCapacityBytes
114+
}
115+
if _, ok := cfg.MaxBytesPerCriticality[v1alpha2.Standard]; !ok {
116+
cfg.MaxBytesPerCriticality[v1alpha2.Standard] = DefaultStandardCapacityBytes
117+
}
118+
if _, ok := cfg.MaxBytesPerCriticality[v1alpha2.Sheddable]; !ok {
119+
cfg.MaxBytesPerCriticality[v1alpha2.Sheddable] = DefaultSheddableCapacityBytes
120+
}
121+
122+
// Apply default TTL and intervals if zero/negative.
123+
if cfg.DefaultQueueTTL <= 0 {
124+
cfg.DefaultQueueTTL = DefaultQueueTTL
125+
}
126+
if cfg.ExpiryCleanupInterval <= 0 {
127+
cfg.ExpiryCleanupInterval = DefaultExpiryCleanupInterval
128+
}
129+
130+
// Apply default policies if empty and validate provided ones.
131+
if cfg.InterModelFairnessPolicy == "" {
132+
cfg.InterModelFairnessPolicy = PolicyFCFS
133+
} else {
134+
if _, ok := SupportedInterModelFairnessPolicies[cfg.InterModelFairnessPolicy]; !ok {
135+
return fmt.Errorf("unsupported or misconfigured InterModelFairnessPolicy: %s", cfg.InterModelFairnessPolicy)
136+
}
137+
}
138+
139+
if cfg.PreemptionStrategy == "" {
140+
cfg.PreemptionStrategy = PolicyNone
141+
} else {
142+
if _, ok := SupportedPreemptionStrategies[cfg.PreemptionStrategy]; !ok {
143+
return fmt.Errorf("unsupported PreemptionStrategy: %s", cfg.PreemptionStrategy)
144+
}
145+
}
146+
147+
return nil
148+
}
149+
150+
// LoadConfigFromEnv loads FlowControllerConfig from environment variables.
151+
// It also returns a boolean indicating if the FlowController is enabled.
152+
func LoadConfigFromEnv() (*FlowControllerConfig, bool) {
153+
// Use a default logger for initial configuration loading.
154+
logger := log.Log.WithName("flow-controller-config")
155+
156+
cfg := &FlowControllerConfig{
157+
MaxBytesPerCriticality: make(map[v1alpha2.Criticality]uint64),
158+
// Custom policies are not loaded from env vars
159+
}
160+
161+
cfg.MaxBytesPerCriticality[v1alpha2.Critical] = envutil.GetEnvUint64(EnvFcMaxBytesCritical, DefaultCriticalCapacityBytes, logger)
162+
cfg.MaxBytesPerCriticality[v1alpha2.Standard] = envutil.GetEnvUint64(EnvFcMaxBytesStandard, DefaultStandardCapacityBytes, logger)
163+
cfg.MaxBytesPerCriticality[v1alpha2.Sheddable] = envutil.GetEnvUint64(EnvFcMaxBytesSheddable, DefaultSheddableCapacityBytes, logger)
164+
165+
cfg.MaxGlobalBytes = envutil.GetEnvUint64(EnvFcMaxGlobalBytes, 0 /*Default is 0*/, logger)
166+
cfg.DefaultQueueTTL = envutil.GetEnvDuration(EnvFcDefaultQueueTTL, DefaultQueueTTL, logger)
167+
cfg.ExpiryCleanupInterval = envutil.GetEnvDuration(EnvFcExpiryCleanupInterval, DefaultExpiryCleanupInterval, logger)
168+
169+
cfg.InterModelFairnessPolicy = envutil.GetEnvString(EnvFcFairnessPolicy, PolicyFCFS, logger)
170+
cfg.PreemptionStrategy = envutil.GetEnvString(EnvFcPreemptionStrategy, PolicyNone, logger)
171+
172+
fcEnabled := envutil.GetEnvBool(EnvFcEnabled, DefaultFcEnabled, logger)
173+
174+
// Note: We don't call cfg.validateAndApplyDefaults() here.
175+
// It is called within NewFlowController to ensure defaults are applied just
176+
// before the controller is created, using the final config object.
177+
logger.Info("FlowController configuration loaded from env",
178+
"config", fmt.Sprintf("%+v", cfg),
179+
"fcEnabled", fcEnabled)
180+
return cfg, fcEnabled
181+
}

0 commit comments

Comments
 (0)