Skip to content

Commit dfbfce2

Browse files
committed
feat: use PodList with predicate func instead of PodGetAll to obtain only fresh metrics
1 parent 72c650d commit dfbfce2

File tree

17 files changed

+126
-86
lines changed

17 files changed

+126
-86
lines changed

cmd/epp/runner/runner.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242

4343
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4444
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
45+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
4546
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config/loader"
4647
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4748
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -64,7 +65,7 @@ import (
6465
var (
6566
grpcPort = flag.Int(
6667
"grpc-port",
67-
runserver.DefaultGrpcPort,
68+
config.DefaultGrpcPort,
6869
"The gRPC port used for communicating with Envoy proxy")
6970
grpcHealthPort = flag.Int(
7071
"grpc-health-port",
@@ -80,20 +81,20 @@ var (
8081
"Enables pprof handlers. Defaults to true. Set to false to disable pprof handlers.")
8182
destinationEndpointHintKey = flag.String(
8283
"destination-endpoint-hint-key",
83-
runserver.DefaultDestinationEndpointHintKey,
84+
config.DefaultDestinationEndpointHintKey,
8485
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
8586
destinationEndpointHintMetadataNamespace = flag.String(
8687
"destination-endpoint-hint-metadata-namespace",
87-
runserver.DefaultDestinationEndpointHintMetadataNamespace,
88+
config.DefaultDestinationEndpointHintMetadataNamespace,
8889
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
8990
"target endpoint. If not set, then an outer namespace struct should not be created.")
9091
poolName = flag.String(
9192
"pool-name",
92-
runserver.DefaultPoolName,
93+
config.DefaultPoolName,
9394
"Name of the InferencePool this Endpoint Picker is associated with.")
9495
poolNamespace = flag.String(
9596
"pool-namespace",
96-
runserver.DefaultPoolNamespace,
97+
config.DefaultPoolNamespace,
9798
"Namespace of the InferencePool this Endpoint Picker is associated with.")
9899
refreshMetricsInterval = flag.Duration(
99100
"refresh-metrics-interval",
@@ -109,11 +110,11 @@ var (
109110
"number for the log level verbosity")
110111
secureServing = flag.Bool(
111112
"secure-serving",
112-
runserver.DefaultSecureServing,
113+
config.DefaultSecureServing,
113114
"Enables secure serving. Defaults to true.")
114115
healthChecking = flag.Bool(
115116
"health-checking",
116-
runserver.DefaultHealthChecking,
117+
config.DefaultHealthChecking,
117118
"Enables health checking")
118119
certPath = flag.String(
119120
"cert-path",
@@ -135,6 +136,16 @@ var (
135136
"lora-info-metric",
136137
runserver.DefaultLoraInfoMetric,
137138
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
139+
140+
// metrics related flags
141+
refreshMetricsInterval = flag.Duration(
142+
"refreshMetricsInterval",
143+
config.DefaultRefreshMetricsInterval,
144+
"interval to refresh metrics")
145+
refreshPrometheusMetricsInterval = flag.Duration(
146+
"refreshPrometheusMetricsInterval",
147+
config.DefaultRefreshPrometheusMetricsInterval,
148+
"interval to flush prometheus metrics")
138149
metricsStalenessThreshold = flag.Duration("metricsStalenessThreshold",
139150
config.DefaultMetricsStalenessThreshold,
140151
"Duration after which metrics are considered stale. This is used to determine if a pod's metrics "+
@@ -339,6 +350,7 @@ func (r *Runner) Run(ctx context.Context) error {
339350
HealthChecking: *healthChecking,
340351
CertPath: *certPath,
341352
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
353+
MetricsStalenessThreshold: *metricsStalenessThreshold,
342354
Director: director,
343355
SaturationDetector: saturationDetector,
344356
}

pkg/epp/backend/metrics/fake.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
2324

2425
corev1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/types"
2627
"sigs.k8s.io/controller-runtime/pkg/log"
2728

2829
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
2931
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3032
)
3133

@@ -50,6 +52,10 @@ func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) {
5052
}
5153
func (fpm *FakePodMetrics) StopRefreshLoop() {} // noop
5254

55+
func (fpm *FakePodMetrics) GetMetricsStalenessThreshold() time.Duration {
56+
return config.DefaultMetricsStalenessThreshold
57+
}
58+
5359
type FakePodMetricsClient struct {
5460
errMu sync.RWMutex
5561
Err map[types.NamespacedName]error

pkg/epp/backend/metrics/logger.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,20 @@ import (
3030
)
3131

3232
const (
33-
// Note currently the EPP treats stale metrics same as fresh.
34-
// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
35-
metricsValidityPeriod = 5 * time.Second
36-
debugPrintInterval = 5 * time.Second
33+
debugPrintInterval = 5 * time.Second
3734
)
3835

3936
type Datastore interface {
4037
PoolGet() (*v1.InferencePool, error)
4138
// PodMetrics operations
42-
// PodGetAll returns all pods and metrics, including fresh and stale.
39+
// PodGetAll returns all pods and metrics.
4340
PodGetAll() []PodMetrics
4441
PodList(func(PodMetrics) bool) []PodMetrics
4542
}
4643

4744
// StartMetricsLogger starts goroutines to 1) print metrics debug logs if the DEBUG log level is
4845
// enabled; 2) flush Prometheus metrics about the backend servers.
49-
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
46+
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
5047
logger := log.FromContext(ctx)
5148
ticker := time.NewTicker(refreshPrometheusMetricsInterval)
5249
go func() {
@@ -57,7 +54,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
5754
logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread")
5855
return
5956
case <-ticker.C: // Periodically refresh prometheus metrics for inference pool
60-
refreshPrometheusMetrics(logger, datastore)
57+
refreshPrometheusMetrics(logger, datastore, metricsStalenessThreshold)
6158
}
6259
}
6360
}()
@@ -74,10 +71,10 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
7471
return
7572
case <-ticker.C:
7673
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
77-
return time.Since(pm.GetMetrics().UpdateTime) <= metricsValidityPeriod
74+
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
7875
})
7976
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
80-
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
77+
return time.Since(pm.GetMetrics().UpdateTime) > metricsStalenessThreshold
8178
})
8279
s := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v", podsWithFreshMetrics, podsWithStaleMetrics)
8380
logger.V(logutil.VERBOSE).Info(s)
@@ -87,7 +84,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
8784
}
8885
}
8986

90-
func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
87+
func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsStalenessThreshold time.Duration) {
9188
pool, err := datastore.PoolGet()
9289
if err != nil {
9390
// No inference pool, or it is not initialized.
@@ -98,7 +95,9 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
9895
var kvCacheTotal float64
9996
var queueTotal int
10097

101-
podMetrics := datastore.PodGetAll()
98+
podMetrics := datastore.PodList(func(pm PodMetrics) bool {
99+
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
100+
})
102101
logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics))
103102
if len(podMetrics) == 0 {
104103
return

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ const (
3636
)
3737

3838
type podMetrics struct {
39-
pod atomic.Pointer[backend.Pod]
40-
metrics atomic.Pointer[MetricsState]
41-
pmc PodMetricsClient
42-
ds Datastore
43-
interval time.Duration
39+
pod atomic.Pointer[backend.Pod]
40+
metrics atomic.Pointer[MetricsState]
41+
pmc PodMetricsClient
42+
ds Datastore
43+
interval time.Duration
44+
stalenessThreshold time.Duration
4445

4546
startOnce sync.Once // ensures the refresh loop goroutine is started only once
4647
stopOnce sync.Once // ensures the done channel is closed only once
@@ -69,6 +70,10 @@ func (pm *podMetrics) UpdatePod(pod *corev1.Pod) {
6970
pm.pod.Store(toInternalPod(pod))
7071
}
7172

73+
func (pm *podMetrics) GetMetricsStalenessThreshold() time.Duration {
74+
return pm.stalenessThreshold
75+
}
76+
7277
func toInternalPod(pod *corev1.Pod) *backend.Pod {
7378
labels := make(map[string]string, len(pod.GetLabels()))
7479
for key, value := range pod.GetLabels() {

pkg/epp/backend/metrics/types.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,26 +32,27 @@ func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsS
3232
return &PodMetricsFactory{
3333
pmc: pmc,
3434
refreshMetricsInterval: refreshMetricsInterval,
35-
MetricsStalenessThreshold: metricsStalenessThreshold,
35+
metricsStalenessThreshold: metricsStalenessThreshold,
3636
}
3737
}
3838

3939
type PodMetricsFactory struct {
4040
pmc PodMetricsClient
4141
refreshMetricsInterval time.Duration
42-
MetricsStalenessThreshold time.Duration
42+
metricsStalenessThreshold time.Duration
4343
}
4444

4545
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
4646
pod := toInternalPod(in)
4747
pm := &podMetrics{
48-
pmc: f.pmc,
49-
ds: ds,
50-
interval: f.refreshMetricsInterval,
51-
startOnce: sync.Once{},
52-
stopOnce: sync.Once{},
53-
done: make(chan struct{}),
54-
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
48+
pmc: f.pmc,
49+
ds: ds,
50+
interval: f.refreshMetricsInterval,
51+
stalenessThreshold: f.metricsStalenessThreshold,
52+
startOnce: sync.Once{},
53+
stopOnce: sync.Once{},
54+
done: make(chan struct{}),
55+
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
5556
}
5657
pm.pod.Store(pod)
5758
pm.metrics.Store(NewMetricsState())
@@ -63,7 +64,12 @@ func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.
6364
type PodMetrics interface {
6465
GetPod() *backend.Pod
6566
GetMetrics() *MetricsState
67+
GetMetricsStalenessThreshold() time.Duration
6668
UpdatePod(*corev1.Pod)
6769
StopRefreshLoop()
6870
String() string
6971
}
72+
73+
func FreshMetricsFn(pm PodMetrics) bool {
74+
return time.Since(pm.GetMetrics().UpdateTime) <= pm.GetMetricsStalenessThreshold()
75+
}

pkg/epp/common/config/defaults.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,16 @@ const (
2929
DefaultQueueThresholdCritical = 5
3030
// DefaultMetricsStalenessThreshold defines how old metrics can be before they
3131
// are considered stale.
32-
// Given the pod metrics refresh interval is 50ms, a threshold slightly above
33-
// that should be fine.
34-
DefaultMetricsStalenessThreshold = 200 * time.Millisecond
32+
// The staleness is determined by the refresh interval plus the latency of the metrics API.
33+
// To be on the safer side, we start with a larger threshold.
34+
DefaultMetricsStalenessThreshold = 2 * time.Second // default for --metricsStalenessThreshold
35+
DefaultGrpcPort = 9002 // default for --grpcPort
36+
DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace
37+
DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destinationEndpointHintKey
38+
DefaultPoolName = "" // required but no default
39+
DefaultPoolNamespace = "default" // default for --poolNamespace
40+
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
41+
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
42+
DefaultSecureServing = true // default for --secureServing
43+
DefaultHealthChecking = false // default for --healthChecking
3544
)

pkg/epp/datastore/datastore.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"fmt"
2323
"reflect"
2424
"sync"
25-
"time"
2625

2726
corev1 "k8s.io/api/core/v1"
2827
"k8s.io/apimachinery/pkg/labels"
@@ -64,7 +63,7 @@ type Datastore interface {
6463
ModelGetAll() []*v1alpha2.InferenceModel
6564

6665
// PodMetrics operations
67-
// PodGetAll returns all pods and metrics, including fresh and stale.
66+
// PodGetAll returns all pods with stale and fresh metrics, only for testing.
6867
PodGetAll() []backendmetrics.PodMetrics
6968
// PodList lists pods matching the given predicate.
7069
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
@@ -95,9 +94,8 @@ type datastore struct {
9594
// key: InferenceModel.Spec.ModelName, value: *InferenceModel
9695
models map[string]*v1alpha2.InferenceModel
9796
// key: types.NamespacedName, value: backendmetrics.PodMetrics
98-
pods *sync.Map
99-
pmf *backendmetrics.PodMetricsFactory
100-
MetricsStalenessThreshold time.Duration
97+
pods *sync.Map
98+
pmf *backendmetrics.PodMetricsFactory
10199
}
102100

103101
func (ds *datastore) Clear() {
@@ -249,9 +247,7 @@ func (ds *datastore) ModelGetAll() []*v1alpha2.InferenceModel {
249247
// /// Pods/endpoints APIs ///
250248

251249
func (ds *datastore) PodGetAll() []backendmetrics.PodMetrics {
252-
return ds.PodList(func(pm backendmetrics.PodMetrics) bool {
253-
return time.Since(pm.GetMetrics().UpdateTime) <= ds.pmf.MetricsStalenessThreshold
254-
})
250+
return ds.PodList(func(backendmetrics.PodMetrics) bool { return true })
255251
}
256252

257253
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {

pkg/epp/datastore/datastore_test.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,12 @@ var (
265265
},
266266
WaitingModels: map[string]int{},
267267
}
268+
pod3 = &corev1.Pod{
269+
ObjectMeta: metav1.ObjectMeta{
270+
Name: "pod3",
271+
},
272+
}
273+
268274
pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
269275
pod2NamespacedName = types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}
270276
inferencePool = &v1.InferencePool{
@@ -314,17 +320,19 @@ func TestMetrics(t *testing.T) {
314320
},
315321
},
316322
storePods: []*corev1.Pod{pod1, pod2},
317-
want: []*backendmetrics.MetricsState{
318-
pod1Metrics,
319-
// Failed to fetch pod2 metrics so it remains the default values.
320-
{
321-
ActiveModels: map[string]int{},
322-
WaitingModels: map[string]int{},
323-
WaitingQueueSize: 0,
324-
KVCacheUsagePercent: 0,
325-
MaxActiveModels: 0,
323+
want: []*backendmetrics.MetricsState{pod1Metrics},
324+
},
325+
{
326+
name: "Filter stale metrics",
327+
pmc: &backendmetrics.FakePodMetricsClient{
328+
Res: map[types.NamespacedName]*backendmetrics.MetricsState{
329+
pod1NamespacedName: pod1Metrics,
330+
pod2NamespacedName: pod2Metrics,
326331
},
327332
},
333+
storePods: []*corev1.Pod{pod1, pod2, pod3},
334+
want: []*backendmetrics.MetricsState{pod1Metrics, pod2Metrics}, // pod3 metrics were stale and should not be included.
335+
328336
},
329337
}
330338

@@ -344,10 +352,9 @@ func TestMetrics(t *testing.T) {
344352
for _, pod := range test.storePods {
345353
ds.PodUpdateOrAddIfNotExist(pod)
346354
}
355+
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
347356
assert.EventuallyWithT(t, func(t *assert.CollectT) {
348-
got := ds.PodList(func(backendmetrics.PodMetrics) bool {
349-
return true
350-
})
357+
got := ds.PodList(backendmetrics.FreshMetricsFn)
351358
metrics := []*backendmetrics.MetricsState{}
352359
for _, one := range got {
353360
metrics = append(metrics, one.GetMetrics())

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/client_golang/prometheus"
2121
compbasemetrics "k8s.io/component-base/metrics"
2222

23+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2324
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
2425
metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics"
2526
)
@@ -62,7 +63,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
6263
return
6364
}
6465

65-
podMetrics := c.ds.PodGetAll()
66+
podMetrics := c.ds.PodList(backendmetrics.FreshMetricsFn)
6667
if len(podMetrics) == 0 {
6768
return
6869
}

pkg/epp/requestcontrol/director_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
532532
},
533533
}
534534

535-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
535+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, config.DefaultMetricsStalenessThreshold)
536536
ds := datastore.NewDatastore(t.Context(), pmf)
537537
for _, testPod := range testInput {
538538
ds.PodUpdateOrAddIfNotExist(testPod)

0 commit comments

Comments
 (0)