From e273d6bf322e4673e7f4ec6b686e8c88aba3073c Mon Sep 17 00:00:00 2001 From: BrianPark314 Date: Sat, 14 Jun 2025 16:27:56 +0900 Subject: [PATCH 1/7] feat: add vllmruntime Signed-off-by: BrianPark314 --- .../configs/vllm/vllm-runtime.yaml | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 src/gateway_inference_extension/configs/vllm/vllm-runtime.yaml diff --git a/src/gateway_inference_extension/configs/vllm/vllm-runtime.yaml b/src/gateway_inference_extension/configs/vllm/vllm-runtime.yaml new file mode 100644 index 000000000..f1e7a98ed --- /dev/null +++ b/src/gateway_inference_extension/configs/vllm/vllm-runtime.yaml @@ -0,0 +1,100 @@ +apiVersion: production-stack.vllm.ai/v1alpha1 +kind: VLLMRuntime +metadata: + name: vllm-llama3-1b-instruct # must match Gateway route back-reference + labels: + app.kubernetes.io/component: model-server + app.kubernetes.io/part-of: inference-gateway-demo +spec: + # --- Core vLLM flags -------------------------------------------------- + v1: true # keep vLLM v1 API surface on + tensorParallelSize: 1 + gpuMemoryUtilization: "0.9" + maxLoras: 2 + extraArgs: + - "--max-num-seq" # identical to old Deployment + - "1024" + - "--compilation-config" + - "3" + - "--max-lora-rank" + - "32" + - "--max-cpu-loras" + - "12" + + # --- Model ------------------------------------------------------------ + model: + modelURL: "meta-llama/Llama-3.2-1B-Instruct" + enableLoRA: true + dtype: "bfloat16" + maxModelLen: 4096 + maxNumSeqs: 1024 # duplicated for clarity + + # --- LoRA & cache off-loading ---------------------------------------- + lmCacheConfig: + enabled: true + remoteUrl: "lm://cacheserver-sample.default.svc.cluster.local:80" + remoteSerde: "naive" + cpuOffloadingBufferSize: "15" + diskOffloadingBufferSize: "0" + + # --- Runtime image ---------------------------------------------------- + image: + registry: "docker.io" + name: "lmcache/vllm-openai:2025-05-05-v1" + pullPolicy: "IfNotPresent" + + # --- Resources -------------------------------------------------------- + resources: + cpu: "8" + memory: "24Gi" + gpu: "1" + + # --- Secret & env ----------------------------------------------------- + hfTokenSecret: + name: "hf-token" + env: + - name: VLLM_USE_V1 + value: "1" + - name: PORT + value: "8000" + - name: VLLM_ALLOW_RUNTIME_LORA_UPDATING + value: "true" + + # --- Replication & strategy ------------------------------------------ + replicas: 2 + deploymentStrategy: "RollingUpdate" + + # --- Pod-level customisation (for adapter syncer & volumes) ---------- + podTemplate: + spec: + enableServiceLinks: false # avoid VLLM_PORT collision + terminationGracePeriodSeconds: 130 + volumes: + - name: data + emptyDir: {} + - name: shm + emptyDir: + medium: Memory + - name: adapters + emptyDir: {} + - name: config-volume + configMap: + name: vllm-llama3-1b-instruct-adapters + initContainers: + - name: lora-adapter-syncer + image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/lora-syncer:main + env: + - name: DYNAMIC_LORA_ROLLOUT_CONFIG + value: "/config/configmap.yaml" + volumeMounts: + - name: config-volume + mountPath: /config + containers: + - name: vllm + volumeMounts: + - name: data + mountPath: /data + - name: shm + mountPath: /dev/shm + - name: adapters + mountPath: /adapters From 53500d67654b142e002dff494da567662e21d917 Mon Sep 17 00:00:00 2001 From: BrianPark314 Date: Sat, 14 Jun 2025 17:40:01 +0900 Subject: [PATCH 2/7] feat: add prefix aware picker Signed-off-by: BrianPark314 --- .../prefix_aware_picker.go | 152 ++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 src/gateway_inference_extension/prefix_aware_picker.go diff --git a/src/gateway_inference_extension/prefix_aware_picker.go b/src/gateway_inference_extension/prefix_aware_picker.go new file mode 100644 index 000000000..9a51a1975 --- /dev/null +++ b/src/gateway_inference_extension/prefix_aware_picker.go @@ -0,0 +1,152 @@ +/* +Copyright 2025 The vLLM Production Stack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package picker + +import ( + "math/rand" + "sync" + "time" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +var _ plugins.Picker = &PrefixMatchPicker{} + +// PrefixMatchPicker selects the engine whose URL was returned by the +// longest-prefix match against previously-seen prompts (same idea as the +// Python `route_request`). Ties are broken at random. +type PrefixMatchPicker struct { + trie *hashTrie + rnd *rand.Rand +} + +// NewPrefixMatchPicker returns a ready-to-use picker instance. +func NewPrefixMatchPicker() *PrefixMatchPicker { + return &PrefixMatchPicker{ + trie: newHashTrie(), + rnd: rand.New(rand.NewSource(time.Now().UnixNano())), + } +} + +func (p *PrefixMatchPicker) Name() string { return "prefixmatch" } + +// Pick implements plugins.Picker. +// +// SchedulingContext is assumed to carry the inference request body in +// ctx.RequestBody (map[string]any) with the prompt at key "prompt". Adjust +// the accessor if your integration differs. +func (p *PrefixMatchPicker) Pick( + ctx *types.SchedulingContext, + scoredPods []*types.ScoredPod, +) *types.Result { + if len(scoredPods) == 0 { + return &types.Result{} + } + + prompt, _ := ctx.RequestBody["prompt"].(string) + + // 1. Build the set of available endpoints. + available := make(map[string]struct{}, len(scoredPods)) + for _, sp := range scoredPods { + ep := sp.GetPod().EndpointURL // <-- adapt this accessor + available[ep] = struct{}{} + } + + // 2. Longest-prefix match within the trie. + matched := p.trie.longestPrefixMatch(prompt, available) + + // 3. Fallback: no match --> all endpoints are candidates. + if len(matched) == 0 { + for ep := range available { + matched[ep] = struct{}{} + } + } + + // 4. Convert the matched set to a slice and pick randomly. + endpoints := make([]string, 0, len(matched)) + for ep := range matched { + endpoints = append(endpoints, ep) + } + selected := endpoints[p.rnd.Intn(len(endpoints))] + + // 5. Cache the decision for future prefix look-ups. + p.trie.insert(prompt, selected) + + // 6. Return the pod whose URL matches `selected`. + for _, sp := range scoredPods { + if sp.GetPod().EndpointURL == selected { // same accessor as above + return &types.Result{TargetPod: sp} + } + } + // Should never hit; safe fallback. + return &types.Result{TargetPod: scoredPods[0]} +} + +/*---------------------------- trie implementation ---------------------------*/ + +type hashTrie struct { + mu sync.RWMutex + children map[rune]*hashTrie + endpoints map[string]struct{} +} + +func newHashTrie() *hashTrie { + return &hashTrie{children: make(map[rune]*hashTrie)} +} + +func (t *hashTrie) insert(key, endpoint string) { + t.mu.Lock() + defer t.mu.Unlock() + + node := t + for _, r := range key { + child, ok := node.children[r] + if !ok { + child = newHashTrie() + node.children[r] = child + } + node = child + } + if node.endpoints == nil { + node.endpoints = make(map[string]struct{}) + } + node.endpoints[endpoint] = struct{}{} +} + +func (t *hashTrie) longestPrefixMatch( + key string, + available map[string]struct{}, +) map[string]struct{} { + t.mu.RLock() + defer t.mu.RUnlock() + + var lastMatch map[string]struct{} + node := t + for _, r := range key { + if node.endpoints != nil { + lastMatch = node.endpoints + } + child, ok := node.children[r] + if !ok { + break + } + node = child + } + // Filter by `available`. + res := make(map[string]struct{}) + for ep := range lastMatch { + if _, ok := available[ep]; ok { + res[ep] = struct{}{} + } + } + return res +} \ No newline at end of file From bca73e54e1440bb955a2a8992fbd92a303dc146a Mon Sep 17 00:00:00 2001 From: BrianPark314 Date: Sat, 14 Jun 2025 18:00:39 +0900 Subject: [PATCH 3/7] feat: add prefix aware picker Signed-off-by: BrianPark314 --- src/gateway_inference_extension/Dockerfile | 1 + src/gateway_inference_extension/prefix_aware_picker.go | 2 +- src/gateway_inference_extension/scheduler.patch | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gateway_inference_extension/Dockerfile b/src/gateway_inference_extension/Dockerfile index 4c47a218f..848161f34 100644 --- a/src/gateway_inference_extension/Dockerfile +++ b/src/gateway_inference_extension/Dockerfile @@ -25,6 +25,7 @@ RUN git clone https://github.com/kubernetes-sigs/gateway-api-inference-extension git apply scheduler.patch && \ cd ../../../.. && \ cp /src/roundrobin_picker.go gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker/roundrobin_picker.go && \ + cp /src/prefix_aware_picker.go gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker/prefix_aware_picker.go && \ mkdir -p /src/pkg/ && \ cp -r gateway-api-inference-extension/pkg/epp/ /src/pkg/epp && \ cp gateway-api-inference-extension/go.mod /src && \ diff --git a/src/gateway_inference_extension/prefix_aware_picker.go b/src/gateway_inference_extension/prefix_aware_picker.go index 9a51a1975..894c602cf 100644 --- a/src/gateway_inference_extension/prefix_aware_picker.go +++ b/src/gateway_inference_extension/prefix_aware_picker.go @@ -149,4 +149,4 @@ func (t *hashTrie) longestPrefixMatch( } } return res -} \ No newline at end of file +} diff --git a/src/gateway_inference_extension/scheduler.patch b/src/gateway_inference_extension/scheduler.patch index 95d679a6c..6b32d12b2 100644 --- a/src/gateway_inference_extension/scheduler.patch +++ b/src/gateway_inference_extension/scheduler.patch @@ -16,6 +16,7 @@ index b484cde..c7688a8 100644 scorers: map[plugins.Scorer]int{}, - picker: &picker.RandomPicker{}, + picker: &picker.RoundRobinPicker{}, ++ picker: &picker.PrefixAwarePicker{}, postSchedulePlugins: []plugins.PostSchedule{}, } From d6f4f7ddba3fd23462c082e92316fac53b1e23f4 Mon Sep 17 00:00:00 2001 From: BrianPark314 Date: Sat, 12 Jul 2025 11:29:27 +0900 Subject: [PATCH 4/7] feat: update prefix aware logic Signed-off-by: BrianPark314 --- .../roundrobin_picker.go | 213 +++++++++++++++--- 1 file changed, 184 insertions(+), 29 deletions(-) diff --git a/src/gateway_inference_extension/roundrobin_picker.go b/src/gateway_inference_extension/roundrobin_picker.go index de1c718a1..410eafde0 100644 --- a/src/gateway_inference_extension/roundrobin_picker.go +++ b/src/gateway_inference_extension/roundrobin_picker.go @@ -6,53 +6,208 @@ you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ package picker import ( - "fmt" - "sort" - "sync/atomic" + "math/rand" + "strings" + "sync" + "time" + + "github.com/cespare/xxhash/v2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -var _ plugins.Picker = &RoundRobinPicker{} +const chunkSize = 128 + +var _ plugins.Picker = &PrefixMatchPicker{} -// RoundRobinPicker picks pods in a round-robin fashion, cycling through the list of candidates. -type RoundRobinPicker struct { - // currentIndex tracks the current position in the list of pods - currentIndex uint64 +// PrefixMatchPicker selects the engine whose URL was returned by the +// longest-prefix match against previously-seen prompts (same idea as the +// Python `route_request`). Ties are broken at random. +type PrefixMatchPicker struct { + trie *hashTrie + rnd *rand.Rand } -func (p *RoundRobinPicker) Name() string { - return "roundrobin" +// NewPrefixMatchPicker returns a ready-to-use picker instance. +func NewPrefixMatchPicker() *PrefixMatchPicker { + return &PrefixMatchPicker{ + trie: newHashTrie(), + rnd: rand.New(rand.NewSource(time.Now().UnixNano())), + } } -func (p *RoundRobinPicker) Pick(ctx *types.SchedulingContext, scoredPods []*types.ScoredPod) *types.Result { +func (p *PrefixMatchPicker) Name() string { return "prefixmatch" } + +// Pick implements plugins.Picker. +// +// SchedulingContext is assumed to carry the inference request body in +// ctx.RequestBody (map[string]any) with the prompt at key "prompt". Adjust +// the accessor if your integration differs. +func (p *PrefixMatchPicker) Pick( + ctx *types.SchedulingContext, + scoredPods []*types.ScoredPod, +) *types.Result { if len(scoredPods) == 0 { return &types.Result{} } - // sort the candidates by NamespacedName - sort.Slice(scoredPods, func(i, j int) bool { - return scoredPods[i].GetPod().NamespacedName.String() < scoredPods[j].GetPod().NamespacedName.String() - }) - ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Sorted candidates: %+v", scoredPods)) - // select the next pod in the list - index := int(atomic.AddUint64(&p.currentIndex, 1) - 1) - index = index % len(scoredPods) - ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting pod at index %d from %d candidates in a round-robin fashion: %+v", - index, len(scoredPods), scoredPods)) - return &types.Result{TargetPod: scoredPods[index]} + var prompt string + + if msgs, ok := ctx.RequestBody["messages"]; ok { + if arr, ok := msgs.([]any); ok { + var parts []string + for _, m := range arr { + mm, ok := m.(map[string]any) + if !ok { + continue + } + switch c := mm["content"].(type) { + case string: + parts = append(parts, c) + case []any: + for _, part := range c { + mp, ok := part.(map[string]any) + if !ok { + continue + } + if mp["type"] == "text" { + if txt, ok := mp["text"].(string); ok { + parts = append(parts, txt) + } + } + } + } + } + prompt = strings.Join(parts, "\n") + } + } + + if prompt == "" { + prompt, _ = ctx.RequestBody["prompt"].(string) + } + + // 1. Build the set of available endpoints. + available := make(map[string]struct{}, len(scoredPods)) + for _, sp := range scoredPods { + ep := sp.GetPod().EndpointURL // <-- adapt this accessor + available[ep] = struct{}{} + } + + // 2. Longest-prefix match within the trie. + matched := p.trie.longestPrefixMatch(prompt, available) + + // 3. Fallback: no match --> all endpoints are candidates. + if len(matched) == 0 { + for ep := range available { + matched[ep] = struct{}{} + } + } + + // 4. Convert the matched set to a slice and pick randomly. + endpoints := make([]string, 0, len(matched)) + for ep := range matched { + endpoints = append(endpoints, ep) + } + selected := endpoints[p.rnd.Intn(len(endpoints))] + + // 5. Cache the decision for future prefix look-ups. + p.trie.insert(prompt, selected) + + // 6. Return the pod whose URL matches `selected`. + for _, sp := range scoredPods { + if sp.GetPod().EndpointURL == selected { // same accessor as above + return &types.Result{TargetPod: sp} + } + } + // Should never hit; safe fallback. + return &types.Result{TargetPod: scoredPods[0]} +} + +/*---------------------------- trie implementation ---------------------------*/ + +type hashTrie struct { + mu sync.RWMutex + children map[uint64]*hashTrie + endpoints map[string]struct{} +} + +func newHashTrie() *hashTrie { + return &hashTrie{children: make(map[uint64]*hashTrie)} +} + +func intersection(a, b map[string]struct{}) map[string]struct{} { + res := make(map[string]struct{}) + for k := range a { + if _, ok := b[k]; ok { + res[k] = struct{}{} + } + } + return res +} + +func chunkAndHash(s string) []uint64 { + hashes := make([]uint64, 0, (len(s)+chunkSize-1)/chunkSize) + for i := 0; i < len(s); i += chunkSize { + end := i + chunkSize + if end > len(s) { + end = len(s) + } + hashes = append(hashes, xxhash.Sum64([]byte(s[i:end]))) + } + return hashes +} + +func (t *hashTrie) insert(key, endpoint string) { + t.mu.Lock() + defer t.mu.Unlock() + + node := t + if node.endpoints == nil { + node.endpoints = make(map[string]struct{}) + } + node.endpoints[endpoint] = struct{}{} + + for _, h := range chunkAndHash(key) { + child, ok := node.children[h] + if !ok { + child = newHashTrie() + node.children[h] = child + } + node = child + if node.endpoints == nil { + node.endpoints = make(map[string]struct{}) + } + node.endpoints[endpoint] = struct{}{} + } +} + +func (t *hashTrie) longestPrefixMatch( + key string, + available map[string]struct{}, +) map[string]struct{} { + t.mu.RLock() + defer t.mu.RUnlock() + node := t + matched := intersection(node.endpoints, available) + + for _, h := range chunkAndHash(key) { + child, ok := node.children[h] + if !ok { + break + } + node = child + cand := intersection(node.endpoints, available) + if len(cand) == 0 { + break + } + matched = cand + } + return matched } From 81123e5a2c0d538b0afefe8776becdfdd9aa59be Mon Sep 17 00:00:00 2001 From: BrianPark314 Date: Wed, 30 Jul 2025 22:28:09 +0900 Subject: [PATCH 5/7] chore: revert roundrobin picker Signed-off-by: BrianPark314 --- .../roundrobin_picker.go | 213 +++--------------- 1 file changed, 29 insertions(+), 184 deletions(-) diff --git a/src/gateway_inference_extension/roundrobin_picker.go b/src/gateway_inference_extension/roundrobin_picker.go index 410eafde0..de1c718a1 100644 --- a/src/gateway_inference_extension/roundrobin_picker.go +++ b/src/gateway_inference_extension/roundrobin_picker.go @@ -6,208 +6,53 @@ you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package picker import ( - "math/rand" - "strings" - "sync" - "time" - - "github.com/cespare/xxhash/v2" + "fmt" + "sort" + "sync/atomic" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -const chunkSize = 128 - -var _ plugins.Picker = &PrefixMatchPicker{} +var _ plugins.Picker = &RoundRobinPicker{} -// PrefixMatchPicker selects the engine whose URL was returned by the -// longest-prefix match against previously-seen prompts (same idea as the -// Python `route_request`). Ties are broken at random. -type PrefixMatchPicker struct { - trie *hashTrie - rnd *rand.Rand +// RoundRobinPicker picks pods in a round-robin fashion, cycling through the list of candidates. +type RoundRobinPicker struct { + // currentIndex tracks the current position in the list of pods + currentIndex uint64 } -// NewPrefixMatchPicker returns a ready-to-use picker instance. -func NewPrefixMatchPicker() *PrefixMatchPicker { - return &PrefixMatchPicker{ - trie: newHashTrie(), - rnd: rand.New(rand.NewSource(time.Now().UnixNano())), - } +func (p *RoundRobinPicker) Name() string { + return "roundrobin" } -func (p *PrefixMatchPicker) Name() string { return "prefixmatch" } - -// Pick implements plugins.Picker. -// -// SchedulingContext is assumed to carry the inference request body in -// ctx.RequestBody (map[string]any) with the prompt at key "prompt". Adjust -// the accessor if your integration differs. -func (p *PrefixMatchPicker) Pick( - ctx *types.SchedulingContext, - scoredPods []*types.ScoredPod, -) *types.Result { +func (p *RoundRobinPicker) Pick(ctx *types.SchedulingContext, scoredPods []*types.ScoredPod) *types.Result { if len(scoredPods) == 0 { return &types.Result{} } + // sort the candidates by NamespacedName + sort.Slice(scoredPods, func(i, j int) bool { + return scoredPods[i].GetPod().NamespacedName.String() < scoredPods[j].GetPod().NamespacedName.String() + }) + ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Sorted candidates: %+v", scoredPods)) + // select the next pod in the list + index := int(atomic.AddUint64(&p.currentIndex, 1) - 1) + index = index % len(scoredPods) + ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting pod at index %d from %d candidates in a round-robin fashion: %+v", + index, len(scoredPods), scoredPods)) - var prompt string - - if msgs, ok := ctx.RequestBody["messages"]; ok { - if arr, ok := msgs.([]any); ok { - var parts []string - for _, m := range arr { - mm, ok := m.(map[string]any) - if !ok { - continue - } - switch c := mm["content"].(type) { - case string: - parts = append(parts, c) - case []any: - for _, part := range c { - mp, ok := part.(map[string]any) - if !ok { - continue - } - if mp["type"] == "text" { - if txt, ok := mp["text"].(string); ok { - parts = append(parts, txt) - } - } - } - } - } - prompt = strings.Join(parts, "\n") - } - } - - if prompt == "" { - prompt, _ = ctx.RequestBody["prompt"].(string) - } - - // 1. Build the set of available endpoints. - available := make(map[string]struct{}, len(scoredPods)) - for _, sp := range scoredPods { - ep := sp.GetPod().EndpointURL // <-- adapt this accessor - available[ep] = struct{}{} - } - - // 2. Longest-prefix match within the trie. - matched := p.trie.longestPrefixMatch(prompt, available) - - // 3. Fallback: no match --> all endpoints are candidates. - if len(matched) == 0 { - for ep := range available { - matched[ep] = struct{}{} - } - } - - // 4. Convert the matched set to a slice and pick randomly. - endpoints := make([]string, 0, len(matched)) - for ep := range matched { - endpoints = append(endpoints, ep) - } - selected := endpoints[p.rnd.Intn(len(endpoints))] - - // 5. Cache the decision for future prefix look-ups. - p.trie.insert(prompt, selected) - - // 6. Return the pod whose URL matches `selected`. - for _, sp := range scoredPods { - if sp.GetPod().EndpointURL == selected { // same accessor as above - return &types.Result{TargetPod: sp} - } - } - // Should never hit; safe fallback. - return &types.Result{TargetPod: scoredPods[0]} -} - -/*---------------------------- trie implementation ---------------------------*/ - -type hashTrie struct { - mu sync.RWMutex - children map[uint64]*hashTrie - endpoints map[string]struct{} -} - -func newHashTrie() *hashTrie { - return &hashTrie{children: make(map[uint64]*hashTrie)} -} - -func intersection(a, b map[string]struct{}) map[string]struct{} { - res := make(map[string]struct{}) - for k := range a { - if _, ok := b[k]; ok { - res[k] = struct{}{} - } - } - return res -} - -func chunkAndHash(s string) []uint64 { - hashes := make([]uint64, 0, (len(s)+chunkSize-1)/chunkSize) - for i := 0; i < len(s); i += chunkSize { - end := i + chunkSize - if end > len(s) { - end = len(s) - } - hashes = append(hashes, xxhash.Sum64([]byte(s[i:end]))) - } - return hashes -} - -func (t *hashTrie) insert(key, endpoint string) { - t.mu.Lock() - defer t.mu.Unlock() - - node := t - if node.endpoints == nil { - node.endpoints = make(map[string]struct{}) - } - node.endpoints[endpoint] = struct{}{} - - for _, h := range chunkAndHash(key) { - child, ok := node.children[h] - if !ok { - child = newHashTrie() - node.children[h] = child - } - node = child - if node.endpoints == nil { - node.endpoints = make(map[string]struct{}) - } - node.endpoints[endpoint] = struct{}{} - } -} - -func (t *hashTrie) longestPrefixMatch( - key string, - available map[string]struct{}, -) map[string]struct{} { - t.mu.RLock() - defer t.mu.RUnlock() + return &types.Result{TargetPod: scoredPods[index]} - node := t - matched := intersection(node.endpoints, available) - - for _, h := range chunkAndHash(key) { - child, ok := node.children[h] - if !ok { - break - } - node = child - cand := intersection(node.endpoints, available) - if len(cand) == 0 { - break - } - matched = cand - } - return matched } From e6633bd9368fcfaadbcf9470dbcb3726323bccdb Mon Sep 17 00:00:00 2001 From: BrianPark314 Date: Thu, 31 Jul 2025 20:03:41 +0900 Subject: [PATCH 6/7] chore: fix pa picker logic Signed-off-by: BrianPark314 --- .../prefix_aware_picker.go | 109 ++++++++++++++---- 1 file changed, 85 insertions(+), 24 deletions(-) diff --git a/src/gateway_inference_extension/prefix_aware_picker.go b/src/gateway_inference_extension/prefix_aware_picker.go index 894c602cf..405286cfc 100644 --- a/src/gateway_inference_extension/prefix_aware_picker.go +++ b/src/gateway_inference_extension/prefix_aware_picker.go @@ -12,13 +12,18 @@ package picker import ( "math/rand" + "strings" "sync" "time" + "github.com/cespare/xxhash/v2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) +const chunkSize = 128 + var _ plugins.Picker = &PrefixMatchPicker{} // PrefixMatchPicker selects the engine whose URL was returned by the @@ -52,7 +57,40 @@ func (p *PrefixMatchPicker) Pick( return &types.Result{} } - prompt, _ := ctx.RequestBody["prompt"].(string) + var prompt string + + if msgs, ok := ctx.RequestBody["messages"]; ok { + if arr, ok := msgs.([]any); ok { + var parts []string + for _, m := range arr { + mm, ok := m.(map[string]any) + if !ok { + continue + } + switch c := mm["content"].(type) { + case string: + parts = append(parts, c) + case []any: + for _, part := range c { + mp, ok := part.(map[string]any) + if !ok { + continue + } + if mp["type"] == "text" { + if txt, ok := mp["text"].(string); ok { + parts = append(parts, txt) + } + } + } + } + } + prompt = strings.Join(parts, "\n") + } + } + + if prompt == "" { + prompt, _ = ctx.RequestBody["prompt"].(string) + } // 1. Build the set of available endpoints. available := make(map[string]struct{}, len(scoredPods)) @@ -95,12 +133,34 @@ func (p *PrefixMatchPicker) Pick( type hashTrie struct { mu sync.RWMutex - children map[rune]*hashTrie + children map[uint64]*hashTrie endpoints map[string]struct{} } func newHashTrie() *hashTrie { - return &hashTrie{children: make(map[rune]*hashTrie)} + return &hashTrie{children: make(map[uint64]*hashTrie)} +} + +func intersection(a, b map[string]struct{}) map[string]struct{} { + res := make(map[string]struct{}) + for k := range a { + if _, ok := b[k]; ok { + res[k] = struct{}{} + } + } + return res +} + +func chunkAndHash(s string) []uint64 { + hashes := make([]uint64, 0, (len(s)+chunkSize-1)/chunkSize) + for i := 0; i < len(s); i += chunkSize { + end := i + chunkSize + if end > len(s) { + end = len(s) + } + hashes = append(hashes, xxhash.Sum64([]byte(s[i:end]))) + } + return hashes } func (t *hashTrie) insert(key, endpoint string) { @@ -108,18 +168,23 @@ func (t *hashTrie) insert(key, endpoint string) { defer t.mu.Unlock() node := t - for _, r := range key { - child, ok := node.children[r] + if node.endpoints == nil { + node.endpoints = make(map[string]struct{}) + } + node.endpoints[endpoint] = struct{}{} + + for _, h := range chunkAndHash(key) { + child, ok := node.children[h] if !ok { child = newHashTrie() - node.children[r] = child + node.children[h] = child } node = child + if node.endpoints == nil { + node.endpoints = make(map[string]struct{}) + } + node.endpoints[endpoint] = struct{}{} } - if node.endpoints == nil { - node.endpoints = make(map[string]struct{}) - } - node.endpoints[endpoint] = struct{}{} } func (t *hashTrie) longestPrefixMatch( @@ -129,24 +194,20 @@ func (t *hashTrie) longestPrefixMatch( t.mu.RLock() defer t.mu.RUnlock() - var lastMatch map[string]struct{} node := t - for _, r := range key { - if node.endpoints != nil { - lastMatch = node.endpoints - } - child, ok := node.children[r] + matched := intersection(node.endpoints, available) + + for _, h := range chunkAndHash(key) { + child, ok := node.children[h] if !ok { break } node = child - } - // Filter by `available`. - res := make(map[string]struct{}) - for ep := range lastMatch { - if _, ok := available[ep]; ok { - res[ep] = struct{}{} + cand := intersection(node.endpoints, available) + if len(cand) == 0 { + break } + matched = cand } - return res -} + return matched +} \ No newline at end of file From 9dcc93a43712c896b0822c5e85bc6c32bcc52b75 Mon Sep 17 00:00:00 2001 From: BrianPark314 Date: Fri, 1 Aug 2025 16:10:33 +0900 Subject: [PATCH 7/7] chore: add newline Signed-off-by: BrianPark314 --- src/gateway_inference_extension/prefix_aware_picker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gateway_inference_extension/prefix_aware_picker.go b/src/gateway_inference_extension/prefix_aware_picker.go index 405286cfc..410eafde0 100644 --- a/src/gateway_inference_extension/prefix_aware_picker.go +++ b/src/gateway_inference_extension/prefix_aware_picker.go @@ -210,4 +210,4 @@ func (t *hashTrie) longestPrefixMatch( matched = cand } return matched -} \ No newline at end of file +}