Skip to content

Commit 5db9085

Browse files
committed
Add fallback logic
1 parent 0e1e964 commit 5db9085

File tree

7 files changed

+157
-34
lines changed

7 files changed

+157
-34
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
239239
}
240240
// primary profile is used to set destination
241241
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPod.GetPod()
242+
backupPods := result.ProfileResults[result.PrimaryProfileName].FallbackPods
242243

243244
pool, err := d.datastore.PoolGet()
244245
if err != nil {
@@ -249,8 +250,20 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
249250
endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort))
250251
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)
251252

253+
allEndpoints := []string{endpoint}
254+
255+
for _, pod := range backupPods {
256+
curPod := pod.GetPod()
257+
curEndpoint := net.JoinHostPort(curPod.Address, strconv.Itoa(targetPort))
258+
259+
allEndpoints = append(allEndpoints, curEndpoint)
260+
261+
}
262+
263+
combinedEndpointsString := strings.Join(allEndpoints, ",")
264+
252265
reqCtx.TargetPod = targetPod
253-
reqCtx.TargetEndpoint = endpoint
266+
reqCtx.TargetEndpoint = combinedEndpointsString
254267

255268
d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
256269

pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"encoding/json"
2222
"fmt"
23+
"sort"
2324

2425
"sigs.k8s.io/controller-runtime/pkg/log"
2526
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
@@ -80,9 +81,27 @@ func (p *MaxScorePicker) Pick(ctx context.Context, cycleState *types.CycleState,
8081
}
8182
}
8283

84+
var target types.Pod
85+
fallbackPods := []*types.ScoredPod{}
8386
if len(highestScorePods) > 1 {
84-
return p.random.Pick(ctx, cycleState, highestScorePods) // pick randomly from the highest score pods
87+
profile := p.random.Pick(ctx, cycleState, highestScorePods) // pick randomly from the highest score pods
88+
target = profile.TargetPod
89+
fallbackPods = profile.FallbackPods
90+
} else {
91+
target = highestScorePods[0]
8592
}
8693

87-
return &types.ProfileRunResult{TargetPod: highestScorePods[0]}
94+
// Add all pods that had a lower score than highestScorePods
95+
for _, pod := range scoredPods {
96+
if pod.Score < maxScore {
97+
fallbackPods = append(fallbackPods, pod)
98+
}
99+
}
100+
101+
// Sort the final fallback pods by score in descending order
102+
sort.Slice(fallbackPods, func(i, j int) bool {
103+
return fallbackPods[i].Score > fallbackPods[j].Score
104+
})
105+
106+
return &types.ProfileRunResult{TargetPod: target, FallbackPods: fallbackPods}
88107
}

pkg/epp/scheduling/framework/plugins/picker/picker_test.go

Lines changed: 75 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,70 @@ import (
2020
"context"
2121
"testing"
2222

23+
"github.com/google/go-cmp/cmp"
2324
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2427
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2528

2629
k8stypes "k8s.io/apimachinery/pkg/types"
2730
)
2831

32+
var _ framework.Picker = &TestRandomPicker{}
33+
34+
// NewTestRandomPicker initializes a new NewTestRandomPicker and returns its pointer.
35+
func NewTestRandomPicker(pickRes string) *TestRandomPicker {
36+
return &TestRandomPicker{
37+
Picked: pickRes,
38+
tn: plugins.TypedName{Type: "random-test", Name: "random-test"},
39+
}
40+
}
41+
42+
// NewTestRandomPicker picks the selected pod from the list of candidates.
43+
type TestRandomPicker struct {
44+
tn plugins.TypedName
45+
Picked string
46+
}
47+
48+
// TypedName returns the type and name tuple of this plugin instance.
49+
func (p *TestRandomPicker) TypedName() plugins.TypedName {
50+
return p.tn
51+
}
52+
53+
// WithName sets the name of the picker.
54+
func (p *TestRandomPicker) WithName(name string) *TestRandomPicker {
55+
p.tn.Name = name
56+
return p
57+
}
58+
59+
// Type returns the type of the picker.
60+
func (p *TestRandomPicker) Type() string {
61+
return RandomPickerType
62+
}
63+
64+
func (tp *TestRandomPicker) Pick(_ context.Context, _ *types.CycleState, scoredPods []*types.ScoredPod) *types.ProfileRunResult {
65+
fallbackPods := []*types.ScoredPod{}
66+
67+
var winnerPod types.Pod
68+
for _, scoredPod := range scoredPods {
69+
if scoredPod.GetPod().NamespacedName.String() == tp.Picked {
70+
winnerPod = scoredPod
71+
} else {
72+
fallbackPods = append(fallbackPods, scoredPod)
73+
}
74+
}
75+
76+
return &types.ProfileRunResult{TargetPod: winnerPod, FallbackPods: fallbackPods}
77+
}
78+
2979
func TestPickMaxScorePicker(t *testing.T) {
3080
tests := []struct {
31-
name string
32-
scoredPods []*types.ScoredPod
33-
wantNames []string
34-
shouldPanic bool
81+
name string
82+
scoredPods []*types.ScoredPod
83+
wantPodName string
84+
wantFallbackNames []string
85+
shouldPanic bool
86+
picker framework.Picker
3587
}{
3688
{
3789
name: "Single max score",
@@ -40,7 +92,9 @@ func TestPickMaxScorePicker(t *testing.T) {
4092
{Pod: &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, Score: 25},
4193
{Pod: &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, Score: 15},
4294
},
43-
wantNames: []string{"pod2"},
95+
wantPodName: "pod2",
96+
wantFallbackNames: []string{"pod3", "pod1"},
97+
picker: NewMaxScorePicker(),
4498
},
4599
{
46100
name: "Multiple max scores",
@@ -49,13 +103,16 @@ func TestPickMaxScorePicker(t *testing.T) {
49103
{Pod: &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "podB"}}}, Score: 50},
50104
{Pod: &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "podC"}}}, Score: 30},
51105
},
52-
wantNames: []string{"podA", "podB"},
106+
wantPodName: "podA",
107+
wantFallbackNames: []string{"podB", "podC"},
108+
picker: NewTestRandomPicker(k8stypes.NamespacedName{Name: "podA"}.String()),
53109
},
54110
{
55-
name: "Empty pod list",
56-
scoredPods: []*types.ScoredPod{},
57-
wantNames: nil,
58-
shouldPanic: true,
111+
name: "Empty pod list",
112+
scoredPods: []*types.ScoredPod{},
113+
wantFallbackNames: nil,
114+
shouldPanic: true,
115+
picker: NewMaxScorePicker(),
59116
},
60117
}
61118

@@ -69,8 +126,7 @@ func TestPickMaxScorePicker(t *testing.T) {
69126
}()
70127
}
71128

72-
p := NewMaxScorePicker()
73-
result := p.Pick(context.Background(), nil, tt.scoredPods)
129+
result := tt.picker.Pick(context.Background(), nil, tt.scoredPods)
74130

75131
if len(tt.scoredPods) == 0 && result != nil {
76132
t.Errorf("expected nil result for empty input, got %+v", result)
@@ -79,15 +135,14 @@ func TestPickMaxScorePicker(t *testing.T) {
79135

80136
if result != nil {
81137
got := result.TargetPod.GetPod().NamespacedName.Name
82-
match := false
83-
for _, want := range tt.wantNames {
84-
if got == want {
85-
match = true
86-
break
87-
}
138+
if diff := cmp.Diff(tt.wantPodName, got); diff != "" {
139+
t.Errorf("Unexpected target pod name (-want +got): %v", diff)
88140
}
89-
if !match {
90-
t.Errorf("got %q, want one of %v", got, tt.wantNames)
141+
gotFallback := result.FallbackPods
142+
for i, wantName := range tt.wantFallbackNames {
143+
if diff := cmp.Diff(wantName, gotFallback[i].GetPod().NamespacedName.Name); diff != "" {
144+
t.Errorf("Unexpected target pod name (-want +got): %v", diff)
145+
}
91146
}
92147
}
93148
})

pkg/epp/scheduling/framework/plugins/picker/random_picker.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,15 @@ func (p *RandomPicker) WithName(name string) *RandomPicker {
6969
func (p *RandomPicker) Pick(ctx context.Context, _ *types.CycleState, scoredPods []*types.ScoredPod) *types.ProfileRunResult {
7070
log.FromContext(ctx).V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(scoredPods), scoredPods))
7171
i := rand.Intn(len(scoredPods))
72-
return &types.ProfileRunResult{TargetPod: scoredPods[i]}
72+
pods := []*types.ScoredPod{}
73+
74+
for index, pod := range scoredPods {
75+
fmt.Println(pod)
76+
if index == i {
77+
continue
78+
}
79+
pods = append(pods, pod)
80+
}
81+
82+
return &types.ProfileRunResult{TargetPod: scoredPods[i], FallbackPods: pods}
7383
}

pkg/epp/scheduling/framework/scheduler_profile_test.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,12 @@ func TestSchedulePlugins(t *testing.T) {
4242
[]k8stypes.NamespacedName{}, k8stypes.NamespacedName{Name: "pod1"})
4343

4444
tests := []struct {
45-
name string
46-
profile *SchedulerProfile
47-
input []types.Pod
48-
wantTargetPod k8stypes.NamespacedName
49-
targetPodScore float64
45+
name string
46+
profile *SchedulerProfile
47+
input []types.Pod
48+
wantTargetPod k8stypes.NamespacedName
49+
wantFallbackPods []*types.ScoredPod
50+
targetPodScore float64
5051
// Number of expected pods to score (after filter)
5152
numPodsToScore int
5253
err bool
@@ -63,7 +64,16 @@ func TestSchedulePlugins(t *testing.T) {
6364
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
6465
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
6566
},
66-
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
67+
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
68+
wantFallbackPods: []*types.ScoredPod{
69+
{
70+
Pod: &types.PodMetrics{Pod: &backend.Pod{
71+
NamespacedName: k8stypes.NamespacedName{Name: "pod2"},
72+
Labels: map[string]string{},
73+
}},
74+
Score: 1.1,
75+
},
76+
},
6777
targetPodScore: 1.1,
6878
numPodsToScore: 2,
6979
err: false,
@@ -80,7 +90,16 @@ func TestSchedulePlugins(t *testing.T) {
8090
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
8191
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
8292
},
83-
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
93+
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
94+
wantFallbackPods: []*types.ScoredPod{
95+
{
96+
Pod: &types.PodMetrics{Pod: &backend.Pod{
97+
NamespacedName: k8stypes.NamespacedName{Name: "pod2"},
98+
Labels: map[string]string{},
99+
}},
100+
Score: 50,
101+
},
102+
},
84103
targetPodScore: 50,
85104
numPodsToScore: 2,
86105
err: false,
@@ -138,7 +157,8 @@ func TestSchedulePlugins(t *testing.T) {
138157
Pod: &backend.Pod{NamespacedName: test.wantTargetPod},
139158
}
140159
wantRes := &types.ProfileRunResult{
141-
TargetPod: wantPod,
160+
TargetPod: wantPod,
161+
FallbackPods: test.wantFallbackPods,
142162
}
143163

144164
if diff := cmp.Diff(wantRes, got); diff != "" {
@@ -235,16 +255,20 @@ func (tp *testPlugin) Score(_ context.Context, _ *types.CycleState, _ *types.LLM
235255
func (tp *testPlugin) Pick(_ context.Context, _ *types.CycleState, scoredPods []*types.ScoredPod) *types.ProfileRunResult {
236256
tp.PickCallCount++
237257
tp.NumOfPickerCandidates = len(scoredPods)
258+
fallbackPods := []*types.ScoredPod{}
238259

239260
var winnerPod types.Pod
240261
for _, scoredPod := range scoredPods {
241262
if scoredPod.GetPod().NamespacedName.String() == tp.PickRes.String() {
242263
winnerPod = scoredPod.Pod
243264
tp.WinnerPodScore = scoredPod.Score
265+
} else {
266+
fallbackPods = append(fallbackPods, scoredPod)
244267
}
268+
245269
}
246270

247-
return &types.ProfileRunResult{TargetPod: winnerPod}
271+
return &types.ProfileRunResult{TargetPod: winnerPod, FallbackPods: fallbackPods}
248272
}
249273

250274
func (tp *testPlugin) PostCycle(_ context.Context, _ *types.CycleState, res *types.ProfileRunResult) {

pkg/epp/scheduling/scheduler_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func TestSchedule(t *testing.T) {
109109
},
110110
},
111111
},
112+
FallbackPods: []*types.ScoredPod{},
112113
},
113114
},
114115
PrimaryProfileName: "default",

pkg/epp/scheduling/types/types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ type PodMetrics struct {
7272

7373
// ProfileRunResult captures the profile run result.
7474
type ProfileRunResult struct {
75-
TargetPod Pod
75+
TargetPod Pod
76+
FallbackPods []*ScoredPod
7677
}
7778

7879
// SchedulingResult captures the result of the scheduling cycle.

0 commit comments

Comments
 (0)