Skip to content

Commit 12943a7

Browse files
committed
feat: rename PodGetAll to PodGetAllWithFreshMetrics and add unit test
1 parent bd69d8c commit 12943a7

File tree

11 files changed

+36
-35
lines changed

11 files changed

+36
-35
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
3939
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4040
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
41+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
4142
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config/loader"
4243
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"

pkg/epp/backend/metrics/logger.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ const (
3838
type Datastore interface {
3939
PoolGet() (*v1alpha2.InferencePool, error)
4040
// PodMetrics operations
41-
// PodGetAll returns all pods and metrics, including fresh and stale.
42-
PodGetAll() []PodMetrics
41+
// PodGetAllWithFreshMetrics returns all pods and metrics, only including fresh.
42+
PodGetAllWithFreshMetrics() []PodMetrics
4343
PodList(func(PodMetrics) bool) []PodMetrics
4444
}
4545

@@ -97,7 +97,7 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
9797
var kvCacheTotal float64
9898
var queueTotal int
9999

100-
podMetrics := datastore.PodGetAll()
100+
podMetrics := datastore.PodGetAllWithFreshMetrics()
101101
logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics))
102102
if len(podMetrics) == 0 {
103103
return

pkg/epp/backend/metrics/pod_metrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ type fakeDataStore struct{}
9090
func (f *fakeDataStore) PoolGet() (*v1alpha2.InferencePool, error) {
9191
return &v1alpha2.InferencePool{Spec: v1alpha2.InferencePoolSpec{TargetPortNumber: 8000}}, nil
9292
}
93-
func (f *fakeDataStore) PodGetAll() []PodMetrics {
93+
func (f *fakeDataStore) PodGetAllWithFreshMetrics() []PodMetrics {
9494
// Not implemented.
9595
return nil
9696
}

pkg/epp/controller/inferencepool_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
165165
return "pool:" + diff
166166
}
167167

168-
// Default wantPods if not set because PodGetAll returns an empty slice when empty.
168+
// Default wantPods if not set because PodGetAllWithFreshMetrics returns an empty slice when empty.
169169
if params.wantPods == nil {
170170
params.wantPods = []string{}
171171
}

pkg/epp/datastore/datastore.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ type Datastore interface {
6262
ModelGetAll() []*v1alpha2.InferenceModel
6363

6464
// PodMetrics operations
65-
// PodGetAll returns all pods and metrics, including fresh and stale.
66-
PodGetAll() []backendmetrics.PodMetrics
65+
// PodGetAllWithFreshMetrics returns all pods and metrics, only including fresh.
66+
PodGetAllWithFreshMetrics() []backendmetrics.PodMetrics
6767
// PodList lists pods matching the given predicate.
6868
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
6969
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
@@ -246,7 +246,7 @@ func (ds *datastore) ModelGetAll() []*v1alpha2.InferenceModel {
246246

247247
// /// Pods/endpoints APIs ///
248248

249-
func (ds *datastore) PodGetAll() []backendmetrics.PodMetrics {
249+
func (ds *datastore) PodGetAllWithFreshMetrics() []backendmetrics.PodMetrics {
250250
return ds.PodList(func(pm backendmetrics.PodMetrics) bool {
251251
return time.Since(pm.GetMetrics().UpdateTime) <= ds.pmf.MetricsStalenessThreshold
252252
})

pkg/epp/datastore/datastore_test.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,12 @@ var (
263263
},
264264
WaitingModels: map[string]int{},
265265
}
266+
pod3 = &corev1.Pod{
267+
ObjectMeta: metav1.ObjectMeta{
268+
Name: "pod2",
269+
},
270+
}
271+
266272
pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
267273
pod2NamespacedName = types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}
268274
inferencePool = &v1alpha2.InferencePool{
@@ -312,17 +318,19 @@ func TestMetrics(t *testing.T) {
312318
},
313319
},
314320
storePods: []*corev1.Pod{pod1, pod2},
315-
want: []*backendmetrics.MetricsState{
316-
pod1Metrics,
317-
// Failed to fetch pod2 metrics so it remains the default values.
318-
{
319-
ActiveModels: map[string]int{},
320-
WaitingModels: map[string]int{},
321-
WaitingQueueSize: 0,
322-
KVCacheUsagePercent: 0,
323-
MaxActiveModels: 0,
321+
want: []*backendmetrics.MetricsState{pod1Metrics},
322+
},
323+
{
324+
name: "Filter stale metrics",
325+
pmc: &backendmetrics.FakePodMetricsClient{
326+
Res: map[types.NamespacedName]*backendmetrics.MetricsState{
327+
pod1NamespacedName: pod1Metrics,
328+
pod2NamespacedName: pod2Metrics,
324329
},
325330
},
331+
storePods: []*corev1.Pod{pod1, pod2, pod3},
332+
want: []*backendmetrics.MetricsState{pod1Metrics, pod2Metrics}, // pod3 metrics were stale and should not be included.
333+
326334
},
327335
}
328336

@@ -336,16 +344,15 @@ func TestMetrics(t *testing.T) {
336344
fakeClient := fake.NewClientBuilder().
337345
WithScheme(scheme).
338346
Build()
339-
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond, config.DefaultMetricsStalenessThreshold)
347+
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond, time.Second*10)
340348
ds := NewDatastore(ctx, pmf)
341349
_ = ds.PoolSet(ctx, fakeClient, inferencePool)
342350
for _, pod := range test.storePods {
343351
ds.PodUpdateOrAddIfNotExist(pod)
344352
}
353+
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
345354
assert.EventuallyWithT(t, func(t *assert.CollectT) {
346-
got := ds.PodList(func(backendmetrics.PodMetrics) bool {
347-
return true
348-
})
355+
got := ds.PodGetAllWithFreshMetrics()
349356
metrics := []*backendmetrics.MetricsState{}
350357
for _, one := range got {
351358
metrics = append(metrics, one.GetMetrics())

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
6262
return
6363
}
6464

65-
podMetrics := c.ds.PodGetAll()
65+
podMetrics := c.ds.PodGetAllWithFreshMetrics()
6666
if len(podMetrics) == 0 {
6767
return
6868
}

pkg/epp/requestcontrol/director.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
139139
// Snapshot pod metrics from the datastore to:
140140
// 1. Reduce concurrent access to the datastore.
141141
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
142-
candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
142+
candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAllWithFreshMetrics())
143143
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
144144
if err != nil {
145145
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}

pkg/epp/saturationdetector/saturationdetector.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type Config struct {
5656

5757
// Datastore provides an interface to access backend pod metrics.
5858
type Datastore interface {
59-
PodGetAll() []backendmetrics.PodMetrics
59+
PodGetAllWithFreshMetrics() []backendmetrics.PodMetrics
6060
}
6161

6262
// Detector determines system saturation based on metrics from the Datastore.
@@ -100,7 +100,7 @@ func NewDetector(config *Config, datastore Datastore, logger logr.Logger) *Detec
100100
// (no capacity).
101101
func (d *Detector) IsSaturated(ctx context.Context) bool {
102102
logger := log.FromContext(ctx).WithName(loggerName)
103-
allPodsMetrics := d.datastore.PodGetAll()
103+
allPodsMetrics := d.datastore.PodGetAllWithFreshMetrics()
104104
if len(allPodsMetrics) == 0 {
105105
logger.V(logutil.VERBOSE).Info("No pods found in datastore; system is considered SATURATED (no capacity).")
106106
// If there are no pods, there is no capacity to serve requests.
@@ -121,13 +121,6 @@ func (d *Detector) IsSaturated(ctx context.Context) bool {
121121
continue
122122
}
123123

124-
// Check for metric staleness
125-
// if time.Since(metrics.UpdateTime) > d.config.MetricsStalenessThreshold {
126-
// logger.V(logutil.TRACE).Info("Pod metrics are stale, considered as not having good capacity",
127-
// "pod", podNn, "updateTime", metrics.UpdateTime, "stalenessThreshold", d.config.MetricsStalenessThreshold)
128-
// continue
129-
// }
130-
131124
// Check queue depth
132125
if metrics.WaitingQueueSize > d.config.QueueDepthThreshold {
133126
logger.V(logutil.TRACE).Info("Pod WaitingQueueSize is above threshold, considered as not having good capacity",

pkg/epp/saturationdetector/saturationdetector_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ type mockDatastore struct {
3737
pods []*backendmetrics.FakePodMetrics
3838
}
3939

40-
// PodGetAll returns all pod metrics from the fake datastore.
41-
func (fds *mockDatastore) PodGetAll() []backendmetrics.PodMetrics {
40+
// PodGetAllWithFreshMetrics returns all pod metrics from the fake datastore.
41+
func (fds *mockDatastore) PodGetAllWithFreshMetrics() []backendmetrics.PodMetrics {
4242
pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods))
4343
for _, pod := range fds.pods {
4444
pm = append(pm, pod)

0 commit comments

Comments
 (0)