Skip to content

Commit 1fbad64

Browse files
committed
feat: use PodList with predicate func instead of PodGetAll to obtain only fresh metrics
1 parent 9043e4f commit 1fbad64

File tree

17 files changed

+129
-105
lines changed

17 files changed

+129
-105
lines changed

cmd/epp/runner/runner.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
4040
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4141
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
42+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
4243
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config/loader"
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4445
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -59,7 +60,7 @@ import (
5960
var (
6061
grpcPort = flag.Int(
6162
"grpcPort",
62-
runserver.DefaultGrpcPort,
63+
config.DefaultGrpcPort,
6364
"The gRPC port used for communicating with Envoy proxy")
6465
grpcHealthPort = flag.Int(
6566
"grpcHealthPort",
@@ -69,33 +70,26 @@ var (
6970
"metricsPort", 9090, "The metrics port")
7071
destinationEndpointHintKey = flag.String(
7172
"destinationEndpointHintKey",
72-
runserver.DefaultDestinationEndpointHintKey,
73+
config.DefaultDestinationEndpointHintKey,
7374
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
7475
destinationEndpointHintMetadataNamespace = flag.String(
7576
"DestinationEndpointHintMetadataNamespace",
76-
runserver.DefaultDestinationEndpointHintMetadataNamespace,
77+
config.DefaultDestinationEndpointHintMetadataNamespace,
7778
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
7879
"target endpoint. If not set, then an outer namespace struct should not be created.")
7980
poolName = flag.String(
8081
"poolName",
81-
runserver.DefaultPoolName,
82+
config.DefaultPoolName,
8283
"Name of the InferencePool this Endpoint Picker is associated with.")
8384
poolNamespace = flag.String(
8485
"poolNamespace",
85-
runserver.DefaultPoolNamespace,
86+
config.DefaultPoolNamespace,
8687
"Namespace of the InferencePool this Endpoint Picker is associated with.")
87-
refreshMetricsInterval = flag.Duration(
88-
"refreshMetricsInterval",
89-
runserver.DefaultRefreshMetricsInterval,
90-
"interval to refresh metrics")
91-
refreshPrometheusMetricsInterval = flag.Duration(
92-
"refreshPrometheusMetricsInterval",
93-
runserver.DefaultRefreshPrometheusMetricsInterval,
94-
"interval to flush prometheus metrics")
88+
9589
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
9690
secureServing = flag.Bool(
97-
"secureServing", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.")
98-
healthChecking = flag.Bool("healthChecking", runserver.DefaultHealthChecking, "Enables health checking")
91+
"secureServing", config.DefaultSecureServing, "Enables secure serving. Defaults to true.")
92+
healthChecking = flag.Bool("healthChecking", config.DefaultHealthChecking, "Enables health checking")
9993
certPath = flag.String(
10094
"certPath", "", "The path to the certificate for secure serving. The certificate and private key files "+
10195
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
@@ -111,6 +105,16 @@ var (
111105
loraInfoMetric = flag.String("loraInfoMetric",
112106
"vllm:lora_requests_info",
113107
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
108+
109+
// metrics related flags
110+
refreshMetricsInterval = flag.Duration(
111+
"refreshMetricsInterval",
112+
config.DefaultRefreshMetricsInterval,
113+
"interval to refresh metrics")
114+
refreshPrometheusMetricsInterval = flag.Duration(
115+
"refreshPrometheusMetricsInterval",
116+
config.DefaultRefreshPrometheusMetricsInterval,
117+
"interval to flush prometheus metrics")
114118
metricsStalenessThreshold = flag.Duration("metricsStalenessThreshold",
115119
config.DefaultMetricsStalenessThreshold,
116120
"Duration after which metrics are considered stale. This is used to determine if a pod's metrics "+

pkg/epp/backend/metrics/fake.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
2324

2425
corev1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/types"
2627
"sigs.k8s.io/controller-runtime/pkg/log"
2728
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
2830
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2931
)
3032

@@ -49,6 +51,10 @@ func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) {
4951
}
5052
func (fpm *FakePodMetrics) StopRefreshLoop() {} // noop
5153

54+
func (fpm *FakePodMetrics) GetMetricsStalenessThreshold() time.Duration {
55+
return config.DefaultMetricsStalenessThreshold
56+
}
57+
5258
type FakePodMetricsClient struct {
5359
errMu sync.RWMutex
5460
Err map[types.NamespacedName]error

pkg/epp/backend/metrics/logger.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,20 @@ import (
2929
)
3030

3131
const (
32-
// Note currently the EPP treats stale metrics same as fresh.
33-
// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
34-
metricsValidityPeriod = 5 * time.Second
35-
debugPrintInterval = 5 * time.Second
32+
debugPrintInterval = 5 * time.Second
3633
)
3734

3835
type Datastore interface {
3936
PoolGet() (*v1alpha2.InferencePool, error)
4037
// PodMetrics operations
41-
// PodGetAll returns all pods and metrics, including fresh and stale.
38+
// PodGetAll returns all pods and metrics.
4239
PodGetAll() []PodMetrics
4340
PodList(func(PodMetrics) bool) []PodMetrics
4441
}
4542

4643
// StartMetricsLogger starts goroutines to 1) Print metrics debug logs if the DEBUG log level is
4744
// enabled; 2) flushes Prometheus metrics about the backend servers.
48-
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
45+
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
4946
logger := log.FromContext(ctx)
5047
ticker := time.NewTicker(refreshPrometheusMetricsInterval)
5148
go func() {
@@ -56,7 +53,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
5653
logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread")
5754
return
5855
case <-ticker.C: // Periodically refresh prometheus metrics for inference pool
59-
refreshPrometheusMetrics(logger, datastore)
56+
refreshPrometheusMetrics(logger, datastore, metricsStalenessThreshold)
6057
}
6158
}
6259
}()
@@ -73,10 +70,10 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
7370
return
7471
case <-ticker.C:
7572
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
76-
return time.Since(pm.GetMetrics().UpdateTime) <= metricsValidityPeriod
73+
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
7774
})
7875
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
79-
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
76+
return time.Since(pm.GetMetrics().UpdateTime) > metricsStalenessThreshold
8077
})
8178
s := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v", podsWithFreshMetrics, podsWithStaleMetrics)
8279
logger.V(logutil.VERBOSE).Info(s)
@@ -86,7 +83,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
8683
}
8784
}
8885

89-
func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
86+
func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsStalenessThreshold time.Duration) {
9087
pool, err := datastore.PoolGet()
9188
if err != nil {
9289
// No inference pool or not initialize.
@@ -97,7 +94,9 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
9794
var kvCacheTotal float64
9895
var queueTotal int
9996

100-
podMetrics := datastore.PodGetAll()
97+
podMetrics := datastore.PodList(func(pm PodMetrics) bool {
98+
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
99+
})
101100
logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics))
102101
if len(podMetrics) == 0 {
103102
return

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ const (
3636
)
3737

3838
type podMetrics struct {
39-
pod atomic.Pointer[backend.Pod]
40-
metrics atomic.Pointer[MetricsState]
41-
pmc PodMetricsClient
42-
ds Datastore
43-
interval time.Duration
39+
pod atomic.Pointer[backend.Pod]
40+
metrics atomic.Pointer[MetricsState]
41+
pmc PodMetricsClient
42+
ds Datastore
43+
interval time.Duration
44+
stalenessThreshold time.Duration
4445

4546
startOnce sync.Once // ensures the refresh loop goroutine is started only once
4647
stopOnce sync.Once // ensures the done channel is closed only once
@@ -69,6 +70,10 @@ func (pm *podMetrics) UpdatePod(pod *corev1.Pod) {
6970
pm.pod.Store(toInternalPod(pod))
7071
}
7172

73+
func (pm *podMetrics) GetMetricsStalenessThreshold() time.Duration {
74+
return pm.stalenessThreshold
75+
}
76+
7277
func toInternalPod(pod *corev1.Pod) *backend.Pod {
7378
labels := make(map[string]string, len(pod.GetLabels()))
7479
for key, value := range pod.GetLabels() {

pkg/epp/backend/metrics/types.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,27 @@ func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsS
3131
return &PodMetricsFactory{
3232
pmc: pmc,
3333
refreshMetricsInterval: refreshMetricsInterval,
34-
MetricsStalenessThreshold: metricsStalenessThreshold,
34+
metricsStalenessThreshold: metricsStalenessThreshold,
3535
}
3636
}
3737

3838
type PodMetricsFactory struct {
3939
pmc PodMetricsClient
4040
refreshMetricsInterval time.Duration
41-
MetricsStalenessThreshold time.Duration
41+
metricsStalenessThreshold time.Duration
4242
}
4343

4444
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
4545
pod := toInternalPod(in)
4646
pm := &podMetrics{
47-
pmc: f.pmc,
48-
ds: ds,
49-
interval: f.refreshMetricsInterval,
50-
startOnce: sync.Once{},
51-
stopOnce: sync.Once{},
52-
done: make(chan struct{}),
53-
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
47+
pmc: f.pmc,
48+
ds: ds,
49+
interval: f.refreshMetricsInterval,
50+
stalenessThreshold: f.metricsStalenessThreshold,
51+
startOnce: sync.Once{},
52+
stopOnce: sync.Once{},
53+
done: make(chan struct{}),
54+
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
5455
}
5556
pm.pod.Store(pod)
5657
pm.metrics.Store(NewMetricsState())
@@ -62,7 +63,12 @@ func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.
6263
type PodMetrics interface {
6364
GetPod() *backend.Pod
6465
GetMetrics() *MetricsState
66+
GetMetricsStalenessThreshold() time.Duration
6567
UpdatePod(*corev1.Pod)
6668
StopRefreshLoop()
6769
String() string
6870
}
71+
72+
func FreshMetricsFn(pm PodMetrics) bool {
73+
return time.Since(pm.GetMetrics().UpdateTime) <= pm.GetMetricsStalenessThreshold()
74+
}

pkg/epp/common/config/defaults.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,16 @@ const (
2929
DefaultQueueThresholdCritical = 5
3030
// DefaultMetricsStalenessThreshold defines how old metrics can be before they
3131
// are considered stale.
32-
// Given the pod metrics refresh interval is 50ms, a threshold slightly above
33-
// that should be fine.
34-
DefaultMetricsStalenessThreshold = 200 * time.Millisecond
32+
// The staleness is determined by the refresh internal plus the latency of the metrics API.
33+
// To be on the safer side, we start with a larger threshold.
34+
DefaultMetricsStalenessThreshold = 2 * time.Second // default for --metricsStalenessThreshold
35+
DefaultGrpcPort = 9002 // default for --grpcPort
36+
DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace
37+
DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destinationEndpointHintKey
38+
DefaultPoolName = "" // required but no default
39+
DefaultPoolNamespace = "default" // default for --poolNamespace
40+
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
41+
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
42+
DefaultSecureServing = true // default for --secureServing
43+
DefaultHealthChecking = false // default for --healthChecking
3544
)

pkg/epp/datastore/datastore.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"fmt"
2323
"reflect"
2424
"sync"
25-
"time"
2625

2726
corev1 "k8s.io/api/core/v1"
2827
"k8s.io/apimachinery/pkg/labels"
@@ -62,7 +61,7 @@ type Datastore interface {
6261
ModelGetAll() []*v1alpha2.InferenceModel
6362

6463
// PodMetrics operations
65-
// PodGetAll returns all pods and metrics, including fresh and stale.
64+
// PodGetAll returns all pods with stale and fresh metrics, only for testing.
6665
PodGetAll() []backendmetrics.PodMetrics
6766
// PodList lists pods matching the given predicate.
6867
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
@@ -93,9 +92,8 @@ type datastore struct {
9392
// key: InferenceModel.Spec.ModelName, value: *InferenceModel
9493
models map[string]*v1alpha2.InferenceModel
9594
// key: types.NamespacedName, value: backendmetrics.PodMetrics
96-
pods *sync.Map
97-
pmf *backendmetrics.PodMetricsFactory
98-
MetricsStalenessThreshold time.Duration
95+
pods *sync.Map
96+
pmf *backendmetrics.PodMetricsFactory
9997
}
10098

10199
func (ds *datastore) Clear() {
@@ -247,9 +245,7 @@ func (ds *datastore) ModelGetAll() []*v1alpha2.InferenceModel {
247245
// /// Pods/endpoints APIs ///
248246

249247
func (ds *datastore) PodGetAll() []backendmetrics.PodMetrics {
250-
return ds.PodList(func(pm backendmetrics.PodMetrics) bool {
251-
return time.Since(pm.GetMetrics().UpdateTime) <= ds.pmf.MetricsStalenessThreshold
252-
})
248+
return ds.PodList(func(backendmetrics.PodMetrics) bool { return true })
253249
}
254250

255251
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {

pkg/epp/datastore/datastore_test.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"k8s.io/apimachinery/pkg/types"
3232
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3333
"sigs.k8s.io/controller-runtime/pkg/client/fake"
34+
3435
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3536
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3637
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
@@ -263,6 +264,12 @@ var (
263264
},
264265
WaitingModels: map[string]int{},
265266
}
267+
pod3 = &corev1.Pod{
268+
ObjectMeta: metav1.ObjectMeta{
269+
Name: "pod3",
270+
},
271+
}
272+
266273
pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
267274
pod2NamespacedName = types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}
268275
inferencePool = &v1alpha2.InferencePool{
@@ -312,17 +319,19 @@ func TestMetrics(t *testing.T) {
312319
},
313320
},
314321
storePods: []*corev1.Pod{pod1, pod2},
315-
want: []*backendmetrics.MetricsState{
316-
pod1Metrics,
317-
// Failed to fetch pod2 metrics so it remains the default values.
318-
{
319-
ActiveModels: map[string]int{},
320-
WaitingModels: map[string]int{},
321-
WaitingQueueSize: 0,
322-
KVCacheUsagePercent: 0,
323-
MaxActiveModels: 0,
322+
want: []*backendmetrics.MetricsState{pod1Metrics},
323+
},
324+
{
325+
name: "Filter stale metrics",
326+
pmc: &backendmetrics.FakePodMetricsClient{
327+
Res: map[types.NamespacedName]*backendmetrics.MetricsState{
328+
pod1NamespacedName: pod1Metrics,
329+
pod2NamespacedName: pod2Metrics,
324330
},
325331
},
332+
storePods: []*corev1.Pod{pod1, pod2, pod3},
333+
want: []*backendmetrics.MetricsState{pod1Metrics, pod2Metrics}, // pod3 metrics were stale and should not be included.
334+
326335
},
327336
}
328337

@@ -342,10 +351,9 @@ func TestMetrics(t *testing.T) {
342351
for _, pod := range test.storePods {
343352
ds.PodUpdateOrAddIfNotExist(pod)
344353
}
354+
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
345355
assert.EventuallyWithT(t, func(t *assert.CollectT) {
346-
got := ds.PodList(func(backendmetrics.PodMetrics) bool {
347-
return true
348-
})
356+
got := ds.PodList(backendmetrics.FreshMetricsFn)
349357
metrics := []*backendmetrics.MetricsState{}
350358
for _, one := range got {
351359
metrics = append(metrics, one.GetMetrics())

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/client_golang/prometheus"
2121
compbasemetrics "k8s.io/component-base/metrics"
2222

23+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2324
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
2425
metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics"
2526
)
@@ -62,7 +63,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
6263
return
6364
}
6465

65-
podMetrics := c.ds.PodGetAll()
66+
podMetrics := c.ds.PodList(backendmetrics.FreshMetricsFn)
6667
if len(podMetrics) == 0 {
6768
return
6869
}

pkg/epp/requestcontrol/director_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
508508
},
509509
}
510510

511-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
511+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, config.DefaultMetricsStalenessThreshold)
512512
ds := datastore.NewDatastore(t.Context(), pmf)
513513
for _, testPod := range testInput {
514514
ds.PodUpdateOrAddIfNotExist(testPod)

0 commit comments

Comments
 (0)