Skip to content

Commit 728bdcf

Browse files
committed
Add flow controller.
1 parent 66b9889 commit 728bdcf

29 files changed

+8302
-633
lines changed

cmd/epp/main.go

Lines changed: 93 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"flag"
2122
"fmt"
2223
"net"
@@ -40,8 +41,10 @@ import (
4041
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4142
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4243
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
44+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller"
4345
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4446
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
47+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4548
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4649
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
4750
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -136,13 +139,20 @@ func run() error {
136139
})
137140
setupLog.Info("Flags processed", "flags", flags)
138141

139-
// Init runtime.
142+
// --- Load Configurations from Environment Variables ---
143+
// Note: Scheduler config is loaded via its package init currently. We may
144+
// want to load it here explicitly:
145+
fcConfig, flowControllerEnabled := flowcontroller.LoadConfigFromEnv()
146+
sdConfig := saturationdetector.LoadConfigFromEnv()
147+
148+
// --- Get Kubernetes Config ---
140149
cfg, err := ctrl.GetConfig()
141150
if err != nil {
142-
setupLog.Error(err, "Failed to get rest config")
151+
setupLog.Error(err, "Failed to get Kubernetes rest config")
143152
return err
144153
}
145154

155+
// --- Setup Manager ---
146156
poolNamespacedName := types.NamespacedName{
147157
Name: *poolName,
148158
Namespace: *poolNamespace,
@@ -153,7 +163,7 @@ func run() error {
153163
return err
154164
}
155165

156-
// Set up mapper for metric scraping.
166+
// --- Setup Datastore ---
157167
mapping, err := backendmetrics.NewMetricMapping(
158168
*totalQueuedRequestsMetric,
159169
*kvCacheUsagePercentageMetric,
@@ -164,47 +174,82 @@ func run() error {
164174
return err
165175
}
166176
verifyMetricMapping(*mapping, setupLog)
167-
168177
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
169-
// Setup runner.
170178
ctx := ctrl.SetupSignalHandler()
179+
appDatastore := datastore.NewDatastore(ctx, pmf)
171180

172-
datastore := datastore.NewDatastore(ctx, pmf)
181+
// --- Initialize EPP Components ---
182+
appScheduler := scheduling.NewScheduler(appDatastore)
183+
184+
appSaturationDetector, err := saturationdetector.NewDetector(
185+
*sdConfig,
186+
appDatastore,
187+
ctrl.Log.WithName("saturation-detector"),
188+
)
189+
if err != nil {
190+
setupLog.Error(err, "Failed to create SaturationDetector")
191+
return err
192+
}
173193

174-
scheduler := scheduling.NewScheduler(datastore)
194+
var appFlowController *flowcontroller.FlowController
195+
if flowControllerEnabled {
196+
appFlowController, err = flowcontroller.NewFlowController(
197+
appSaturationDetector,
198+
fcConfig,
199+
)
200+
if err != nil {
201+
setupLog.Error(err, "Failed to create FlowController")
202+
return err
203+
}
204+
setupLog.Info("FlowController enabled and initialized.")
205+
} else {
206+
setupLog.Info("FlowController is disabled via configuration.")
207+
}
208+
209+
// --- Setup ExtProc Server Runner ---
175210
serverRunner := &runserver.ExtProcServerRunner{
176211
GrpcPort: *grpcPort,
177212
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
178213
DestinationEndpointHintKey: *destinationEndpointHintKey,
179214
PoolNamespacedName: poolNamespacedName,
180-
Datastore: datastore,
215+
Datastore: appDatastore,
181216
SecureServing: *secureServing,
182217
CertPath: *certPath,
183218
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
184-
Scheduler: scheduler,
219+
Scheduler: appScheduler,
220+
FlowController: appFlowController, // Pass instance (can be nil)
221+
SaturationDetector: appSaturationDetector,
222+
FlowControllerEnabled: flowControllerEnabled,
185223
}
186224
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
187-
setupLog.Error(err, "Failed to setup ext-proc controllers")
225+
setupLog.Error(err, "Failed to setup EPP controllers")
226+
return err
227+
}
228+
229+
// --- Add Runnables to Manager ---
230+
231+
// Register FlowController Run loop.
232+
if err := registerFlowController(mgr, appFlowController); err != nil {
188233
return err
189234
}
190235

191236
// Register health server.
192-
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
237+
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), appDatastore, *grpcHealthPort); err != nil {
193238
return err
194239
}
195240

196241
// Register ext-proc server.
197-
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
198-
setupLog.Error(err, "Failed to register ext-proc gRPC server")
242+
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
199243
return err
200244
}
201245

202246
// Register metrics handler.
203-
if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil {
247+
if err := registerMetricsHandler(mgr, *metricsPort, cfg, appDatastore); err != nil {
204248
return err
205249
}
206250

207-
// Start the manager. This blocks until a signal is received.
251+
// --- Start Manager ---
252+
// This blocks until a signal is received.
208253
setupLog.Info("Controller manager starting")
209254
if err := mgr.Start(ctx); err != nil {
210255
setupLog.Error(err, "Error starting controller manager")
@@ -232,6 +277,39 @@ func initLogging(opts *zap.Options) {
232277
ctrl.SetLogger(logger)
233278
}
234279

280+
// registerFlowController adds the FlowController Run loop as a Runnable to the
281+
// manager.
282+
func registerFlowController(mgr manager.Manager, fc *flowcontroller.FlowController) error {
283+
if fc == nil {
284+
setupLog.Info("FlowController is nil (disabled), skipping registration.")
285+
return nil // Not an error if it's intentionally disabled
286+
}
287+
if err := mgr.Add(manager.RunnableFunc(func(runCtx context.Context) error {
288+
fcLog := ctrl.Log.WithName("flowcontroller-runnable")
289+
fcLog.Info("Starting FlowController Run loop")
290+
// Run the FlowController; it handles context cancellation internally.
291+
fc.Run(runCtx)
292+
fcLog.Info("FlowController Run loop stopped")
293+
return nil
294+
})); err != nil {
295+
setupLog.Error(err, "Failed to add FlowController runnable to manager")
296+
return err
297+
}
298+
setupLog.Info("FlowController Run loop added to manager.")
299+
return nil
300+
}
301+
302+
// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the
303+
// manager.
304+
func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
305+
if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
306+
setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
307+
return err
308+
}
309+
setupLog.Info("ExtProc server runner added to manager.")
310+
return nil
311+
}
312+
235313
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
236314
func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
237315
srv := grpc.NewServer()
@@ -321,5 +399,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
321399
if mapping.LoraRequestInfo == nil {
322400
logger.Info("Not scraping metric: LoraRequestInfo")
323401
}
324-
325402
}

0 commit comments

Comments
 (0)