@@ -17,7 +17,6 @@ package cache
17
17
import (
18
18
"context"
19
19
"fmt"
20
- "strconv"
21
20
"time"
22
21
23
22
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
@@ -28,11 +27,16 @@ import (
28
27
)
29
28
30
29
const (
31
- defaultMetricPort = 8000
32
30
// When the engine's HTTP proxy is separated from the engine itself,
33
31
// the request port and metrics port may differ, so a dedicated metrics port is required.
34
32
MetricPortLabel = "model.aibrix.ai/metric-port"
33
+ engineLabel = "model.aibrix.ai/engine"
34
+ portLabel = "model.aibrix.ai/port"
35
+ modelLabel = "model.aibrix.ai/name"
36
+ defaultMetricPort = 8000
37
+ defaultEngineLabelValue = "vllm"
35
38
defaultPodMetricRefreshIntervalInMS = 50
39
+ defaultPodMetricsWorkerCount = 10
36
40
)
37
41
38
42
var (
44
48
metrics .AvgGenerationThroughputToksPerS ,
45
49
metrics .GPUCacheUsagePerc ,
46
50
metrics .CPUCacheUsagePerc ,
51
+ metrics .EngineUtilization ,
47
52
}
48
53
49
54
// histogram metric example - time_to_first_token_seconds, _sum, _bucket _count.
@@ -117,8 +122,6 @@ func (c *Store) getPodModelMetricName(modelName string, metricName string) strin
117
122
return fmt .Sprintf ("%s/%s" , modelName , metricName )
118
123
}
119
124
120
- const defaultPodMetricsWorkerCount = 10
121
-
122
125
func (c * Store ) updatePodMetrics () {
123
126
c .metaPods .Range (func (key string , metaPod * Pod ) bool {
124
127
if ! utils .FilterReadyPod (metaPod .Pod ) {
@@ -173,7 +176,7 @@ func (c *Store) updateSimpleMetricFromRawMetrics(pod *Pod, allMetrics map[string
173
176
}
174
177
175
178
// TODO: we should refactor metricName to fit other engines
176
- metricFamily , exists := allMetrics [ fmt . Sprintf ( "vllm:%s" , metricName )]
179
+ metricFamily , exists := c . fetchMetrics ( pod , allMetrics , metricName )
177
180
if ! exists {
178
181
klog .V (4 ).Infof ("Cannot find %v in the pod metrics" , metricName )
179
182
continue
@@ -208,8 +211,7 @@ func (c *Store) updateHistogramMetricFromRawMetrics(pod *Pod, allMetrics map[str
208
211
klog .V (4 ).Infof ("Cannot find %v in the metric list" , metricName )
209
212
continue
210
213
}
211
-
212
- metricFamily , exists := allMetrics [fmt .Sprintf ("vllm:%s" , metricName )]
214
+ metricFamily , exists := c .fetchMetrics (pod , allMetrics , metricName )
213
215
if ! exists {
214
216
klog .V (4 ).Infof ("Cannot find %v in the pod metrics" , metricName )
215
217
continue
@@ -235,7 +237,6 @@ func (c *Store) updateHistogramMetricFromRawMetrics(pod *Pod, allMetrics map[str
235
237
}
236
238
237
239
klog .V (5 ).InfoS ("Successfully parsed metrics" , "metric" , metricName , "model" , modelName , "PodIP" , pod .Status .PodIP , "Port" , podMetricPort , "metricValue" , metricValue )
238
-
239
240
}
240
241
}
241
242
}
@@ -250,7 +251,7 @@ func (c *Store) updateQueryLabelMetricFromRawMetrics(pod *Pod, allMetrics map[st
250
251
}
251
252
rawMetricName := metric .RawMetricName
252
253
scope := metric .MetricScope
253
- metricFamily , exists := allMetrics [ fmt . Sprintf ( "vllm:%s" , rawMetricName )]
254
+ metricFamily , exists := c . fetchMetrics ( pod , allMetrics , rawMetricName )
254
255
if ! exists {
255
256
klog .V (4 ).Infof ("Cannot find %v in the pod metrics" , rawMetricName )
256
257
continue
@@ -330,14 +331,42 @@ func (c *Store) queryUpdatePromQLMetrics(ctx context.Context, metric metrics.Met
330
331
return nil
331
332
}
332
333
334
+ func (c * Store ) fetchMetrics (pod * Pod , allMetrics map [string ]* dto.MetricFamily , labelMetricName string ) (* dto.MetricFamily , bool ) {
335
+ metric , exists := metrics .Metrics [labelMetricName ]
336
+ if ! exists {
337
+ klog .V (4 ).Infof ("Cannot find labelMetricName %v in collected metrics names" , labelMetricName )
338
+ return nil , false
339
+ }
340
+ engineType , err := getPodLabel (pod , engineLabel )
341
+ if engineType == "" {
342
+ klog .V (4 ).Infof (err .Error ())
343
+ engineType = defaultEngineLabelValue
344
+ }
345
+ rawMetricName , ok := metric .EngineMetricsNameMapping [engineType ]
346
+ if ! ok {
347
+ klog .V (4 ).Infof ("Cannot find engine type %v mapping for metrics %v" , engineType , labelMetricName )
348
+ return nil , false
349
+ }
350
+ metricFamily , exists := allMetrics [rawMetricName ]
351
+ if ! exists {
352
+ klog .V (4 ).Infof ("Cannot find raw metrics %v, engine type %v" , rawMetricName , engineType )
353
+ return nil , false
354
+ }
355
+ return metricFamily , true
356
+ }
357
+
333
358
// Update `PodMetrics` and `PodModelMetrics` according to the metric scope
334
359
// TODO: replace in-place metric update podMetrics and podModelMetrics to fresh copy for preventing stale metric keys
335
360
func (c * Store ) updatePodRecord (pod * Pod , modelName string , metricName string , scope metrics.MetricScope , metricValue metrics.MetricValue ) error {
336
361
if scope == metrics .PodMetricScope {
337
362
pod .Metrics .Store (metricName , metricValue )
338
363
} else if scope == metrics .PodModelMetricScope {
364
+ var err error
339
365
if modelName == "" {
340
- return fmt .Errorf ("modelName should not be empty for scope %v" , scope )
366
+ modelName , err = getPodLabel (pod , modelLabel )
367
+ if err != nil {
368
+ return fmt .Errorf ("modelName should not be empty for scope %v" , scope )
369
+ }
341
370
}
342
371
pod .ModelMetrics .Store (c .getPodModelMetricName (modelName , metricName ), metricValue )
343
372
} else {
@@ -366,17 +395,3 @@ func (c *Store) aggregateMetrics() {
366
395
}
367
396
}
368
397
}
369
-
370
- func getPodMetricPort (pod * Pod ) int {
371
- if pod == nil || pod .Labels == nil {
372
- return defaultMetricPort
373
- }
374
- if v , ok := pod .Labels [MetricPortLabel ]; ok && v != "" {
375
- if p , err := strconv .Atoi (v ); err == nil {
376
- return p
377
- } else {
378
- klog .Warningf ("Invalid value for label %s on pod %s/%s: %q. Using default port %d." , MetricPortLabel , pod .Namespace , pod .Name , v , defaultMetricPort )
379
- }
380
- }
381
- return defaultMetricPort
382
- }
0 commit comments