Skip to content

Commit 71b7bf6

Browse files
committed
Add fallback logic
1 parent 36d2474 commit 71b7bf6

File tree

2 files changed

+55
-27
lines changed

2 files changed

+55
-27
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,20 +239,26 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
239239
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
240240
}
241241
// primary profile is used to set destination
242-
// TODO should use multiple destinations according to epp protocol. current code assumes a single target
243-
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPods[0].GetPod()
244-
245242
pool, err := d.datastore.PoolGet()
246243
if err != nil {
247244
return reqCtx, err
248245
}
246+
targetPods := []*backend.Pod{}
249247
targetPort := int(pool.Spec.TargetPortNumber)
248+
targetEndpoints := []string{}
249+
250+
for _, pod := range result.ProfileResults[result.PrimaryProfileName].TargetPods {
251+
curPod := pod.GetPod()
252+
curEndpoint := net.JoinHostPort(curPod.Address, strconv.Itoa(targetPort))
253+
targetPods = append(targetPods, curPod)
254+
targetEndpoints = append(targetEndpoints, curEndpoint)
255+
}
250256

251-
endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort))
252-
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)
257+
multiEndpointString := strings.Join(targetEndpoints, ",")
258+
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", multiEndpointString)
253259

254-
reqCtx.TargetPod = targetPod
255-
reqCtx.TargetEndpoint = endpoint
260+
reqCtx.TargetPod = targetPods[0]
261+
reqCtx.TargetEndpoint = multiEndpointString
256262

257263
d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
258264

@@ -274,6 +280,8 @@ func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestC
274280
Headers: reqCtx.Response.Headers,
275281
}
276282

283+
// TODO: extend the fallback functionality to handle cases where the target pod is unavailable. See:
284+
// https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224
277285
d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)
278286

279287
return reqCtx, nil

pkg/epp/requestcontrol/director_test.go

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package requestcontrol
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
"testing"
2324
"time"
2425

@@ -109,26 +110,29 @@ func TestDirector_HandleRequest(t *testing.T) {
109110
},
110111
}
111112

112-
// Pod setup
113-
testPod := &corev1.Pod{
114-
ObjectMeta: metav1.ObjectMeta{
115-
Name: "pod1",
116-
Namespace: "default",
117-
Labels: map[string]string{"app": "inference"},
118-
},
119-
Status: corev1.PodStatus{
120-
PodIP: "192.168.1.100",
121-
Phase: corev1.PodRunning,
122-
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
123-
},
124-
}
125113
scheme := runtime.NewScheme()
126114
_ = clientgoscheme.AddToScheme(scheme)
127115
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
128116
if err := ds.PoolSet(ctx, fakeClient, pool); err != nil {
129117
t.Fatalf("Error while setting inference pool: %v", err)
130118
}
131-
ds.PodUpdateOrAddIfNotExist(testPod)
119+
120+
for i := range 5 {
121+
// Pod setup
122+
testPod := &corev1.Pod{
123+
ObjectMeta: metav1.ObjectMeta{
124+
Name: fmt.Sprintf("pod%v", i+1),
125+
Namespace: "default",
126+
Labels: map[string]string{"app": "inference"},
127+
},
128+
Status: corev1.PodStatus{
129+
PodIP: fmt.Sprintf("192.168.%v.100", i+1),
130+
Phase: corev1.PodRunning,
131+
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
132+
},
133+
}
134+
ds.PodUpdateOrAddIfNotExist(testPod)
135+
}
132136

133137
defaultSuccessfulScheduleResults := &schedulingtypes.SchedulingResult{
134138
ProfileResults: map[string]*schedulingtypes.ProfileRunResult{
@@ -142,6 +146,22 @@ func TestDirector_HandleRequest(t *testing.T) {
142146
},
143147
},
144148
},
149+
&schedulingtypes.ScoredPod{
150+
Pod: &schedulingtypes.PodMetrics{
151+
Pod: &backend.Pod{
152+
Address: "192.168.2.100",
153+
NamespacedName: k8stypes.NamespacedName{Name: "pod2", Namespace: "default"},
154+
},
155+
},
156+
},
157+
&schedulingtypes.ScoredPod{
158+
Pod: &schedulingtypes.PodMetrics{
159+
Pod: &backend.Pod{
160+
Address: "192.168.4.100",
161+
NamespacedName: k8stypes.NamespacedName{Name: "pod4", Namespace: "default"},
162+
},
163+
},
164+
},
145165
},
146166
},
147167
},
@@ -174,7 +194,7 @@ func TestDirector_HandleRequest(t *testing.T) {
174194
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
175195
Address: "192.168.1.100",
176196
},
177-
TargetEndpoint: "192.168.1.100:8000",
197+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
178198
},
179199
wantMutatedBodyModel: model,
180200
},
@@ -199,7 +219,7 @@ func TestDirector_HandleRequest(t *testing.T) {
199219
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
200220
Address: "192.168.1.100",
201221
},
202-
TargetEndpoint: "192.168.1.100:8000",
222+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
203223
},
204224
wantMutatedBodyModel: model,
205225
},
@@ -228,7 +248,7 @@ func TestDirector_HandleRequest(t *testing.T) {
228248
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
229249
Address: "192.168.1.100",
230250
},
231-
TargetEndpoint: "192.168.1.100:8000",
251+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
232252
},
233253
wantMutatedBodyModel: model,
234254
},
@@ -249,7 +269,7 @@ func TestDirector_HandleRequest(t *testing.T) {
249269
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
250270
Address: "192.168.1.100",
251271
},
252-
TargetEndpoint: "192.168.1.100:8000",
272+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
253273
},
254274
wantMutatedBodyModel: modelSheddable,
255275
},
@@ -270,7 +290,7 @@ func TestDirector_HandleRequest(t *testing.T) {
270290
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
271291
Address: "192.168.1.100",
272292
},
273-
TargetEndpoint: "192.168.1.100:8000",
293+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
274294
},
275295
wantMutatedBodyModel: "resolved-target-model-A",
276296
},
@@ -286,7 +306,7 @@ func TestDirector_HandleRequest(t *testing.T) {
286306
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
287307
Address: "192.168.1.100",
288308
},
289-
TargetEndpoint: "192.168.1.100:8000",
309+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
290310
},
291311
wantMutatedBodyModel: "food-review-1",
292312
reqBodyMap: map[string]any{

0 commit comments

Comments
 (0)