Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/_static/llama-stack-spec.html
Original file line number Diff line number Diff line change
Expand Up @@ -16067,12 +16067,16 @@
"value": {
"type": "number",
"description": "The numeric value of the metric at this timestamp"
},
"unit": {
"type": "string"
}
},
"additionalProperties": false,
"required": [
"timestamp",
"value"
"value",
"unit"
],
"title": "MetricDataPoint",
"description": "A single data point in a metric time series."
Expand Down
3 changes: 3 additions & 0 deletions docs/_static/llama-stack-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11954,10 +11954,13 @@ components:
type: number
description: >-
The numeric value of the metric at this timestamp
unit:
type: string
additionalProperties: false
required:
- timestamp
- value
- unit
title: MetricDataPoint
description: >-
A single data point in a metric time series.
Expand Down
3 changes: 2 additions & 1 deletion llama_stack/apis/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ class MetricDataPoint(BaseModel):

timestamp: int
value: float
unit: str


@json_schema_type
Expand Down Expand Up @@ -518,7 +519,7 @@ async def query_metrics(
metric_name: str,
start_time: int,
end_time: int | None = None,
granularity: str | None = "1d",
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import datetime
import threading
from typing import Any

Expand Down Expand Up @@ -145,11 +146,41 @@ async def query_metrics(
metric_name: str,
start_time: int,
end_time: int | None = None,
granularity: str | None = "1d",
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
raise NotImplementedError("Querying metrics is not implemented")
"""Query metrics from the telemetry store.

Args:
metric_name: The name of the metric to query (e.g., "prompt_tokens")
start_time: Start time as Unix timestamp
end_time: End time as Unix timestamp (defaults to now if None)
granularity: Time granularity for aggregation
query_type: Type of query (RANGE or INSTANT)
label_matchers: Label filters to apply

Returns:
QueryMetricsResponse with metric time series data
"""
# Convert timestamps to datetime objects
start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None

# Use SQLite trace store if available
if hasattr(self, "trace_store") and self.trace_store:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this conditional? can the meta-reference provider have no trace-store? in general, this feels odd. at the very least, you should error when there is no trace store and someone is querying for metrics instead of silently returning nothing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, I think this was to satisfy mypy. I will double check if trace_store can be nil, and if it can't just remove this but if it can, raise an error.

return await self.trace_store.query_metrics(
metric_name=metric_name,
start_time=start_dt,
end_time=end_dt,
granularity=granularity,
query_type=query_type,
label_matchers=label_matchers,
)
else:
raise ValueError(
f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
)

def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
with self._lock:
Expand Down
196 changes: 194 additions & 2 deletions llama_stack/providers/utils/telemetry/sqlite_trace_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,23 @@
# the root directory of this source tree.

import json
from datetime import datetime
from datetime import UTC, datetime
from typing import Protocol

import aiosqlite

from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithStatus, Trace
from llama_stack.apis.telemetry import (
MetricDataPoint,
MetricLabel,
MetricLabelMatcher,
MetricQueryType,
MetricSeries,
QueryCondition,
QueryMetricsResponse,
Span,
SpanWithStatus,
Trace,
)


class TraceStore(Protocol):
Expand All @@ -29,11 +40,192 @@ async def get_span_tree(
max_depth: int | None = None,
) -> dict[str, SpanWithStatus]: ...

async def query_metrics(
self,
metric_name: str,
start_time: datetime,
end_time: datetime | None = None,
granularity: str | None = "1d",
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse: ...


class SQLiteTraceStore(TraceStore):
def __init__(self, conn_string: str):
self.conn_string = conn_string

async def query_metrics(
self,
metric_name: str,
start_time: datetime,
end_time: datetime | None = None,
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
if end_time is None:
end_time = datetime.now(UTC)

# Build base query
if query_type == MetricQueryType.INSTANT:
query = """
SELECT
se.name,
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""
else:
if granularity:
time_format = self._get_time_format_for_granularity(granularity)
query = f"""
SELECT
se.name,
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes,
strftime('{time_format}', se.timestamp) as bucket_start
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""
else:
query = """
SELECT
se.name,
json_extract(se.attributes, '$.value') as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes,
se.timestamp
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""

params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]

# Labels that will be attached to the MetricSeries (preserve matcher labels)
all_labels: list[MetricLabel] = []
matcher_label_names = set()
if label_matchers:
for matcher in label_matchers:
json_path = f"$.{matcher.name}"
if matcher.operator == "=":
query += f" AND json_extract(se.attributes, '{json_path}') = ?"
params.append(matcher.value)
elif matcher.operator == "!=":
query += f" AND json_extract(se.attributes, '{json_path}') != ?"
params.append(matcher.value)
elif matcher.operator == "=~":
query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
params.append(f"%{matcher.value}%")
elif matcher.operator == "!~":
query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
params.append(f"%{matcher.value}%")
# Preserve filter context in output
all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
matcher_label_names.add(matcher.name)

# GROUP BY / ORDER BY logic
if query_type == MetricQueryType.RANGE and granularity:
group_time_format = self._get_time_format_for_granularity(granularity)
query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
query += " ORDER BY bucket_start"
elif query_type == MetricQueryType.INSTANT:
query += " GROUP BY json_extract(se.attributes, '$.unit')"
else:
query += " ORDER BY se.timestamp"

# Execute query
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()

if not rows:
return QueryMetricsResponse(data=[])

data_points = []
# We want to add attribute labels, but only those not already present as matcher labels.
attr_label_names = set()
for row in rows:
# Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result.
try:
attributes = json.loads(row["attributes"] or "{}")
except (TypeError, json.JSONDecodeError):
attributes = {}

value = row["value"]
unit = row["unit"] or ""

# Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result.
for k, v in attributes.items():
if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
all_labels.append(MetricLabel(name=k, value=str(v)))
attr_label_names.add(k)

# Determine timestamp
if query_type == MetricQueryType.RANGE and granularity:
try:
bucket_start_raw = row["bucket_start"]
except KeyError as e:
raise ValueError(
"DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
) from e
# this value could also be there, but be NULL, I think.
if bucket_start_raw is None:
raise ValueError("bucket_start is None check time format and data")
bucket_start = datetime.fromisoformat(bucket_start_raw)
timestamp = int(bucket_start.timestamp())
elif query_type == MetricQueryType.INSTANT:
timestamp = int(datetime.now(UTC).timestamp())
else:
try:
timestamp_raw = row["timestamp"]
except KeyError as e:
raise ValueError(
"DB did not have a timestamp in row, this indicates improper formatting"
) from e
# this value could also be there, but be NULL, I think.
if timestamp_raw is None:
raise ValueError("timestamp is None check time format and data")
timestamp_iso = datetime.fromisoformat(timestamp_raw)
timestamp = int(timestamp_iso.timestamp())

data_points.append(
MetricDataPoint(
timestamp=timestamp,
value=value,
unit=unit,
)
)

metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
return QueryMetricsResponse(data=metric_series)

def _get_time_format_for_granularity(self, granularity: str | None) -> str:
"""Get the SQLite strftime format string for a given granularity.
Args:
granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
Returns:
SQLite strftime format string for the granularity
"""
if granularity is None:
raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")

if granularity.endswith("d"):
return "%Y-%m-%d 00:00:00"
elif granularity.endswith("h"):
return "%Y-%m-%d %H:00:00"
elif granularity.endswith("m"):
return "%Y-%m-%d %H:%M:00"
else:
return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps.

async def query_traces(
self,
attribute_filters: list[QueryCondition] | None = None,
Expand Down
Binary file modified tests/integration/recordings/index.sqlite
Binary file not shown.
39 changes: 39 additions & 0 deletions tests/integration/recordings/responses/390f0c7dac96.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"request": {
"method": "POST",
"url": "http://localhost:11434/api/generate",
"headers": {},
"body": {
"model": "llama3.2:3b-instruct-fp16",
"raw": true,
"prompt": "<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n<|eot_id|><|start_header_id|>user<|end_header_id|>\n\nTest metrics generation 1<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n",
"options": {
"temperature": 0.0
},
"stream": false
},
"endpoint": "/api/generate",
"model": "llama3.2:3b-instruct-fp16"
},
"response": {
"body": {
"__type__": "ollama._types.GenerateResponse",
"__data__": {
"model": "llama3.2:3b-instruct-fp16",
"created_at": "2025-08-11T15:51:18.170868Z",
"done": true,
"done_reason": "stop",
"total_duration": 5240614083,
"load_duration": 9823416,
"prompt_eval_count": 21,
"prompt_eval_duration": 21000000,
"eval_count": 310,
"eval_duration": 5209000000,
"response": "This is the start of a test. I'll provide some sample data and you can try to generate metrics based on it.\n\n**Data:**\n\nLet's say we have a dataset of user interactions with an e-commerce website. The data includes:\n\n| User ID | Product Name | Purchase Date | Quantity | Price |\n| --- | --- | --- | --- | --- |\n| 1 | iPhone 13 | 2022-01-01 | 2 | 999.99 |\n| 1 | MacBook Air | 2022-01-05 | 1 | 1299.99 |\n| 2 | Samsung TV | 2022-01-10 | 3 | 899.99 |\n| 3 | iPhone 13 | 2022-01-15 | 1 | 999.99 |\n| 4 | MacBook Pro | 2022-01-20 | 2 | 1799.99 |\n\n**Task:**\n\nYour task is to generate the following metrics based on this data:\n\n1. Average order value (AOV)\n2. Conversion rate\n3. Average revenue per user (ARPU)\n4. Customer lifetime value (CLV)\n\nPlease provide your answers in a format like this:\n\n| Metric | Value |\n| --- | --- |\n| AOV | 1234.56 |\n| Conversion Rate | 0.25 |\n| ARPU | 1000.00 |\n| CLV | 5000.00 |\n\nGo ahead and generate the metrics!",
"thinking": null,
"context": null
}
},
"is_streaming": false
}
}
Loading
Loading