Skip to content

Commit bd69d8c

Browse files
committed
feat: Make metrics stale time configurable
1 parent 36175ba commit bd69d8c

File tree

19 files changed

+93
-104
lines changed

19 files changed

+93
-104
lines changed

cmd/epp/runner/runner.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ var (
110110
loraInfoMetric = flag.String("loraInfoMetric",
111111
"vllm:lora_requests_info",
112112
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
113+
metricsStalenessThreshold = flag.Duration("metricsStalenessThreshold",
114+
config.DefaultMetricsStalenessThreshold,
115+
"Duration after which metrics are considered stale. This is used to determine if a pod's metrics "+
116+
"are fresh enough to be used for scheduling decisions.")
113117
// configuration flags
114118
configFile = flag.String("configFile", "", "The path to the configuration file")
115119
configText = flag.String("configText", "", "The configuration specified as text, in lieu of a file")
@@ -187,7 +191,8 @@ func (r *Runner) Run(ctx context.Context) error {
187191
return err
188192
}
189193
verifyMetricMapping(*mapping, setupLog)
190-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
194+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping},
195+
*refreshMetricsInterval, *metricsStalenessThreshold)
191196

192197
datastore := datastore.NewDatastore(ctx, pmf)
193198

pkg/epp/backend/metrics/pod_metrics_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/types"
2929
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
3031
)
3132

3233
var (
@@ -61,7 +62,7 @@ var (
6162
func TestMetricsRefresh(t *testing.T) {
6263
ctx := context.Background()
6364
pmc := &FakePodMetricsClient{}
64-
pmf := NewPodMetricsFactory(pmc, time.Millisecond)
65+
pmf := NewPodMetricsFactory(pmc, time.Millisecond, config.DefaultMetricsStalenessThreshold)
6566

6667
// The refresher is initialized with empty metrics.
6768
pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{})

pkg/epp/backend/metrics/types.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,18 @@ import (
2727
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
2828
)
2929

30-
func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval time.Duration) *PodMetricsFactory {
30+
func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsStalenessThreshold time.Duration) *PodMetricsFactory {
3131
return &PodMetricsFactory{
32-
pmc: pmc,
33-
refreshMetricsInterval: refreshMetricsInterval,
32+
pmc: pmc,
33+
refreshMetricsInterval: refreshMetricsInterval,
34+
MetricsStalenessThreshold: metricsStalenessThreshold,
3435
}
3536
}
3637

3738
type PodMetricsFactory struct {
38-
pmc PodMetricsClient
39-
refreshMetricsInterval time.Duration
39+
pmc PodMetricsClient
40+
refreshMetricsInterval time.Duration
41+
MetricsStalenessThreshold time.Duration
4042
}
4143

4244
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {

pkg/epp/common/config/defaults.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,18 @@ limitations under the License.
1818
// different EPP components.
1919
package config
2020

21+
import "time"
22+
2123
const (
2224
// DefaultKVCacheThreshold is the default KV cache utilization (0.0 to 1.0)
2325
// threshold.
2426
DefaultKVCacheThreshold = 0.8
2527
// DefaultQueueThresholdCritical is the default backend waiting queue size
2628
// threshold.
2729
DefaultQueueThresholdCritical = 5
30+
// DefaultMetricsStalenessThreshold defines how old metrics can be before they
31+
// are considered stale.
32+
// Given the pod metrics refresh interval is 50ms, a threshold slightly above
33+
// that should be fine.
34+
DefaultMetricsStalenessThreshold = 200 * time.Millisecond
2835
)

pkg/epp/controller/inferencemodel_reconciler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3333
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3434
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
35+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
3536
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3637
utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
3738
)
@@ -194,7 +195,7 @@ func TestInferenceModelReconciler(t *testing.T) {
194195
WithObjects(initObjs...).
195196
WithIndex(&v1alpha2.InferenceModel{}, datastore.ModelNameIndexKey, indexInferenceModelsByModelName).
196197
Build()
197-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
198+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, config.DefaultMetricsStalenessThreshold)
198199
ds := datastore.NewDatastore(t.Context(), pmf)
199200
for _, m := range test.modelsInStore {
200201
ds.ModelSetIfOlder(m)

pkg/epp/controller/inferencepool_reconciler_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3333
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3434
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
35+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
3536
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3637
utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
3738
)
@@ -94,7 +95,7 @@ func TestInferencePoolReconciler(t *testing.T) {
9495
req := ctrl.Request{NamespacedName: namespacedName}
9596
ctx := context.Background()
9697

97-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
98+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, config.DefaultMetricsStalenessThreshold)
9899
datastore := datastore.NewDatastore(ctx, pmf)
99100
inferencePoolReconciler := &InferencePoolReconciler{Client: fakeClient, Datastore: datastore}
100101

@@ -169,7 +170,9 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
169170
params.wantPods = []string{}
170171
}
171172
gotPods := []string{}
172-
for _, pm := range datastore.PodGetAll() {
173+
for _, pm := range datastore.PodList(func(backendmetrics.PodMetrics) bool {
174+
return true
175+
}) {
173176
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
174177
}
175178
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {

pkg/epp/controller/pod_reconciler_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3434
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3535
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
36+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
3637
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3738
utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
3839
)
@@ -43,7 +44,7 @@ var (
4344
basePod3 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{PodIP: "address-3"}}
4445
basePod11 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{PodIP: "address-11"}}
4546
pmc = &backendmetrics.FakePodMetricsClient{}
46-
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second)
47+
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second, config.DefaultMetricsStalenessThreshold)
4748
)
4849

4950
func TestPodReconciler(t *testing.T) {
@@ -197,7 +198,9 @@ func TestPodReconciler(t *testing.T) {
197198
}
198199

199200
var gotPods []*corev1.Pod
200-
for _, pm := range store.PodGetAll() {
201+
for _, pm := range store.PodList(func(backendmetrics.PodMetrics) bool {
202+
return true
203+
}) {
201204
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
202205
gotPods = append(gotPods, pod)
203206
}

pkg/epp/datastore/datastore.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"reflect"
2424
"sync"
25+
"time"
2526

2627
corev1 "k8s.io/api/core/v1"
2728
"k8s.io/apimachinery/pkg/labels"
@@ -92,8 +93,9 @@ type datastore struct {
9293
// key: InferenceModel.Spec.ModelName, value: *InferenceModel
9394
models map[string]*v1alpha2.InferenceModel
9495
// key: types.NamespacedName, value: backendmetrics.PodMetrics
95-
pods *sync.Map
96-
pmf *backendmetrics.PodMetricsFactory
96+
pods *sync.Map
97+
pmf *backendmetrics.PodMetricsFactory
98+
MetricsStalenessThreshold time.Duration
9799
}
98100

99101
func (ds *datastore) Clear() {
@@ -245,7 +247,9 @@ func (ds *datastore) ModelGetAll() []*v1alpha2.InferenceModel {
245247
// /// Pods/endpoints APIs ///
246248

247249
func (ds *datastore) PodGetAll() []backendmetrics.PodMetrics {
248-
return ds.PodList(func(backendmetrics.PodMetrics) bool { return true })
250+
return ds.PodList(func(pm backendmetrics.PodMetrics) bool {
251+
return time.Since(pm.GetMetrics().UpdateTime) <= ds.pmf.MetricsStalenessThreshold
252+
})
249253
}
250254

251255
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {

pkg/epp/datastore/datastore_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3434
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3535
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
36+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
3637
testutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
3738
)
3839

@@ -80,7 +81,7 @@ func TestPool(t *testing.T) {
8081
fakeClient := fake.NewClientBuilder().
8182
WithScheme(scheme).
8283
Build()
83-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
84+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, config.DefaultMetricsStalenessThreshold)
8485
datastore := NewDatastore(context.Background(), pmf)
8586
_ = datastore.PoolSet(context.Background(), fakeClient, tt.inferencePool)
8687
gotPool, gotErr := datastore.PoolGet()
@@ -212,7 +213,7 @@ func TestModel(t *testing.T) {
212213
}
213214
for _, test := range tests {
214215
t.Run(test.name, func(t *testing.T) {
215-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
216+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, config.DefaultMetricsStalenessThreshold)
216217
ds := NewDatastore(t.Context(), pmf)
217218
for _, m := range test.existingModels {
218219
ds.ModelSetIfOlder(m)
@@ -335,14 +336,16 @@ func TestMetrics(t *testing.T) {
335336
fakeClient := fake.NewClientBuilder().
336337
WithScheme(scheme).
337338
Build()
338-
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
339+
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond, config.DefaultMetricsStalenessThreshold)
339340
ds := NewDatastore(ctx, pmf)
340341
_ = ds.PoolSet(ctx, fakeClient, inferencePool)
341342
for _, pod := range test.storePods {
342343
ds.PodUpdateOrAddIfNotExist(pod)
343344
}
344345
assert.EventuallyWithT(t, func(t *assert.CollectT) {
345-
got := ds.PodGetAll()
346+
got := ds.PodList(func(backendmetrics.PodMetrics) bool {
347+
return true
348+
})
346349
metrics := []*backendmetrics.MetricsState{}
347350
for _, one := range got {
348351
metrics = append(metrics, one.GetMetrics())
@@ -428,15 +431,17 @@ func TestPods(t *testing.T) {
428431
for _, test := range tests {
429432
t.Run(test.name, func(t *testing.T) {
430433
ctx := context.Background()
431-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
434+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, config.DefaultMetricsStalenessThreshold)
432435
ds := NewDatastore(t.Context(), pmf)
433436
for _, pod := range test.existingPods {
434437
ds.PodUpdateOrAddIfNotExist(pod)
435438
}
436439

437440
test.op(ctx, ds)
438441
var gotPods []*corev1.Pod
439-
for _, pm := range ds.PodGetAll() {
442+
for _, pm := range ds.PodList(func(backendmetrics.PodMetrics) bool {
443+
return true
444+
}) {
440445
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
441446
gotPods = append(gotPods, pod)
442447
}

pkg/epp/metrics/collectors/inference_pool_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3131
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3232
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
33+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
3334
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3435
)
3536

@@ -48,7 +49,7 @@ var (
4849
)
4950

5051
func TestNoMetricsCollected(t *testing.T) {
51-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
52+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, config.DefaultMetricsStalenessThreshold)
5253
datastore := datastore.NewDatastore(context.Background(), pmf)
5354

5455
collector := &inferencePoolMetricsCollector{
@@ -66,7 +67,7 @@ func TestMetricsCollected(t *testing.T) {
6667
pod1NamespacedName: pod1Metrics,
6768
},
6869
}
69-
pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond)
70+
pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond, config.DefaultMetricsStalenessThreshold)
7071
ds := datastore.NewDatastore(context.Background(), pmf)
7172

7273
scheme := runtime.NewScheme()

0 commit comments

Comments
 (0)