Skip to content

Commit 5e34778

Browse files
committed
Add flow controller.
1 parent 7c63c0d commit 5e34778

File tree

27 files changed

+7606
-231
lines changed

27 files changed

+7606
-231
lines changed

cmd/epp/main.go

Lines changed: 95 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"flag"
2122
"fmt"
2223
"net"
@@ -41,8 +42,10 @@ import (
4142
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4243
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
45+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller"
4446
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4547
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
48+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4649
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4750
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
4851
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
@@ -157,13 +160,20 @@ func run() error {
157160
})
158161
setupLog.Info("Flags processed", "flags", flags)
159162

160-
// Init runtime.
163+
// --- Load Configurations from Environment Variables ---
164+
// Note: Scheduler config is loaded via its package init currently. We may
165+
// want to load it here explicitly:
166+
fcConfig, flowControllerEnabled := flowcontroller.LoadConfigFromEnv()
167+
sdConfig := saturationdetector.LoadConfigFromEnv()
168+
169+
// --- Get Kubernetes Config ---
161170
cfg, err := ctrl.GetConfig()
162171
if err != nil {
163-
setupLog.Error(err, "Failed to get rest config")
172+
setupLog.Error(err, "Failed to get Kubernetes rest config")
164173
return err
165174
}
166175

176+
// --- Setup Manager ---
167177
poolNamespacedName := types.NamespacedName{
168178
Name: *poolName,
169179
Namespace: *poolNamespace,
@@ -174,7 +184,7 @@ func run() error {
174184
return err
175185
}
176186

177-
// Set up mapper for metric scraping.
187+
// --- Setup Datastore ---
178188
mapping, err := backendmetrics.NewMetricMapping(
179189
*totalQueuedRequestsMetric,
180190
*kvCacheUsagePercentageMetric,
@@ -185,14 +195,12 @@ func run() error {
185195
return err
186196
}
187197
verifyMetricMapping(*mapping, setupLog)
188-
189198
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
190-
// Setup runner.
191199
ctx := ctrl.SetupSignalHandler()
200+
appDatastore := datastore.NewDatastore(ctx, pmf)
192201

193-
datastore := datastore.NewDatastore(ctx, pmf)
194-
195-
scheduler := scheduling.NewScheduler(datastore)
202+
// --- Initialize EPP Components ---
203+
appScheduler := scheduling.NewScheduler(appDatastore)
196204
if schedulerV2 == "true" {
197205
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
198206
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
@@ -213,41 +221,78 @@ func run() error {
213221
[]plugins.PostSchedule{},
214222
[]plugins.PostResponse{},
215223
schedConfigOpts...)
216-
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
224+
appScheduler = scheduling.NewSchedulerWithConfig(appDatastore, schedulerConfig)
225+
}
226+
227+
appSaturationDetector, err := saturationdetector.NewDetector(
228+
*sdConfig,
229+
appDatastore,
230+
ctrl.Log.WithName("saturation-detector"),
231+
)
232+
if err != nil {
233+
setupLog.Error(err, "Failed to create SaturationDetector")
234+
return err
235+
}
236+
237+
var appFlowController *flowcontroller.FlowController
238+
if flowControllerEnabled {
239+
appFlowController, err = flowcontroller.NewFlowController(
240+
appSaturationDetector,
241+
fcConfig,
242+
)
243+
if err != nil {
244+
setupLog.Error(err, "Failed to create FlowController")
245+
return err
246+
}
247+
setupLog.Info("FlowController enabled and initialized.")
248+
} else {
249+
setupLog.Info("FlowController is disabled via configuration.")
217250
}
251+
252+
// --- Setup ExtProc Server Runner ---
218253
serverRunner := &runserver.ExtProcServerRunner{
219254
GrpcPort: *grpcPort,
220255
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
221256
DestinationEndpointHintKey: *destinationEndpointHintKey,
222257
PoolNamespacedName: poolNamespacedName,
223-
Datastore: datastore,
258+
Datastore: appDatastore,
224259
SecureServing: *secureServing,
225260
CertPath: *certPath,
226261
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
227-
Scheduler: scheduler,
262+
Scheduler: appScheduler,
263+
FlowController: appFlowController, // Pass instance (can be nil)
264+
SaturationDetector: appSaturationDetector,
265+
FlowControllerEnabled: flowControllerEnabled,
228266
}
229267
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
230-
setupLog.Error(err, "Failed to setup ext-proc controllers")
268+
setupLog.Error(err, "Failed to setup EPP controllers")
269+
return err
270+
}
271+
272+
// --- Add Runnables to Manager ---
273+
274+
// Register FlowController Run loop.
275+
if err := registerFlowController(mgr, appFlowController); err != nil {
231276
return err
232277
}
233278

234279
// Register health server.
235-
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
280+
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), appDatastore, *grpcHealthPort); err != nil {
236281
return err
237282
}
238283

239284
// Register ext-proc server.
240-
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
241-
setupLog.Error(err, "Failed to register ext-proc gRPC server")
285+
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
242286
return err
243287
}
244288

245289
// Register metrics handler.
246-
if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil {
290+
if err := registerMetricsHandler(mgr, *metricsPort, cfg, appDatastore); err != nil {
247291
return err
248292
}
249293

250-
// Start the manager. This blocks until a signal is received.
294+
// --- Start Manager ---
295+
// This blocks until a signal is received.
251296
setupLog.Info("Controller manager starting")
252297
if err := mgr.Start(ctx); err != nil {
253298
setupLog.Error(err, "Error starting controller manager")
@@ -275,6 +320,39 @@ func initLogging(opts *zap.Options) {
275320
ctrl.SetLogger(logger)
276321
}
277322

323+
// registerFlowController adds the FlowController Run loop as a Runnable to the
324+
// manager.
325+
func registerFlowController(mgr manager.Manager, fc *flowcontroller.FlowController) error {
326+
if fc == nil {
327+
setupLog.Info("FlowController is nil (disabled), skipping registration.")
328+
return nil // Not an error if it's intentionally disabled
329+
}
330+
if err := mgr.Add(manager.RunnableFunc(func(runCtx context.Context) error {
331+
fcLog := ctrl.Log.WithName("flowcontroller-runnable")
332+
fcLog.Info("Starting FlowController Run loop")
333+
// Run the FlowController; it handles context cancellation internally.
334+
fc.Run(runCtx)
335+
fcLog.Info("FlowController Run loop stopped")
336+
return nil
337+
})); err != nil {
338+
setupLog.Error(err, "Failed to add FlowController runnable to manager")
339+
return err
340+
}
341+
setupLog.Info("FlowController Run loop added to manager.")
342+
return nil
343+
}
344+
345+
// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the
346+
// manager.
347+
func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
348+
if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
349+
setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
350+
return err
351+
}
352+
setupLog.Info("ExtProc server runner added to manager.")
353+
return nil
354+
}
355+
278356
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
279357
func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
280358
srv := grpc.NewServer()
@@ -364,5 +442,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
364442
if mapping.LoraRequestInfo == nil {
365443
logger.Info("Not scraping metric: LoraRequestInfo")
366444
}
367-
368445
}

0 commit comments

Comments
 (0)