Skip to content

Commit 30e439c

Browse files
committed
Add fallback logic
1 parent 9dacc6c commit 30e439c

File tree

3 files changed

+143
-49
lines changed

3 files changed

+143
-49
lines changed

pkg/epp/handlers/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type StreamingServer struct {
8181
// Specifically, there are fields related to the ext-proc protocol, and then fields related to the lifecycle of the request.
8282
// We should split these apart as this monolithic object exposes too much data to too many layers.
8383
type RequestContext struct {
84-
TargetPod *backend.Pod
84+
TargetPods []*backend.Pod
8585
TargetEndpoint string
8686
Model string
8787
ResolvedTargetModel string

pkg/epp/requestcontrol/director.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -239,20 +239,29 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
239239
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
240240
}
241241
// primary profile is used to set destination
242-
// TODO should use multiple destinations according to epp protocol. current code assumes a single target
243-
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPods[0].GetPod()
244-
245242
pool, err := d.datastore.PoolGet()
246243
if err != nil {
247244
return reqCtx, err
248245
}
246+
targetPods := []*backend.Pod{}
249247
targetPort := int(pool.Spec.TargetPortNumber)
248+
targetEndpoints := []string{}
249+
250+
for _, pod := range result.ProfileResults[result.PrimaryProfileName].TargetPods {
251+
curPod := pod.GetPod()
252+
curEndpoint := net.JoinHostPort(curPod.Address, strconv.Itoa(targetPort))
253+
254+
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", curPod)
255+
256+
targetPods = append(targetPods, curPod)
257+
targetEndpoints = append(targetEndpoints, curEndpoint)
250258

251-
endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort))
252-
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)
259+
}
260+
261+
combinedEndpointsString := strings.Join(targetEndpoints, ",")
253262

254-
reqCtx.TargetPod = targetPod
255-
reqCtx.TargetEndpoint = endpoint
263+
reqCtx.TargetPods = targetPods
264+
reqCtx.TargetEndpoint = combinedEndpointsString
256265

257266
d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
258267

@@ -274,7 +283,12 @@ func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestC
274283
Headers: reqCtx.Response.Headers,
275284
}
276285

277-
d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)
286+
var targetPod *backend.Pod
287+
if len(reqCtx.TargetPods) > 0 {
288+
targetPod = reqCtx.TargetPods[0]
289+
}
290+
291+
d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, targetPod)
278292

279293
return reqCtx, nil
280294
}

pkg/epp/requestcontrol/director_test.go

Lines changed: 120 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package requestcontrol
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
"testing"
2324
"time"
2425

@@ -109,26 +110,29 @@ func TestDirector_HandleRequest(t *testing.T) {
109110
},
110111
}
111112

112-
// Pod setup
113-
testPod := &corev1.Pod{
114-
ObjectMeta: metav1.ObjectMeta{
115-
Name: "pod1",
116-
Namespace: "default",
117-
Labels: map[string]string{"app": "inference"},
118-
},
119-
Status: corev1.PodStatus{
120-
PodIP: "192.168.1.100",
121-
Phase: corev1.PodRunning,
122-
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
123-
},
124-
}
125113
scheme := runtime.NewScheme()
126114
_ = clientgoscheme.AddToScheme(scheme)
127115
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
128116
if err := ds.PoolSet(ctx, fakeClient, pool); err != nil {
129117
t.Fatalf("Error while setting inference pool: %v", err)
130118
}
131-
ds.PodUpdateOrAddIfNotExist(testPod)
119+
120+
for i := range 5 {
121+
// Pod setup
122+
testPod := &corev1.Pod{
123+
ObjectMeta: metav1.ObjectMeta{
124+
Name: fmt.Sprintf("pod%v", i+1),
125+
Namespace: "default",
126+
Labels: map[string]string{"app": "inference"},
127+
},
128+
Status: corev1.PodStatus{
129+
PodIP: fmt.Sprintf("192.168.%v.100", i+1),
130+
Phase: corev1.PodRunning,
131+
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
132+
},
133+
}
134+
ds.PodUpdateOrAddIfNotExist(testPod)
135+
}
132136

133137
defaultSuccessfulScheduleResults := &schedulingtypes.SchedulingResult{
134138
ProfileResults: map[string]*schedulingtypes.ProfileRunResult{
@@ -142,6 +146,22 @@ func TestDirector_HandleRequest(t *testing.T) {
142146
},
143147
},
144148
},
149+
&schedulingtypes.ScoredPod{
150+
Pod: &schedulingtypes.PodMetrics{
151+
Pod: &backend.Pod{
152+
Address: "192.168.2.100",
153+
NamespacedName: k8stypes.NamespacedName{Name: "pod2", Namespace: "default"},
154+
},
155+
},
156+
},
157+
&schedulingtypes.ScoredPod{
158+
Pod: &schedulingtypes.PodMetrics{
159+
Pod: &backend.Pod{
160+
Address: "192.168.4.100",
161+
NamespacedName: k8stypes.NamespacedName{Name: "pod4", Namespace: "default"},
162+
},
163+
},
164+
},
145165
},
146166
},
147167
},
@@ -170,11 +190,21 @@ func TestDirector_HandleRequest(t *testing.T) {
170190
wantReqCtx: &handlers.RequestContext{
171191
Model: model,
172192
ResolvedTargetModel: model,
173-
TargetPod: &backend.Pod{
174-
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
175-
Address: "192.168.1.100",
193+
TargetPods: []*backend.Pod{
194+
{
195+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
196+
Address: "192.168.1.100",
197+
},
198+
{
199+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod2"},
200+
Address: "192.168.2.100",
201+
},
202+
{
203+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod4"},
204+
Address: "192.168.4.100",
205+
},
176206
},
177-
TargetEndpoint: "192.168.1.100:8000",
207+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
178208
},
179209
wantMutatedBodyModel: model,
180210
},
@@ -195,11 +225,21 @@ func TestDirector_HandleRequest(t *testing.T) {
195225
wantReqCtx: &handlers.RequestContext{
196226
Model: model,
197227
ResolvedTargetModel: model,
198-
TargetPod: &backend.Pod{
199-
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
200-
Address: "192.168.1.100",
228+
TargetPods: []*backend.Pod{
229+
{
230+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
231+
Address: "192.168.1.100",
232+
},
233+
{
234+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod2"},
235+
Address: "192.168.2.100",
236+
},
237+
{
238+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod4"},
239+
Address: "192.168.4.100",
240+
},
201241
},
202-
TargetEndpoint: "192.168.1.100:8000",
242+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
203243
},
204244
wantMutatedBodyModel: model,
205245
},
@@ -224,11 +264,21 @@ func TestDirector_HandleRequest(t *testing.T) {
224264
wantReqCtx: &handlers.RequestContext{
225265
Model: model,
226266
ResolvedTargetModel: model,
227-
TargetPod: &backend.Pod{
228-
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
229-
Address: "192.168.1.100",
267+
TargetPods: []*backend.Pod{
268+
{
269+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
270+
Address: "192.168.1.100",
271+
},
272+
{
273+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod2"},
274+
Address: "192.168.2.100",
275+
},
276+
{
277+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod4"},
278+
Address: "192.168.4.100",
279+
},
230280
},
231-
TargetEndpoint: "192.168.1.100:8000",
281+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
232282
},
233283
wantMutatedBodyModel: model,
234284
},
@@ -245,11 +295,21 @@ func TestDirector_HandleRequest(t *testing.T) {
245295
wantReqCtx: &handlers.RequestContext{
246296
Model: modelSheddable,
247297
ResolvedTargetModel: modelSheddable,
248-
TargetPod: &backend.Pod{
249-
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
250-
Address: "192.168.1.100",
298+
TargetPods: []*backend.Pod{
299+
{
300+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
301+
Address: "192.168.1.100",
302+
},
303+
{
304+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod2"},
305+
Address: "192.168.2.100",
306+
},
307+
{
308+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod4"},
309+
Address: "192.168.4.100",
310+
},
251311
},
252-
TargetEndpoint: "192.168.1.100:8000",
312+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
253313
},
254314
wantMutatedBodyModel: modelSheddable,
255315
},
@@ -266,11 +326,21 @@ func TestDirector_HandleRequest(t *testing.T) {
266326
wantReqCtx: &handlers.RequestContext{
267327
Model: modelWithResolvedTarget,
268328
ResolvedTargetModel: "resolved-target-model-A",
269-
TargetPod: &backend.Pod{
270-
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
271-
Address: "192.168.1.100",
329+
TargetPods: []*backend.Pod{
330+
{
331+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
332+
Address: "192.168.1.100",
333+
},
334+
{
335+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod2"},
336+
Address: "192.168.2.100",
337+
},
338+
{
339+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod4"},
340+
Address: "192.168.4.100",
341+
},
272342
},
273-
TargetEndpoint: "192.168.1.100:8000",
343+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
274344
},
275345
wantMutatedBodyModel: "resolved-target-model-A",
276346
},
@@ -282,11 +352,21 @@ func TestDirector_HandleRequest(t *testing.T) {
282352
wantReqCtx: &handlers.RequestContext{
283353
Model: "food-review-1",
284354
ResolvedTargetModel: "food-review-1",
285-
TargetPod: &backend.Pod{
286-
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
287-
Address: "192.168.1.100",
355+
TargetPods: []*backend.Pod{
356+
{
357+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
358+
Address: "192.168.1.100",
359+
},
360+
{
361+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod2"},
362+
Address: "192.168.2.100",
363+
},
364+
{
365+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod4"},
366+
Address: "192.168.4.100",
367+
},
288368
},
289-
TargetEndpoint: "192.168.1.100:8000",
369+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
290370
},
291371
wantMutatedBodyModel: "food-review-1",
292372
reqBodyMap: map[string]any{
@@ -389,7 +469,7 @@ func TestDirector_HandleRequest(t *testing.T) {
389469
assert.Equal(t, test.wantReqCtx.Model, returnedReqCtx.Model, "reqCtx.Model mismatch")
390470
assert.Equal(t, test.wantReqCtx.ResolvedTargetModel, returnedReqCtx.ResolvedTargetModel,
391471
"reqCtx.ResolvedTargetModel mismatch")
392-
assert.Equal(t, test.wantReqCtx.TargetPod, returnedReqCtx.TargetPod, "reqCtx.TargetPod mismatch")
472+
assert.Equal(t, test.wantReqCtx.TargetPods, returnedReqCtx.TargetPods, "reqCtx.TargetPod mismatch")
393473
assert.Equal(t, test.wantReqCtx.TargetEndpoint, returnedReqCtx.TargetEndpoint, "reqCtx.TargetEndpoint mismatch")
394474
}
395475

@@ -675,7 +755,7 @@ func TestDirector_HandleResponse(t *testing.T) {
675755
Headers: map[string]string{"X-Test-Response-Header": "TestValue"},
676756
},
677757

678-
TargetPod: &backend.Pod{NamespacedName: types.NamespacedName{Namespace: "namespace1", Name: "test-pod-name"}},
758+
TargetPods: []*backend.Pod{{NamespacedName: types.NamespacedName{Namespace: "namespace1", Name: "test-pod-name"}}},
679759
}
680760

681761
_, err := director.HandleResponse(ctx, reqCtx)

0 commit comments

Comments
 (0)