Skip to content

cleanup: refactor PodList calls to prepare for making pod metrics staleness configurable #1046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,6 @@ var (
"pool-namespace",
runserver.DefaultPoolNamespace,
"Namespace of the InferencePool this Endpoint Picker is associated with.")
refreshMetricsInterval = flag.Duration(
"refresh-metrics-interval",
runserver.DefaultRefreshMetricsInterval,
"interval to refresh metrics")
refreshPrometheusMetricsInterval = flag.Duration(
"refresh-prometheus-metrics-interval",
runserver.DefaultRefreshPrometheusMetricsInterval,
"interval to flush prometheus metrics")
logVerbosity = flag.Int(
"v",
logging.DEFAULT,
Expand Down Expand Up @@ -135,6 +127,19 @@ var (
"lora-info-metric",
runserver.DefaultLoraInfoMetric,
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")

// metrics related flags
refreshMetricsInterval = flag.Duration(
"refresh-metrics-interval",
runserver.DefaultRefreshMetricsInterval,
"interval to refresh metrics")
refreshPrometheusMetricsInterval = flag.Duration(
"refresh-prometheus-metrics-interval",
runserver.DefaultRefreshPrometheusMetricsInterval,
"interval to flush prometheus metrics")
metricsStalenessThreshold = flag.Duration("metrics-staleness-threshold",
runserver.DefaultMetricsStalenessThreshold,
"Duration after which metrics are considered stale. This is used to determine if a pod's metrics are fresh enough.")
// configuration flags
configFile = flag.String(
"config-file",
Expand Down Expand Up @@ -268,7 +273,8 @@ func (r *Runner) Run(ctx context.Context) error {
ModelServerMetricsPath: *modelServerMetricsPath,
ModelServerMetricsScheme: *modelServerMetricsScheme,
Client: metricsHttpClient,
}, *refreshMetricsInterval)
},
*refreshMetricsInterval, *metricsStalenessThreshold)

datastore := datastore.NewDatastore(ctx, pmf)

Expand Down Expand Up @@ -337,6 +343,7 @@ func (r *Runner) Run(ctx context.Context) error {
HealthChecking: *healthChecking,
CertPath: *certPath,
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
MetricsStalenessThreshold: *metricsStalenessThreshold,
Director: director,
SaturationDetector: saturationDetector,
}
Expand Down
21 changes: 9 additions & 12 deletions pkg/epp/backend/metrics/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,18 @@ import (
)

const (
// Note currently the EPP treats stale metrics same as fresh.
// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
metricsValidityPeriod = 5 * time.Second
debugPrintInterval = 5 * time.Second
debugPrintInterval = 5 * time.Second
)

type Datastore interface {
PoolGet() (*v1.InferencePool, error)
// PodMetrics operations
// PodGetAll returns all pods and metrics, including fresh and stale.
PodGetAll() []PodMetrics
PodList(func(PodMetrics) bool) []PodMetrics
}

// StartMetricsLogger starts goroutines to 1) Print metrics debug logs if the DEBUG log level is
// enabled; 2) flushes Prometheus metrics about the backend servers.
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
logger := log.FromContext(ctx)
ticker := time.NewTicker(refreshPrometheusMetricsInterval)
go func() {
Expand All @@ -57,7 +52,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread")
return
case <-ticker.C: // Periodically refresh prometheus metrics for inference pool
refreshPrometheusMetrics(logger, datastore)
refreshPrometheusMetrics(logger, datastore, metricsStalenessThreshold)
}
}
}()
Expand All @@ -74,10 +69,10 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
return
case <-ticker.C:
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
return time.Since(pm.GetMetrics().UpdateTime) <= metricsValidityPeriod
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
})
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
return time.Since(pm.GetMetrics().UpdateTime) > metricsStalenessThreshold
})
s := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v", podsWithFreshMetrics, podsWithStaleMetrics)
logger.V(logutil.VERBOSE).Info(s)
Expand All @@ -87,7 +82,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
}
}

func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsStalenessThreshold time.Duration) {
pool, err := datastore.PoolGet()
if err != nil {
// No inference pool or not initialize.
Expand All @@ -98,7 +93,9 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
var kvCacheTotal float64
var queueTotal int

podMetrics := datastore.PodGetAll()
podMetrics := datastore.PodList(func(pm PodMetrics) bool {
return time.Since(pm.GetMetrics().UpdateTime) <= metricsStalenessThreshold
})
logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics))
if len(podMetrics) == 0 {
return
Expand Down
7 changes: 2 additions & 5 deletions pkg/epp/backend/metrics/pod_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
func TestMetricsRefresh(t *testing.T) {
ctx := context.Background()
pmc := &FakePodMetricsClient{}
pmf := NewPodMetricsFactory(pmc, time.Millisecond)
pmf := NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2)

// The refresher is initialized with empty metrics.
pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{})
Expand Down Expand Up @@ -90,10 +90,7 @@ type fakeDataStore struct{}
func (f *fakeDataStore) PoolGet() (*v1.InferencePool, error) {
return &v1.InferencePool{Spec: v1.InferencePoolSpec{TargetPortNumber: 8000}}, nil
}
func (f *fakeDataStore) PodGetAll() []PodMetrics {
// Not implemented.
return nil
}

func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics {
// Not implemented.
return nil
Expand Down
16 changes: 11 additions & 5 deletions pkg/epp/backend/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,22 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
)

func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval time.Duration) *PodMetricsFactory {
var (
AllPodPredicate = func(PodMetrics) bool { return true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo? AllPodsPredicate?

)

func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval, metricsStalenessThreshold time.Duration) *PodMetricsFactory {
return &PodMetricsFactory{
pmc: pmc,
refreshMetricsInterval: refreshMetricsInterval,
pmc: pmc,
refreshMetricsInterval: refreshMetricsInterval,
metricsStalenessThreshold: metricsStalenessThreshold,
}
}

type PodMetricsFactory struct {
pmc PodMetricsClient
refreshMetricsInterval time.Duration
pmc PodMetricsClient
refreshMetricsInterval time.Duration
metricsStalenessThreshold time.Duration
}

func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/controller/inferenceobjective_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) {
WithObjects(initObjs...).
WithIndex(&v1alpha2.InferenceObjective{}, datastore.ModelNameIndexKey, indexInferenceObjectivesByModelName).
Build()
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
ds := datastore.NewDatastore(t.Context(), pmf)
for _, m := range test.objectivessInStore {
ds.ObjectiveSetIfOlder(m)
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/controller/inferencepool_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestInferencePoolReconciler(t *testing.T) {
req := ctrl.Request{NamespacedName: namespacedName}
ctx := context.Background()

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
datastore := datastore.NewDatastore(ctx, pmf)
inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore}

Expand Down Expand Up @@ -172,7 +172,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
params.wantPods = []string{}
}
gotPods := []string{}
for _, pm := range datastore.PodGetAll() {
for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) {
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
}
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/controller/pod_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
basePod3 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{PodIP: "address-3"}}
basePod11 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{PodIP: "address-11"}}
pmc = &backendmetrics.FakePodMetricsClient{}
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second)
pmf = backendmetrics.NewPodMetricsFactory(pmc, time.Second, time.Second*2)
)

func TestPodReconciler(t *testing.T) {
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestPodReconciler(t *testing.T) {
}

var gotPods []*corev1.Pod
for _, pm := range store.PodGetAll() {
for _, pm := range store.PodList(backendmetrics.AllPodPredicate) {
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
gotPods = append(gotPods, pod)
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/epp/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ type Datastore interface {
ObjectiveResync(ctx context.Context, reader client.Reader, modelName string) (bool, error)
ObjectiveGetAll() []*v1alpha2.InferenceObjective

// PodMetrics operations
// PodGetAll returns all pods and metrics, including fresh and stale.
PodGetAll() []backendmetrics.PodMetrics
// PodList lists pods matching the given predicate.
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
Expand Down Expand Up @@ -245,11 +242,8 @@ func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective {
}

// /// Pods/endpoints APIs ///

func (ds *datastore) PodGetAll() []backendmetrics.PodMetrics {
return ds.PodList(func(backendmetrics.PodMetrics) bool { return true })
}

// TODO: add a flag for callers to specify the staleness threshold for metrics.
// ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
res := []backendmetrics.PodMetrics{}

Expand Down
17 changes: 9 additions & 8 deletions pkg/epp/datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestPool(t *testing.T) {
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
Build()
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
datastore := NewDatastore(context.Background(), pmf)
_ = datastore.PoolSet(context.Background(), fakeClient, tt.inferencePool)
gotPool, gotErr := datastore.PoolGet()
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestModel(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
ds := NewDatastore(t.Context(), pmf)
for _, m := range test.existingModels {
ds.ObjectiveSetIfOlder(m)
Expand Down Expand Up @@ -265,6 +265,7 @@ var (
},
WaitingModels: map[string]int{},
}

pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
pod2NamespacedName = types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}
inferencePool = &v1.InferencePool{
Expand Down Expand Up @@ -314,8 +315,7 @@ func TestMetrics(t *testing.T) {
},
},
storePods: []*corev1.Pod{pod1, pod2},
want: []*backendmetrics.MetricsState{
pod1Metrics,
want: []*backendmetrics.MetricsState{pod1Metrics,
// Failed to fetch pod2 metrics so it remains the default values.
{
ActiveModels: map[string]int{},
Expand All @@ -338,14 +338,15 @@ func TestMetrics(t *testing.T) {
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
Build()
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond, time.Second*2)
ds := NewDatastore(ctx, pmf)
_ = ds.PoolSet(ctx, fakeClient, inferencePool)
for _, pod := range test.storePods {
ds.PodUpdateOrAddIfNotExist(pod)
}
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
assert.EventuallyWithT(t, func(t *assert.CollectT) {
got := ds.PodGetAll()
got := ds.PodList(backendmetrics.AllPodPredicate)
metrics := []*backendmetrics.MetricsState{}
for _, one := range got {
metrics = append(metrics, one.GetMetrics())
Expand Down Expand Up @@ -431,15 +432,15 @@ func TestPods(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
ds := NewDatastore(t.Context(), pmf)
for _, pod := range test.existingPods {
ds.PodUpdateOrAddIfNotExist(pod)
}

test.op(ctx, ds)
var gotPods []*corev1.Pod
for _, pm := range ds.PodGetAll() {
for _, pm := range ds.PodList(backendmetrics.AllPodPredicate) {
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
gotPods = append(gotPods, pod)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/epp/metrics/collectors/inference_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
compbasemetrics "k8s.io/component-base/metrics"

backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics"
)
Expand Down Expand Up @@ -62,7 +63,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
return
}

podMetrics := c.ds.PodGetAll()
podMetrics := c.ds.PodList(backendmetrics.AllPodPredicate)
if len(podMetrics) == 0 {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/metrics/collectors/inference_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (
)

func TestNoMetricsCollected(t *testing.T) {
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
datastore := datastore.NewDatastore(context.Background(), pmf)

collector := &inferencePoolMetricsCollector{
Expand All @@ -67,7 +67,7 @@ func TestMetricsCollected(t *testing.T) {
pod1NamespacedName: pod1Metrics,
},
}
pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond)
pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2)
ds := datastore.NewDatastore(context.Background(), pmf)

scheme := runtime.NewScheme()
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet

subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any)
if !found {
return d.toSchedulerPodMetrics(d.datastore.PodGetAll())
return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
}

// Check if endpoint key is present in the subset map and ensure there is at least one value
endpointSubsetList, found := subsetMap[subsetHintKey].([]any)
if !found {
return d.toSchedulerPodMetrics(d.datastore.PodGetAll())
return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
} else if len(endpointSubsetList) == 0 {
loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
return []schedulingtypes.Pod{}
Expand Down Expand Up @@ -288,7 +288,7 @@ func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestC
}

func (d *Director) GetRandomPod() *backend.Pod {
pods := d.datastore.PodGetAll()
pods := d.datastore.PodList(backendmetrics.AllPodPredicate)
if len(pods) == 0 {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/requestcontrol/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestDirector_HandleRequest(t *testing.T) {
ObjRef()

// Datastore setup
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
ds := datastore.NewDatastore(t.Context(), pmf)
ds.ObjectiveSetIfOlder(imFoodReview)
ds.ObjectiveSetIfOlder(imFoodReviewResolve)
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
},
}

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
ds := datastore.NewDatastore(t.Context(), pmf)
for _, testPod := range testInput {
ds.PodUpdateOrAddIfNotExist(testPod)
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestGetRandomPod(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond)
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond, time.Second*2)
ds := datastore.NewDatastore(t.Context(), pmf)
for _, pod := range test.storePods {
ds.PodUpdateOrAddIfNotExist(pod)
Expand Down
Loading