Skip to content

Commit 1f04487

Browse files
committed
Add fallback logic
1 parent 9a5491f commit 1f04487

File tree

3 files changed

+56
-27
lines changed

3 files changed

+56
-27
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -238,20 +238,29 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
238238
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
239239
}
240240
// primary profile is used to set destination
241-
// TODO should use multiple destinations according to epp protocol. current code assumes a single target
242-
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPods[0].GetPod()
243-
244241
pool, err := d.datastore.PoolGet()
245242
if err != nil {
246243
return reqCtx, err
247244
}
245+
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPods[0].GetPod()
248246
targetPort := int(pool.Spec.TargetPortNumber)
247+
targetEndpoints := []string{}
248+
249+
for _, pod := range result.ProfileResults[result.PrimaryProfileName].TargetPods {
250+
curPod := pod.GetPod()
251+
curEndpoint := net.JoinHostPort(curPod.Address, strconv.Itoa(targetPort))
252+
253+
// TODO should use multiple destinations according to epp protocol. current code assumes a single target
254+
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", curPod)
255+
256+
targetEndpoints = append(targetEndpoints, curEndpoint)
257+
258+
}
249259

250-
endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort))
251-
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)
260+
combinedEndpointsString := strings.Join(targetEndpoints, ",")
252261

253262
reqCtx.TargetPod = targetPod
254-
reqCtx.TargetEndpoint = endpoint
263+
reqCtx.TargetEndpoint = combinedEndpointsString
255264

256265
d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
257266

pkg/epp/requestcontrol/director_test.go

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package requestcontrol
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
"testing"
2324
"time"
2425

@@ -107,26 +108,29 @@ func TestDirector_HandleRequest(t *testing.T) {
107108
},
108109
}
109110

110-
// Pod setup
111-
testPod := &corev1.Pod{
112-
ObjectMeta: metav1.ObjectMeta{
113-
Name: "pod1",
114-
Namespace: "default",
115-
Labels: map[string]string{"app": "inference"},
116-
},
117-
Status: corev1.PodStatus{
118-
PodIP: "192.168.1.100",
119-
Phase: corev1.PodRunning,
120-
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
121-
},
122-
}
123111
scheme := runtime.NewScheme()
124112
_ = clientgoscheme.AddToScheme(scheme)
125113
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
126114
if err := ds.PoolSet(ctx, fakeClient, pool); err != nil {
127115
t.Fatalf("Error while setting inference pool: %v", err)
128116
}
129-
ds.PodUpdateOrAddIfNotExist(testPod)
117+
118+
for i := range 5 {
119+
// Pod setup
120+
testPod := &corev1.Pod{
121+
ObjectMeta: metav1.ObjectMeta{
122+
Name: fmt.Sprintf("pod%v", i+1),
123+
Namespace: "default",
124+
Labels: map[string]string{"app": "inference"},
125+
},
126+
Status: corev1.PodStatus{
127+
PodIP: fmt.Sprintf("192.168.%v.100", i+1),
128+
Phase: corev1.PodRunning,
129+
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
130+
},
131+
}
132+
ds.PodUpdateOrAddIfNotExist(testPod)
133+
}
130134

131135
defaultSuccessfulScheduleResults := &schedulingtypes.SchedulingResult{
132136
ProfileResults: map[string]*schedulingtypes.ProfileRunResult{
@@ -140,6 +144,22 @@ func TestDirector_HandleRequest(t *testing.T) {
140144
},
141145
},
142146
},
147+
&schedulingtypes.ScoredPod{
148+
Pod: &schedulingtypes.PodMetrics{
149+
Pod: &backend.Pod{
150+
Address: "192.168.2.100",
151+
NamespacedName: k8stypes.NamespacedName{Name: "pod2", Namespace: "default"},
152+
},
153+
},
154+
},
155+
&schedulingtypes.ScoredPod{
156+
Pod: &schedulingtypes.PodMetrics{
157+
Pod: &backend.Pod{
158+
Address: "192.168.4.100",
159+
NamespacedName: k8stypes.NamespacedName{Name: "pod4", Namespace: "default"},
160+
},
161+
},
162+
},
143163
},
144164
},
145165
},
@@ -172,7 +192,7 @@ func TestDirector_HandleRequest(t *testing.T) {
172192
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
173193
Address: "192.168.1.100",
174194
},
175-
TargetEndpoint: "192.168.1.100:8000",
195+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
176196
},
177197
wantMutatedBodyModel: model,
178198
},
@@ -197,7 +217,7 @@ func TestDirector_HandleRequest(t *testing.T) {
197217
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
198218
Address: "192.168.1.100",
199219
},
200-
TargetEndpoint: "192.168.1.100:8000",
220+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
201221
},
202222
wantMutatedBodyModel: model,
203223
},
@@ -226,7 +246,7 @@ func TestDirector_HandleRequest(t *testing.T) {
226246
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
227247
Address: "192.168.1.100",
228248
},
229-
TargetEndpoint: "192.168.1.100:8000",
249+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
230250
},
231251
wantMutatedBodyModel: model,
232252
},
@@ -247,7 +267,7 @@ func TestDirector_HandleRequest(t *testing.T) {
247267
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
248268
Address: "192.168.1.100",
249269
},
250-
TargetEndpoint: "192.168.1.100:8000",
270+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
251271
},
252272
wantMutatedBodyModel: modelSheddable,
253273
},
@@ -268,7 +288,7 @@ func TestDirector_HandleRequest(t *testing.T) {
268288
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
269289
Address: "192.168.1.100",
270290
},
271-
TargetEndpoint: "192.168.1.100:8000",
291+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
272292
},
273293
wantMutatedBodyModel: "resolved-target-model-A",
274294
},
@@ -284,7 +304,7 @@ func TestDirector_HandleRequest(t *testing.T) {
284304
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
285305
Address: "192.168.1.100",
286306
},
287-
TargetEndpoint: "192.168.1.100:8000",
307+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
288308
},
289309
wantMutatedBodyModel: "food-review-1",
290310
reqBodyMap: map[string]any{

pkg/epp/scheduling/framework/plugins/picker/common.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ limitations under the License.
1717
package picker
1818

1919
const (
20-
DefaultMaxNumOfEndpoints = 1 // common default to all pickers
20+
DefaultMaxNumOfEndpoints = 2 // common default to all pickers, should have at least one fallback
2121
)
2222

2323
// pickerParameters defines the common parameters for all pickers

0 commit comments

Comments
 (0)