 # the root directory of this source tree.
 
 import json
-from datetime import datetime
+from datetime import UTC, datetime
 from typing import Protocol
 
 import aiosqlite
 
-from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithStatus, Trace
+from llama_stack.apis.telemetry import (
+    MetricDataPoint,
+    MetricLabel,
+    MetricLabelMatcher,
+    MetricQueryType,
+    MetricSeries,
+    QueryCondition,
+    QueryMetricsResponse,
+    Span,
+    SpanWithStatus,
+    Trace,
+)
 
 
 class TraceStore(Protocol):
@@ -29,11 +40,192 @@ async def get_span_tree(
         max_depth: int | None = None,
     ) -> dict[str, SpanWithStatus]: ...
 
+    async def query_metrics(
+        self,
+        metric_name: str,
+        start_time: datetime,
+        end_time: datetime | None = None,
+        granularity: str | None = "1d",
+        query_type: MetricQueryType = MetricQueryType.RANGE,
+        label_matchers: list[MetricLabelMatcher] | None = None,
+    ) -> QueryMetricsResponse: ...
+
 
 class SQLiteTraceStore(TraceStore):
     def __init__(self, conn_string: str):
         self.conn_string = conn_string
 
+    async def query_metrics(
+        self,
+        metric_name: str,
+        start_time: datetime,
+        end_time: datetime | None = None,
+        granularity: str | None = None,
+        query_type: MetricQueryType = MetricQueryType.RANGE,
+        label_matchers: list[MetricLabelMatcher] | None = None,
+    ) -> QueryMetricsResponse:
+        if end_time is None:
+            end_time = datetime.now(UTC)
+
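+        # Three query shapes: INSTANT aggregates the whole window into a single
+        # point; RANGE with a granularity sums values per time bucket; RANGE
+        # without granularity returns the raw data points.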
+        # Build base query
+        if query_type == MetricQueryType.INSTANT:
+            query = """
+                SELECT
+                    se.name,
+                    SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
+                    json_extract(se.attributes, '$.unit') as unit,
+                    se.attributes
+                FROM span_events se
+                WHERE se.name = ?
+                AND se.timestamp BETWEEN ? AND ?
+            """
+        else:
+            if granularity:
+                time_format = self._get_time_format_for_granularity(granularity)
+                query = f"""
+                    SELECT
+                        se.name,
+                        SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
+                        json_extract(se.attributes, '$.unit') as unit,
+                        se.attributes,
+                        strftime('{time_format}', se.timestamp) as bucket_start
+                    FROM span_events se
+                    WHERE se.name = ?
+                    AND se.timestamp BETWEEN ? AND ?
+                """
+            else:
+                query = """
+                    SELECT
+                        se.name,
+                        json_extract(se.attributes, '$.value') as value,
+                        json_extract(se.attributes, '$.unit') as unit,
+                        se.attributes,
+                        se.timestamp
+                    FROM span_events se
+                    WHERE se.name = ?
+                    AND se.timestamp BETWEEN ? AND ?
+                """
+
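+        # Metric events are stored in span_events under a "metric."-prefixed name.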
+        params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]
+
+        # Labels that will be attached to the MetricSeries (preserve matcher labels)
+        all_labels: list[MetricLabel] = []
+        matcher_label_names = set()
+        if label_matchers:
+            for matcher in label_matchers:
+                json_path = f"$.{matcher.name}"
+                if matcher.operator == "=":
+                    query += f" AND json_extract(se.attributes, '{json_path}') = ?"
+                    params.append(matcher.value)
+                elif matcher.operator == "!=":
+                    query += f" AND json_extract(se.attributes, '{json_path}') != ?"
+                    params.append(matcher.value)
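+                # Regex matchers (=~ / !~) are approximated with SQL LIKE substring
+                # matching rather than true regular expressions.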
+                elif matcher.operator == "=~":
+                    query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
+                    params.append(f"%{matcher.value}%")
+                elif matcher.operator == "!~":
+                    query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
+                    params.append(f"%{matcher.value}%")
+                # Preserve filter context in output
+                all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
+                matcher_label_names.add(matcher.name)
+
+        # GROUP BY / ORDER BY logic
+        if query_type == MetricQueryType.RANGE and granularity:
+            group_time_format = self._get_time_format_for_granularity(granularity)
+            query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
+            query += " ORDER BY bucket_start"
+        elif query_type == MetricQueryType.INSTANT:
+            query += " GROUP BY json_extract(se.attributes, '$.unit')"
+        else:
+            query += " ORDER BY se.timestamp"
+
+        # Execute query
+        async with aiosqlite.connect(self.conn_string) as conn:
+            conn.row_factory = aiosqlite.Row
+            async with conn.execute(query, params) as cursor:
+                rows = await cursor.fetchall()
+
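+        # No matching rows: return an empty response rather than raising.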
+        if not rows:
+            return QueryMetricsResponse(data=[])
+
+        data_points = []
+        # We want to add attribute labels, but only those not already present as matcher labels.
+        attr_label_names = set()
+        for row in rows:
+            # Parse JSON attributes defensively; if they are missing or malformed,
+            # simply skip adding labels for this row.
+            try:
+                attributes = json.loads(row["attributes"] or "{}")
+            except (TypeError, json.JSONDecodeError):
+                attributes = {}
+
+            value = row["value"]
+            unit = row["unit"] or ""
+
+            # Add labels from attributes, skipping matcher labels and keys already
+            # seen, so the result does not accumulate duplicate labels.
+            for k, v in attributes.items():
+                if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
+                    all_labels.append(MetricLabel(name=k, value=str(v)))
+                    attr_label_names.add(k)
+
+            # Determine timestamp
+            if query_type == MetricQueryType.RANGE and granularity:
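+                # sqlite3.Row raises IndexError (not KeyError) for unknown string
+                # keys, so catch both when treating a missing column as an error.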
+                try:
+                    bucket_start_raw = row["bucket_start"]
+                except (KeyError, IndexError) as e:
+                    raise ValueError(
+                        "DB did not have a bucket_start time in row when using granularity; this indicates improper formatting"
+                    ) from e
+                # The column can also be present but NULL.
+                if bucket_start_raw is None:
+                    raise ValueError("bucket_start is None; check the time format and data")
+                bucket_start = datetime.fromisoformat(bucket_start_raw)
+                timestamp = int(bucket_start.timestamp())
+            elif query_type == MetricQueryType.INSTANT:
+                timestamp = int(datetime.now(UTC).timestamp())
+            else:
+                try:
+                    timestamp_raw = row["timestamp"]
+                except (KeyError, IndexError) as e:
+                    raise ValueError(
+                        "DB did not have a timestamp in row; this indicates improper formatting"
+                    ) from e
+                # The column can also be present but NULL.
+                if timestamp_raw is None:
+                    raise ValueError("timestamp is None; check the time format and data")
+                timestamp_iso = datetime.fromisoformat(timestamp_raw)
+                timestamp = int(timestamp_iso.timestamp())
+
+            data_points.append(
+                MetricDataPoint(
+                    timestamp=timestamp,
+                    value=value,
+                    unit=unit,
+                )
+            )
+
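+        # All rows fold into a single MetricSeries; the labels collected above
+        # describe the series as a whole, not individual points.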
+        metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
+        return QueryMetricsResponse(data=metric_series)
+
+    def _get_time_format_for_granularity(self, granularity: str | None) -> str:
+        """Get the SQLite strftime format string for a given granularity.
+
+        Args:
+            granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
+
+        Returns:
+            SQLite strftime format string for the granularity
+        """
+        if granularity is None:
+            raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")
+
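+        # Only the unit suffix is inspected: "5m" buckets by minute exactly like
+        # "1m"; the numeric prefix is currently ignored.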
+        if granularity.endswith("d"):
+            return "%Y-%m-%d 00:00:00"
+        elif granularity.endswith("h"):
+            return "%Y-%m-%d %H:00:00"
+        elif granularity.endswith("m"):
+            return "%Y-%m-%d %H:%M:00"
+        else:
+            return "%Y-%m-%d %H:%M:00"  # Default to the finest granularity, which yields the most timestamps.
+
     async def query_traces(
         self,
         attribute_filters: list[QueryCondition] | None = None,