Skip to content

Commit 6bde389

Browse files
committed
feat: use PodList with predicate func instead of PodGetAll to obtain only fresh metrics
1 parent bd69d8c commit 6bde389

File tree

17 files changed

+124
-102
lines changed

17 files changed

+124
-102
lines changed

cmd/epp/runner/runner.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
3939
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4040
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
41+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
4142
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config/loader"
4243
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -58,7 +59,7 @@ import (
5859
var (
5960
grpcPort = flag.Int(
6061
"grpcPort",
61-
runserver.DefaultGrpcPort,
62+
config.DefaultGrpcPort,
6263
"The gRPC port used for communicating with Envoy proxy")
6364
grpcHealthPort = flag.Int(
6465
"grpcHealthPort",
@@ -68,33 +69,26 @@ var (
6869
"metricsPort", 9090, "The metrics port")
6970
destinationEndpointHintKey = flag.String(
7071
"destinationEndpointHintKey",
71-
runserver.DefaultDestinationEndpointHintKey,
72+
config.DefaultDestinationEndpointHintKey,
7273
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
7374
destinationEndpointHintMetadataNamespace = flag.String(
7475
"DestinationEndpointHintMetadataNamespace",
75-
runserver.DefaultDestinationEndpointHintMetadataNamespace,
76+
config.DefaultDestinationEndpointHintMetadataNamespace,
7677
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
7778
"target endpoint. If not set, then an outer namespace struct should not be created.")
7879
poolName = flag.String(
7980
"poolName",
80-
runserver.DefaultPoolName,
81+
config.DefaultPoolName,
8182
"Name of the InferencePool this Endpoint Picker is associated with.")
8283
poolNamespace = flag.String(
8384
"poolNamespace",
84-
runserver.DefaultPoolNamespace,
85+
config.DefaultPoolNamespace,
8586
"Namespace of the InferencePool this Endpoint Picker is associated with.")
86-
refreshMetricsInterval = flag.Duration(
87-
"refreshMetricsInterval",
88-
runserver.DefaultRefreshMetricsInterval,
89-
"interval to refresh metrics")
90-
refreshPrometheusMetricsInterval = flag.Duration(
91-
"refreshPrometheusMetricsInterval",
92-
runserver.DefaultRefreshPrometheusMetricsInterval,
93-
"interval to flush prometheus metrics")
87+
9488
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
9589
secureServing = flag.Bool(
96-
"secureServing", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.")
97-
healthChecking = flag.Bool("healthChecking", runserver.DefaultHealthChecking, "Enables health checking")
90+
"secureServing", config.DefaultSecureServing, "Enables secure serving. Defaults to true.")
91+
healthChecking = flag.Bool("healthChecking", config.DefaultHealthChecking, "Enables health checking")
9892
certPath = flag.String(
9993
"certPath", "", "The path to the certificate for secure serving. The certificate and private key files "+
10094
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
@@ -110,6 +104,16 @@ var (
110104
loraInfoMetric = flag.String("loraInfoMetric",
111105
"vllm:lora_requests_info",
112106
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
107+
108+
// metrics related flags
109+
refreshMetricsInterval = flag.Duration(
110+
"refreshMetricsInterval",
111+
config.DefaultRefreshMetricsInterval,
112+
"interval to refresh metrics")
113+
refreshPrometheusMetricsInterval = flag.Duration(
114+
"refreshPrometheusMetricsInterval",
115+
config.DefaultRefreshPrometheusMetricsInterval,
116+
"interval to flush prometheus metrics")
113117
metricsStalenessThreshold = flag.Duration("metricsStalenessThreshold",
114118
config.DefaultMetricsStalenessThreshold,
115119
"Duration after which metrics are considered stale. This is used to determine if a pod's metrics "+

pkg/epp/backend/metrics/fake.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
2324

2425
corev1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/types"
2627
"sigs.k8s.io/controller-runtime/pkg/log"
2728
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
2830
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2931
)
3032

@@ -49,6 +51,10 @@ func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) {
4951
}
5052
func (fpm *FakePodMetrics) StopRefreshLoop() {} // noop
5153

54+
func (fpm *FakePodMetrics) GetMetricsStalenessThreshold() time.Duration {
55+
return config.DefaultMetricsStalenessThreshold
56+
}
57+
5258
type FakePodMetricsClient struct {
5359
errMu sync.RWMutex
5460
Err map[types.NamespacedName]error

pkg/epp/backend/metrics/logger.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,20 @@ import (
2929
)
3030

3131
const (
32-
// Note currently the EPP treats stale metrics same as fresh.
33-
// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
34-
metricsValidityPeriod = 5 * time.Second
35-
debugPrintInterval = 5 * time.Second
32+
debugPrintInterval = 5 * time.Second
3633
)
3734

3835
type Datastore interface {
3936
PoolGet() (*v1alpha2.InferencePool, error)
4037
// PodMetrics operations
41-
// PodGetAll returns all pods and metrics, including fresh and stale.
38+
// PodGetAll returns all pods and metrics.
4239
PodGetAll() []PodMetrics
4340
PodList(func(PodMetrics) bool) []PodMetrics
4441
}
4542

4643
// StartMetricsLogger starts goroutines to 1) print metrics debug logs if the DEBUG log level is
4744
// enabled; 2) flush Prometheus metrics about the backend servers.
48-
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
45+
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
4946
logger := log.FromContext(ctx)
5047
ticker := time.NewTicker(refreshPrometheusMetricsInterval)
5148
go func() {
@@ -73,10 +70,10 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
7370
return
7471
case <-ticker.C:
7572
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
76-
return time.Since(pm.GetMetrics().UpdateTime) <= metricsValidityPeriod
73+
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
7774
})
7875
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
79-
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
76+
return time.Since(pm.GetMetrics().UpdateTime) > metricsStalenessThreshold
8077
})
8178
s := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v", podsWithFreshMetrics, podsWithStaleMetrics)
8279
logger.V(logutil.VERBOSE).Info(s)

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ const (
3636
)
3737

3838
type podMetrics struct {
39-
pod atomic.Pointer[backend.Pod]
40-
metrics atomic.Pointer[MetricsState]
41-
pmc PodMetricsClient
42-
ds Datastore
43-
interval time.Duration
39+
pod atomic.Pointer[backend.Pod]
40+
metrics atomic.Pointer[MetricsState]
41+
pmc PodMetricsClient
42+
ds Datastore
43+
interval time.Duration
44+
stalenessThreshold time.Duration
4445

4546
startOnce sync.Once // ensures the refresh loop goroutine is started only once
4647
stopOnce sync.Once // ensures the done channel is closed only once
@@ -69,6 +70,10 @@ func (pm *podMetrics) UpdatePod(pod *corev1.Pod) {
6970
pm.pod.Store(toInternalPod(pod))
7071
}
7172

73+
func (pm *podMetrics) GetMetricsStalenessThreshold() time.Duration {
74+
return pm.stalenessThreshold
75+
}
76+
7277
func toInternalPod(pod *corev1.Pod) *backend.Pod {
7378
labels := make(map[string]string, len(pod.GetLabels()))
7479
for key, value := range pod.GetLabels() {

pkg/epp/backend/metrics/types.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,27 @@ func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsS
3131
return &PodMetricsFactory{
3232
pmc: pmc,
3333
refreshMetricsInterval: refreshMetricsInterval,
34-
MetricsStalenessThreshold: metricsStalenessThreshold,
34+
metricsStalenessThreshold: metricsStalenessThreshold,
3535
}
3636
}
3737

3838
type PodMetricsFactory struct {
3939
pmc PodMetricsClient
4040
refreshMetricsInterval time.Duration
41-
MetricsStalenessThreshold time.Duration
41+
metricsStalenessThreshold time.Duration
4242
}
4343

4444
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
4545
pod := toInternalPod(in)
4646
pm := &podMetrics{
47-
pmc: f.pmc,
48-
ds: ds,
49-
interval: f.refreshMetricsInterval,
50-
startOnce: sync.Once{},
51-
stopOnce: sync.Once{},
52-
done: make(chan struct{}),
53-
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
47+
pmc: f.pmc,
48+
ds: ds,
49+
interval: f.refreshMetricsInterval,
50+
stalenessThreshold: f.metricsStalenessThreshold,
51+
startOnce: sync.Once{},
52+
stopOnce: sync.Once{},
53+
done: make(chan struct{}),
54+
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
5455
}
5556
pm.pod.Store(pod)
5657
pm.metrics.Store(newMetricsState())
@@ -62,7 +63,12 @@ func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.
6263
type PodMetrics interface {
6364
GetPod() *backend.Pod
6465
GetMetrics() *MetricsState
66+
GetMetricsStalenessThreshold() time.Duration
6567
UpdatePod(*corev1.Pod)
6668
StopRefreshLoop()
6769
String() string
6870
}
71+
72+
func FreshMetricsFn(pm PodMetrics) bool {
73+
return time.Since(pm.GetMetrics().UpdateTime) <= pm.GetMetricsStalenessThreshold()
74+
}

pkg/epp/common/config/defaults.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,16 @@ const (
2929
DefaultQueueThresholdCritical = 5
3030
// DefaultMetricsStalenessThreshold defines how old metrics can be before they
3131
// are considered stale.
32-
// Given the pod metrics refresh interval is 50ms, a threshold slightly above
33-
// that should be fine.
34-
DefaultMetricsStalenessThreshold = 200 * time.Millisecond
32+
// The staleness is determined by the refresh interval plus the latency of the metrics API.
33+
// To be on the safer side, we start with a larger threshold.
34+
DefaultMetricsStalenessThreshold = 2 * time.Second // default for --metricsStalenessThreshold
35+
DefaultGrpcPort = 9002 // default for --grpcPort
36+
DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace
37+
DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destinationEndpointHintKey
38+
DefaultPoolName = "" // required but no default
39+
DefaultPoolNamespace = "default" // default for --poolNamespace
40+
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
41+
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
42+
DefaultSecureServing = true // default for --secureServing
43+
DefaultHealthChecking = false // default for --healthChecking
3544
)

pkg/epp/datastore/datastore.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"fmt"
2323
"reflect"
2424
"sync"
25-
"time"
2625

2726
corev1 "k8s.io/api/core/v1"
2827
"k8s.io/apimachinery/pkg/labels"
@@ -62,7 +61,7 @@ type Datastore interface {
6261
ModelGetAll() []*v1alpha2.InferenceModel
6362

6463
// PodMetrics operations
65-
// PodGetAll returns all pods and metrics, including fresh and stale.
64+
// PodGetAll returns all pods, whether their metrics are stale or fresh; intended for testing only.
6665
PodGetAll() []backendmetrics.PodMetrics
6766
// PodList lists pods matching the given predicate.
6867
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
@@ -93,9 +92,8 @@ type datastore struct {
9392
// key: InferenceModel.Spec.ModelName, value: *InferenceModel
9493
models map[string]*v1alpha2.InferenceModel
9594
// key: types.NamespacedName, value: backendmetrics.PodMetrics
96-
pods *sync.Map
97-
pmf *backendmetrics.PodMetricsFactory
98-
MetricsStalenessThreshold time.Duration
95+
pods *sync.Map
96+
pmf *backendmetrics.PodMetricsFactory
9997
}
10098

10199
func (ds *datastore) Clear() {
@@ -247,9 +245,7 @@ func (ds *datastore) ModelGetAll() []*v1alpha2.InferenceModel {
247245
// /// Pods/endpoints APIs ///
248246

249247
func (ds *datastore) PodGetAll() []backendmetrics.PodMetrics {
250-
return ds.PodList(func(pm backendmetrics.PodMetrics) bool {
251-
return time.Since(pm.GetMetrics().UpdateTime) <= ds.pmf.MetricsStalenessThreshold
252-
})
248+
return ds.PodList(func(backendmetrics.PodMetrics) bool { return true })
253249
}
254250

255251
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {

pkg/epp/datastore/datastore_test.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"k8s.io/apimachinery/pkg/types"
3232
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3333
"sigs.k8s.io/controller-runtime/pkg/client/fake"
34+
3435
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3536
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3637
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
@@ -263,6 +264,12 @@ var (
263264
},
264265
WaitingModels: map[string]int{},
265266
}
267+
pod3 = &corev1.Pod{
268+
ObjectMeta: metav1.ObjectMeta{
269+
Name: "pod3",
270+
},
271+
}
272+
266273
pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
267274
pod2NamespacedName = types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}
268275
inferencePool = &v1alpha2.InferencePool{
@@ -312,17 +319,19 @@ func TestMetrics(t *testing.T) {
312319
},
313320
},
314321
storePods: []*corev1.Pod{pod1, pod2},
315-
want: []*backendmetrics.MetricsState{
316-
pod1Metrics,
317-
// Failed to fetch pod2 metrics so it remains the default values.
318-
{
319-
ActiveModels: map[string]int{},
320-
WaitingModels: map[string]int{},
321-
WaitingQueueSize: 0,
322-
KVCacheUsagePercent: 0,
323-
MaxActiveModels: 0,
322+
want: []*backendmetrics.MetricsState{pod1Metrics},
323+
},
324+
{
325+
name: "Filter stale metrics",
326+
pmc: &backendmetrics.FakePodMetricsClient{
327+
Res: map[types.NamespacedName]*backendmetrics.MetricsState{
328+
pod1NamespacedName: pod1Metrics,
329+
pod2NamespacedName: pod2Metrics,
324330
},
325331
},
332+
storePods: []*corev1.Pod{pod1, pod2, pod3},
333+
want: []*backendmetrics.MetricsState{pod1Metrics, pod2Metrics}, // pod3 metrics were stale and should not be included.
334+
326335
},
327336
}
328337

@@ -342,10 +351,9 @@ func TestMetrics(t *testing.T) {
342351
for _, pod := range test.storePods {
343352
ds.PodUpdateOrAddIfNotExist(pod)
344353
}
354+
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
345355
assert.EventuallyWithT(t, func(t *assert.CollectT) {
346-
got := ds.PodList(func(backendmetrics.PodMetrics) bool {
347-
return true
348-
})
356+
got := ds.PodList(backendmetrics.FreshMetricsFn)
349357
metrics := []*backendmetrics.MetricsState{}
350358
for _, one := range got {
351359
metrics = append(metrics, one.GetMetrics())

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/client_golang/prometheus"
2121
compbasemetrics "k8s.io/component-base/metrics"
2222

23+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2324
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
2425
metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics"
2526
)
@@ -62,7 +63,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
6263
return
6364
}
6465

65-
podMetrics := c.ds.PodGetAll()
66+
podMetrics := c.ds.PodList(backendmetrics.FreshMetricsFn)
6667
if len(podMetrics) == 0 {
6768
return
6869
}

pkg/epp/requestcontrol/director.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
139139
// Snapshot pod metrics from the datastore to:
140140
// 1. Reduce concurrent access to the datastore.
141141
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
142-
candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
142+
candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodList(backendmetrics.FreshMetricsFn))
143143
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
144144
if err != nil {
145145
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}

0 commit comments

Comments
 (0)