Skip to content

Commit 2fba74b

Browse files
[feat] Add running requests scorer and tests (#1957)
* Add running requests scorer and tests
* Remove running requests scorer from epp config
* Rename RunningQueueSize metric and scorer to RunningRequestsSize
* Fix whitespace lint
1 parent 4fcc24f commit 2fba74b

File tree

12 files changed

+217
-25
lines changed

12 files changed

+217
-25
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ func (r *Runner) registerInTreePlugins() {
447447
plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
448448
plugins.Register(scorer.KvCacheUtilizationScorerType, scorer.KvCacheUtilizationScorerFactory)
449449
plugins.Register(scorer.QueueScorerType, scorer.QueueScorerFactory)
450+
plugins.Register(scorer.RunningRequestsSizeScorerType, scorer.RunningRequestsSizeScorerFactory)
450451
plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
451452
// Latency predictor plugins
452453
plugins.Register(slo_aware_router.SLOAwareRouterPluginType, slo_aware_router.SLOAwareRouterFactory)

pkg/epp/backend/metrics/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
100100
if p.MetricMapping.TotalRunningRequests != nil {
101101
running, err := p.getMetric(metricFamilies, *p.MetricMapping.TotalRunningRequests)
102102
if err == nil {
103-
updated.RunningQueueSize = int(running.GetGauge().GetValue())
103+
updated.RunningRequestsSize = int(running.GetGauge().GetValue())
104104
} else {
105105
errs = multierr.Append(errs, err)
106106
}

pkg/epp/datalayer/metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Metrics struct {
2828
WaitingModels map[string]int
2929
// MaxActiveModels is the maximum number of models that can be loaded to GPU.
3030
MaxActiveModels int
31-
RunningQueueSize int
31+
RunningRequestsSize int
3232
WaitingQueueSize int
3333
KVCacheUsagePercent float64
3434
KvCacheMaxTokenCapacity int
@@ -74,7 +74,7 @@ func (m *Metrics) Clone() *Metrics {
7474
ActiveModels: activeModels,
7575
WaitingModels: waitingModels,
7676
MaxActiveModels: m.MaxActiveModels,
77-
RunningQueueSize: m.RunningQueueSize,
77+
RunningRequestsSize: m.RunningRequestsSize,
7878
WaitingQueueSize: m.WaitingQueueSize,
7979
KVCacheUsagePercent: m.KVCacheUsagePercent,
8080
KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,

pkg/epp/datalayer/metrics/extractor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type Extractor struct {
5656
func Produces() map[string]any {
5757
return map[string]any{
5858
metrics.WaitingQueueSizeKey: int(0),
59+
metrics.RunningRequestsSizeKey: int(0),
5960
metrics.KVCacheUsagePercentKey: float64(0),
6061
metrics.ActiveModelsKey: map[string]int{},
6162
metrics.WaitingModelsKey: map[string]int{},
@@ -119,7 +120,7 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi
119120
if metric, err := spec.getLatestMetric(families); err != nil {
120121
errs = append(errs, err)
121122
} else {
122-
clone.RunningQueueSize = int(extractValue(metric))
123+
clone.RunningRequestsSize = int(extractValue(metric))
123124
updated = true
124125
}
125126
}

pkg/epp/datalayer/metrics/logger_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func TestLogger(t *testing.T) {
7474
assert.Contains(t, logOutput, "Refreshing Prometheus Metrics {\"ReadyPods\": 2}")
7575
assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Metadata: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678")
7676
assert.Contains(t, logOutput, "Metrics: {ActiveModels:map[modelA:1] WaitingModels:map[modelB:2] MaxActiveModels:5")
77-
assert.Contains(t, logOutput, "RunningQueueSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048")
77+
assert.Contains(t, logOutput, "RunningRequestsSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048")
7878
assert.Contains(t, logOutput, "Metadata: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679")
7979
assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"")
8080
}
@@ -106,7 +106,7 @@ func (f *fakeDataStore) PodList(predicate func(datalayer.Endpoint) bool) []datal
106106
ActiveModels: map[string]int{"modelA": 1},
107107
WaitingModels: map[string]int{"modelB": 2},
108108
MaxActiveModels: 5,
109-
RunningQueueSize: 3,
109+
RunningRequestsSize: 3,
110110
WaitingQueueSize: 7,
111111
KVCacheUsagePercent: 42.5,
112112
KvCacheMaxTokenCapacity: 2048,

pkg/epp/datalayer/metrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestMetricsClone(t *testing.T) {
2929
ActiveModels: map[string]int{"modelA": 1},
3030
WaitingModels: map[string]int{"modelB": 2},
3131
MaxActiveModels: 5,
32-
RunningQueueSize: 3,
32+
RunningRequestsSize: 3,
3333
WaitingQueueSize: 7,
3434
KVCacheUsagePercent: 42.5,
3535
KvCacheMaxTokenCapacity: 2048,

pkg/epp/metrics/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const (
3737

3838
KVCacheUsagePercentKey = "KVCacheUsagePercent"
3939
WaitingQueueSizeKey = "WaitingQueueSize"
40+
RunningRequestsSizeKey = "RunningRequestsSize"
4041
MaxActiveModelsKey = "MaxActiveModels"
4142
ActiveModelsKey = "ActiveModels"
4243
WaitingModelsKey = "WaitingModels"

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func processHeaderForLatencyPrediction(
8787
KVCachePercentage: m.KVCacheUsagePercent,
8888
InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)),
8989
NumRequestWaiting: m.WaitingQueueSize,
90-
NumRequestRunning: m.RunningQueueSize,
90+
NumRequestRunning: m.RunningRequestsSize,
9191
NumTokensGenerated: 0,
9292
PrefixCacheScore: prefix_cache_score,
9393
}
@@ -174,7 +174,7 @@ func recordTTFTTrainingData(
174174
ActualTPOT: 0,
175175
Timestamp: now,
176176
NumRequestWaiting: m.WaitingQueueSize,
177-
NumRequestRunning: m.RunningQueueSize,
177+
NumRequestRunning: m.RunningRequestsSize,
178178
NumTokensGenerated: 0,
179179
PrefixCacheScore: prefixCacheScore,
180180
}
@@ -201,7 +201,7 @@ func predictFirstTPOT(
201201
KVCachePercentage: m.KVCacheUsagePercent,
202202
InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)),
203203
NumRequestWaiting: m.WaitingQueueSize,
204-
NumRequestRunning: m.RunningQueueSize,
204+
NumRequestRunning: m.RunningRequestsSize,
205205
NumTokensGenerated: sloCtx.generatedTokenCount,
206206
PrefixCacheScore: 0,
207207
}
@@ -260,7 +260,7 @@ func processTokenForLatencyPrediction(
260260
ActualTPOT: latencyMs,
261261
Timestamp: now,
262262
NumRequestWaiting: m.WaitingQueueSize,
263-
NumRequestRunning: m.RunningQueueSize,
263+
NumRequestRunning: m.RunningRequestsSize,
264264
NumTokensGenerated: sloCtx.generatedTokenCount - 1,
265265
PrefixCacheScore: 0, // TPOT does not use prefix cache score
266266
}
@@ -274,7 +274,7 @@ func processTokenForLatencyPrediction(
274274
KVCachePercentage: m.KVCacheUsagePercent,
275275
InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)),
276276
NumRequestWaiting: m.WaitingQueueSize,
277-
NumRequestRunning: m.RunningQueueSize,
277+
NumRequestRunning: m.RunningRequestsSize,
278278
NumTokensGenerated: sloCtx.generatedTokenCount,
279279
PrefixCacheScore: 0, // TPOT does not use prefix cache score
280280
}
@@ -337,7 +337,7 @@ func bulkPredictWithMetrics(
337337
KVCachePercentage: metricsStates[i].KVCacheUsagePercent,
338338
InputTokenLength: len(strings.Fields(prompts[i])),
339339
NumRequestWaiting: metricsStates[i].WaitingQueueSize,
340-
NumRequestRunning: metricsStates[i].RunningQueueSize,
340+
NumRequestRunning: metricsStates[i].RunningRequestsSize,
341341
NumTokensGenerated: generatedTokenCounts[i],
342342
PrefixCacheScore: prefixCacheScores[i],
343343
}
@@ -385,7 +385,7 @@ func bulkPredictWithMetrics(
385385
"generated_tokens", bulkRequests[i].NumTokensGenerated,
386386
"kv_cache_percent", bulkRequests[i].KVCachePercentage,
387387
"waiting_queue", bulkRequests[i].NumRequestWaiting,
388-
"running_queue", bulkRequests[i].NumRequestRunning,
388+
"running_requests", bulkRequests[i].NumRequestRunning,
389389
"prefix_cache_score", bulkRequests[i].PrefixCacheScore)
390390
}
391391
}

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/requestcontrol_hooks_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,17 @@ import (
3636
)
3737

3838
const (
39-
testModelName = "test-model"
40-
kvUsage = 1
41-
runningQueue = 1
42-
waitingQueue = 1
39+
testModelName = "test-model"
40+
kvUsage = 1
41+
runningRequests = 1
42+
waitingQueue = 1
4343
)
4444

4545
// Helper functions
4646

4747
func createTestSchedulingResult(pod *backend.Pod) *schedulingtypes.SchedulingResult {
4848

49-
mockPod := createTestPod(pod.NamespacedName.Name, kvUsage, runningQueue, waitingQueue)
49+
mockPod := createTestPod(pod.NamespacedName.Name, kvUsage, runningRequests, waitingQueue)
5050

5151
return &schedulingtypes.SchedulingResult{
5252
PrimaryProfileName: "default",
@@ -343,12 +343,12 @@ func TestSLOAwareRouter_ResponseStreaming_FirstToken(t *testing.T) {
343343
sloCtx.lastSeenMetrics["prefill"] = &backendmetrics.MetricsState{
344344
KVCacheUsagePercent: 0.5,
345345
WaitingQueueSize: 1,
346-
RunningQueueSize: 1,
346+
RunningRequestsSize: 1,
347347
}
348348
sloCtx.lastSeenMetrics["default"] = &backendmetrics.MetricsState{
349349
KVCacheUsagePercent: 0.5,
350350
WaitingQueueSize: 1,
351-
RunningQueueSize: 1,
351+
RunningRequestsSize: 1,
352352
}
353353
router.setSLOContextForRequest(request, sloCtx)
354354

@@ -394,12 +394,12 @@ func TestSLOAwareRouter_ResponseStreaming_SubsequentTokens(t *testing.T) {
394394
sloCtx.lastSeenMetrics["prefill"] = &backendmetrics.MetricsState{
395395
KVCacheUsagePercent: 0.5,
396396
WaitingQueueSize: 1,
397-
RunningQueueSize: 1,
397+
RunningRequestsSize: 1,
398398
}
399399
sloCtx.lastSeenMetrics["default"] = &backendmetrics.MetricsState{
400400
KVCacheUsagePercent: 0.5,
401401
WaitingQueueSize: 1,
402-
RunningQueueSize: 1,
402+
RunningRequestsSize: 1,
403403
}
404404
firstTokenTime := time.Now().Add(-100 * time.Millisecond)
405405

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (m *mockPredictor) GetServerStatus(ctx context.Context) (*latencypredictor.
103103
return &latencypredictor.ServerStatusResponse{}, nil
104104
}
105105

106-
func createTestPod(name string, kvCacheUsage float64, runningQueueSize, waitingQueueSize int) schedulingtypes.Pod {
106+
func createTestPod(name string, kvCacheUsage float64, runningRequestsSize, waitingQueueSize int) schedulingtypes.Pod {
107107
return &schedulingtypes.PodMetrics{
108108
Pod: &backend.Pod{
109109
NamespacedName: types.NamespacedName{
@@ -113,7 +113,7 @@ func createTestPod(name string, kvCacheUsage float64, runningQueueSize, waitingQ
113113
},
114114
MetricsState: &backendmetrics.MetricsState{
115115
KVCacheUsagePercent: kvCacheUsage,
116-
RunningQueueSize: runningQueueSize,
116+
RunningRequestsSize: runningRequestsSize,
117117
WaitingQueueSize: waitingQueueSize,
118118
},
119119
}

0 commit comments

Comments (0)