Skip to content

Commit 5e1171e

Browse files
committed
Add scheduler for queuing and fairness.
1 parent 6058b09 commit 5e1171e

File tree

12 files changed

+1188
-142
lines changed

12 files changed

+1188
-142
lines changed

cmd/epp/main.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4141
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4242
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
43+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4344
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
4445
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4546
)
@@ -102,6 +103,27 @@ var (
102103
loraInfoMetric = flag.String("loraInfoMetric",
103104
"vllm:lora_requests_info",
104105
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
106+
// Scheduling config flags
107+
totalQueueCapacity = flag.Uint64(
108+
"totalQueueCapacity",
109+
scheduling.DefaultTotalQueueCapacity,
110+
"Total capacity (in bytes) of the queue across all models and criticality bands.",
111+
)
112+
modelQueueCapacity = flag.Uint64(
113+
"modelQueueCapacity",
114+
scheduling.DefaultModelQueueCapacity,
115+
"Capacity (in bytes) of the per-model queues.",
116+
)
117+
queueTTL = flag.Duration(
118+
"queueTTL",
119+
scheduling.DefaultQueueTTL,
120+
"TTL for requests in the queue.",
121+
)
122+
expiryCleanupInterval = flag.Duration(
123+
"expiryCleanupInterval",
124+
scheduling.DefaultExpiryCleanupInterval,
125+
"Interval for cleaning up expired requests from the queue.",
126+
)
105127

106128
setupLog = ctrl.Log.WithName("setup")
107129
)
@@ -180,6 +202,12 @@ func run() error {
180202
CertPath: *certPath,
181203
UseStreaming: useStreamingServer,
182204
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
205+
QueueConfig: scheduling.QueueConfig{
206+
TotalQueueCapacity: *totalQueueCapacity,
207+
ModelQueueCapacity: *modelQueueCapacity,
208+
QueueTTL: *queueTTL,
209+
ExpiryCleanupInterval: *expiryCleanupInterval,
210+
},
183211
}
184212
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
185213
setupLog.Error(err, "Failed to setup ext-proc controllers")

pkg/epp/datastore/datastore.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -302,13 +302,6 @@ func stripLabelKeyAliasFromLabelMap(labels map[v1alpha2.LabelKey]v1alpha2.LabelV
302302
return outMap
303303
}
304304

305-
func IsCritical(model *v1alpha2.InferenceModel) bool {
306-
if model.Spec.Criticality != nil && *model.Spec.Criticality == v1alpha2.Critical {
307-
return true
308-
}
309-
return false
310-
}
311-
312305
// TODO: move out to share with pod_reconciler.go
313306
func podIsReady(pod *corev1.Pod) bool {
314307
for _, condition := range pod.Status.Conditions {

pkg/epp/handlers/request.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@ package handlers
1919
import (
2020
"context"
2121
"encoding/json"
22+
"errors"
2223
"fmt"
2324
"strconv"
2425

2526
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2627
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2728
"google.golang.org/protobuf/types/known/structpb"
2829
"sigs.k8s.io/controller-runtime/pkg/log"
29-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
3031
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3132
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
3233
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -77,7 +78,7 @@ func (s *Server) HandleRequestBody(
7778
llmReq := &schedulingtypes.LLMRequest{
7879
Model: model,
7980
ResolvedTargetModel: modelName,
80-
Critical: datastore.IsCritical(modelObj),
81+
Criticality: *modelObj.Spec.Criticality,
8182
}
8283
loggerVerbose.Info("LLM request assembled", "request", llmReq)
8384

@@ -94,9 +95,43 @@ func (s *Server) HandleRequestBody(
9495
loggerVerbose.Info("Updated request body marshalled", "body", string(requestBody))
9596
}
9697

97-
target, err := s.scheduler.Schedule(ctx, llmReq)
98+
schedulableReq, err := newSchedulableRequestFromContext(reqCtx)
9899
if err != nil {
99-
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
100+
return nil, errutil.Error{Code: errutil.Internal, Msg: err.Error()}
101+
}
102+
target, evictionReason, err := s.queueController.Schedule(schedulableReq)
103+
if err != nil {
104+
logger.Error(err, "Failed to schedule request", "evictionReason", evictionReason.String())
105+
switch {
106+
case errors.Is(err, scheduling.ErrEvicted):
107+
// Handle eviction errors, including the eviction reason.
108+
switch evictionReason {
109+
case scheduling.ReasonTTLExpiry:
110+
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted due to TTL expiry: %v", err)}
111+
case scheduling.ReasonExternalContextExpiry:
112+
// TODO: determine if this is an appropriate code. For expiry due to
113+
// gateway timeout, I think it makes sense. For manual cancellation, I
114+
// am not certain.
115+
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted due to external context expiry: %v", err)}
116+
case scheduling.ReasonPreempted:
117+
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted due to preemption: %v", err)}
118+
case scheduling.ReasonCannotFindBackend:
119+
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted due to failure to find a suitable backend: %v", err)}
120+
default:
121+
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted for unknown reason: %v", err)}
122+
}
123+
case errors.Is(err, scheduling.ErrModelAtCapacity):
124+
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("model at capacity: %v", err)}
125+
case errors.Is(err, scheduling.ErrCannotFindBackend):
126+
return nil, errutil.Error{Code: errutil.Unknown, Msg: fmt.Sprintf("cannot find suitable backend for non-pool exhaustion reason: %v", err)}
127+
default:
128+
// Handle other errors.
129+
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("failed to schedule request: %v", err)}
130+
}
131+
}
132+
if target == nil || target.GetPod() == nil {
133+
// This should be unreachable.
134+
return nil, errutil.Error{Code: errutil.Internal, Msg: "target pod is nil, request was likely evicted"}
100135
}
101136
targetPod := target.GetPod()
102137

pkg/epp/handlers/server.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package handlers
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"io"
2223
"time"
2324

@@ -28,14 +29,15 @@ import (
2829
"sigs.k8s.io/controller-runtime/pkg/log"
2930
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3031
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
32+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
3133
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3234
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
3335
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3436
)
3537

36-
func NewServer(scheduler Scheduler, destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore datastore.Datastore) *Server {
38+
func NewServer(queueController QueueController, destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore datastore.Datastore) *Server {
3739
return &Server{
38-
scheduler: scheduler,
40+
queueController: queueController,
3941
destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace,
4042
destinationEndpointHintKey: destinationEndpointHintKey,
4143
datastore: datastore,
@@ -45,7 +47,7 @@ func NewServer(scheduler Scheduler, destinationEndpointHintMetadataNamespace, de
4547
// Server implements the Envoy external processing server.
4648
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
4749
type Server struct {
48-
scheduler Scheduler
50+
queueController QueueController
4951
// The key of the header to specify the target pod address. This value needs to match Envoy
5052
// configuration.
5153
destinationEndpointHintKey string
@@ -55,8 +57,8 @@ type Server struct {
5557
datastore datastore.Datastore
5658
}
5759

58-
type Scheduler interface {
59-
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (targetPod schedulingtypes.Pod, err error)
60+
type QueueController interface {
61+
Schedule(req scheduling.SchedulableRequest) (targetPod schedulingtypes.Pod, evictionReason scheduling.EvictionReason, err error)
6062
}
6163

6264
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
@@ -217,12 +219,15 @@ func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
217219

218220
// RequestContext stores context information during the life time of an HTTP request.
219221
type RequestContext struct {
222+
Context context.Context
223+
CancelFunc context.CancelFunc
220224
TargetPod string
221225
TargetEndpoint string
222226
Model string
223227
ResolvedTargetModel string
224228
RequestReceivedTimestamp time.Time
225229
ResponseCompleteTimestamp time.Time
230+
Request schedulingtypes.LLMRequest
226231
RequestSize int
227232
Usage Usage
228233
ResponseSize int
@@ -254,3 +259,36 @@ const (
254259
BodyResponseResponsesComplete StreamRequestState = 6
255260
TrailerResponseResponsesComplete StreamRequestState = 7
256261
)
262+
263+
type schedulableRequest struct {
264+
schedulingtypes.LLMRequest
265+
size uint64
266+
ctx context.Context
267+
}
268+
269+
func (s *schedulableRequest) Context() context.Context {
270+
return s.ctx
271+
}
272+
273+
func (s *schedulableRequest) Request() *schedulingtypes.LLMRequest {
274+
return &s.LLMRequest
275+
}
276+
277+
func (s *schedulableRequest) Size() uint64 {
278+
return uint64(s.size)
279+
}
280+
281+
// newSchedulableRequestFromContext creates a new schedulableRequest from a
282+
// RequestContext. It contains the minimal RequestContext information necessary
283+
// for scheduling to reduce memory in the queue.
284+
// It returns an error if the RequestContext is invalid.
285+
func newSchedulableRequestFromContext(reqCtx *RequestContext) (*schedulableRequest, error) {
286+
if reqCtx == nil || reqCtx.Context == nil || reqCtx.Request.Model == "" || reqCtx.RequestSize == 0 {
287+
return nil, fmt.Errorf("invalid RequestContext")
288+
}
289+
return &schedulableRequest{
290+
ctx: reqCtx.Context,
291+
size: uint64(reqCtx.RequestSize),
292+
LLMRequest: reqCtx.Request,
293+
}, nil
294+
}

pkg/epp/handlers/streamingserver.go

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package handlers
1919
import (
2020
"context"
2121
"encoding/json"
22+
"errors"
2223
"fmt"
2324
"io"
2425
"math/rand"
@@ -37,22 +38,23 @@ import (
3738
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3839
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3940
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
41+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4042
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
4143
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
4244
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4345
)
4446

45-
func NewStreamingServer(scheduler Scheduler, destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore datastore.Datastore) *StreamingServer {
47+
func NewStreamingServer(queueController QueueController, destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore datastore.Datastore) *StreamingServer {
4648
return &StreamingServer{
47-
scheduler: scheduler,
49+
queueController: queueController,
4850
destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace,
4951
destinationEndpointHintKey: destinationEndpointHintKey,
5052
datastore: datastore,
5153
}
5254
}
5355

5456
type StreamingServer struct {
55-
scheduler Scheduler
57+
queueController QueueController
5658
// The key of the header to specify the target pod address. This value needs to match Envoy
5759
// configuration.
5860
destinationEndpointHintKey string
@@ -348,9 +350,9 @@ func (s *StreamingServer) HandleRequestBody(
348350
llmReq := &schedulingtypes.LLMRequest{
349351
Model: model,
350352
ResolvedTargetModel: modelName,
351-
Critical: datastore.IsCritical(modelObj),
353+
Criticality: *modelObj.Spec.Criticality,
352354
}
353-
logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "critical", llmReq.Critical)
355+
logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "criticality", llmReq.Criticality)
354356

355357
var err error
356358
// Update target models in the body.
@@ -364,9 +366,43 @@ func (s *StreamingServer) HandleRequestBody(
364366
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
365367
}
366368

367-
target, err := s.scheduler.Schedule(ctx, llmReq)
369+
schedulableReq, err := newSchedulableRequestFromContext(reqCtx)
368370
if err != nil {
369-
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
371+
return nil, errutil.Error{Code: errutil.Internal, Msg: err.Error()}
372+
}
373+
target, evictionReason, err := s.queueController.Schedule(schedulableReq)
374+
if err != nil {
375+
logger.Error(err, "Failed to schedule request", "evictionReason", evictionReason.String())
376+
switch {
377+
case errors.Is(err, scheduling.ErrEvicted):
378+
// Handle eviction errors, including the eviction reason.
379+
switch evictionReason {
380+
case scheduling.ReasonTTLExpiry:
381+
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted due to TTL expiry: %v", err)}
382+
case scheduling.ReasonExternalContextExpiry:
383+
// TODO: determine if this is an appropriate code. For expiry due to
384+
// gateway timeout, I think it makes sense. For manual cancellation, I
385+
// am not certain.
386+
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted due to external context expiry: %v", err)}
387+
case scheduling.ReasonPreempted:
388+
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted due to preemption: %v", err)}
389+
case scheduling.ReasonCannotFindBackend:
390+
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted due to failure to find a suitable backend: %v", err)}
391+
default:
392+
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("request evicted for unknown reason: %v", err)}
393+
}
394+
case errors.Is(err, scheduling.ErrModelAtCapacity):
395+
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("model at capacity: %v", err)}
396+
case errors.Is(err, scheduling.ErrCannotFindBackend):
397+
return reqCtx, errutil.Error{Code: errutil.Unknown, Msg: fmt.Sprintf("cannot find suitable backend for non-pool exhaustion reason: %v", err)}
398+
default:
399+
// Handle other errors.
400+
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("failed to schedule request: %v", err)}
401+
}
402+
}
403+
if target == nil || target.GetPod() == nil {
404+
// This should be unreachable.
405+
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "target pod is nil, request was likely evicted"}
370406
}
371407
targetPod := target.GetPod()
372408

pkg/epp/scheduling/filter.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,6 @@ func leastQueuingFilterFunc(ctx *types.Context, pods []*types.PodMetrics) ([]*ty
160160
return filtered, nil
161161
}
162162

163-
var lowQueueFilter = &basicFilter{
164-
name: "low queueing filter",
165-
filter: toFilterFunc((queueThresholdPredicate(config.QueueingThresholdLoRA))),
166-
}
167-
168163
var leastKVCacheFilter = &basicFilter{
169164
name: "least KV cache percent",
170165
filter: leastKVCacheFilterFunc,

0 commit comments

Comments
 (0)