Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 39 additions & 24 deletions pkg/cache/cache_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cache
import (
"context"
"fmt"
"strconv"
"time"

prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
Expand All @@ -28,11 +27,16 @@ import (
)

const (
defaultMetricPort = 8000
// When the engine's HTTP proxy is separated from the engine itself,
// the request port and metrics port may differ, so a dedicated metrics port is required.
MetricPortLabel = "model.aibrix.ai/metric-port"
engineLabel = "model.aibrix.ai/engine"
portLabel = "model.aibrix.ai/port"
modelLabel = "model.aibrix.ai/name"
defaultMetricPort = 8000
defaultEngineLabelValue = "vllm"
defaultPodMetricRefreshIntervalInMS = 50
defaultPodMetricsWorkerCount = 10
)

var (
Expand All @@ -44,6 +48,7 @@ var (
metrics.AvgGenerationThroughputToksPerS,
metrics.GPUCacheUsagePerc,
metrics.CPUCacheUsagePerc,
metrics.EngineUtilization,
}

// histogram metric example - time_to_first_token_seconds, _sum, _bucket _count.
Expand Down Expand Up @@ -117,8 +122,6 @@ func (c *Store) getPodModelMetricName(modelName string, metricName string) strin
return fmt.Sprintf("%s/%s", modelName, metricName)
}

const defaultPodMetricsWorkerCount = 10

func (c *Store) updatePodMetrics() {
c.metaPods.Range(func(key string, metaPod *Pod) bool {
if !utils.FilterReadyPod(metaPod.Pod) {
Expand Down Expand Up @@ -173,7 +176,7 @@ func (c *Store) updateSimpleMetricFromRawMetrics(pod *Pod, allMetrics map[string
}

// TODO: we should refact metricName to fit other engine
metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)]
metricFamily, exists := c.fetchMetrics(pod, allMetrics, metricName)
if !exists {
klog.V(4).Infof("Cannot find %v in the pod metrics", metricName)
continue
Expand Down Expand Up @@ -208,8 +211,7 @@ func (c *Store) updateHistogramMetricFromRawMetrics(pod *Pod, allMetrics map[str
klog.V(4).Infof("Cannot find %v in the metric list", metricName)
continue
}

metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)]
metricFamily, exists := c.fetchMetrics(pod, allMetrics, metricName)
if !exists {
klog.V(4).Infof("Cannot find %v in the pod metrics", metricName)
continue
Expand All @@ -235,7 +237,6 @@ func (c *Store) updateHistogramMetricFromRawMetrics(pod *Pod, allMetrics map[str
}

klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podMetricPort, "metricValue", metricValue)

}
}
}
Expand All @@ -250,7 +251,7 @@ func (c *Store) updateQueryLabelMetricFromRawMetrics(pod *Pod, allMetrics map[st
}
rawMetricName := metric.RawMetricName
scope := metric.MetricScope
metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", rawMetricName)]
metricFamily, exists := c.fetchMetrics(pod, allMetrics, rawMetricName)
if !exists {
klog.V(4).Infof("Cannot find %v in the pod metrics", rawMetricName)
continue
Expand Down Expand Up @@ -330,14 +331,42 @@ func (c *Store) queryUpdatePromQLMetrics(ctx context.Context, metric metrics.Met
return nil
}

func (c *Store) fetchMetrics(pod *Pod, allMetrics map[string]*dto.MetricFamily, labelMetricName string) (*dto.MetricFamily, bool) {
metric, exists := metrics.Metrics[labelMetricName]
if !exists {
klog.V(4).Infof("Cannot find labelMetricName %v in collected metrics names", labelMetricName)
return nil, false
}
engineType, err := getPodLabel(pod, engineLabel)
if engineType == "" {
klog.V(4).Infof(err.Error())
engineType = defaultEngineLabelValue
}
rawMetricName, ok := metric.EngineMetricsNameMapping[engineType]
if !ok {
klog.V(4).Infof("Cannot find engine type %v mapping for metrics %v", engineType, labelMetricName)
return nil, false
}
metricFamily, exists := allMetrics[rawMetricName]
if !exists {
klog.V(4).Infof("Cannot find raw metrics %v, engine type %v", rawMetricName, engineType)
return nil, false
}
return metricFamily, true
}

// Update `PodMetrics` and `PodModelMetrics` according to the metric scope
// TODO: replace in-place metric update podMetrics and podModelMetrics to fresh copy for preventing stale metric keys
func (c *Store) updatePodRecord(pod *Pod, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error {
if scope == metrics.PodMetricScope {
pod.Metrics.Store(metricName, metricValue)
} else if scope == metrics.PodModelMetricScope {
var err error
if modelName == "" {
return fmt.Errorf("modelName should not be empty for scope %v", scope)
modelName, err = getPodLabel(pod, modelLabel)
if err != nil {
return fmt.Errorf("modelName should not be empty for scope %v", scope)
}
}
pod.ModelMetrics.Store(c.getPodModelMetricName(modelName, metricName), metricValue)
} else {
Expand Down Expand Up @@ -366,17 +395,3 @@ func (c *Store) aggregateMetrics() {
}
}
}

func getPodMetricPort(pod *Pod) int {
if pod == nil || pod.Labels == nil {
return defaultMetricPort
}
if v, ok := pod.Labels[MetricPortLabel]; ok && v != "" {
if p, err := strconv.Atoi(v); err == nil {
return p
} else {
klog.Warningf("Invalid value for label %s on pod %s/%s: %q. Using default port %d.", MetricPortLabel, pod.Namespace, pod.Name, v, defaultMetricPort)
}
}
return defaultMetricPort
}
48 changes: 48 additions & 0 deletions pkg/cache/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2025 The Aibrix Team.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"fmt"
"strconv"

"k8s.io/klog/v2"
)

func getPodMetricPort(pod *Pod) int {
if pod == nil || pod.Labels == nil {
return defaultMetricPort
}
if v, ok := pod.Labels[MetricPortLabel]; ok && v != "" {
if p, err := strconv.Atoi(v); err == nil {
return p
} else {
klog.Warningf("Invalid value for label %s on pod %s/%s: %q. Using default port %d.", MetricPortLabel, pod.Namespace, pod.Name, v, defaultMetricPort)
}
}
return defaultMetricPort
}

func getPodLabel(pod *Pod, labelName string) (string, error) {
labelTarget, ok := pod.Labels[labelName]
if !ok {
klog.V(4).Infof("No label %v name for pod %v, default to %v", labelName, pod.Name, defaultEngineLabelValue)
err := fmt.Errorf("error executing query: no label %v found for pod %v", labelName, pod.Name)
return "", err
}
return labelTarget, nil
}
Loading