@@ -28,6 +28,7 @@ use slog::Logger;
2828use slog:: debug;
2929use slog:: trace;
3030use std:: collections:: BTreeMap ;
31+ use std:: collections:: HashMap ;
3132use std:: time:: Duration ;
3233use std:: time:: Instant ;
3334use uuid:: Uuid ;
@@ -949,98 +950,100 @@ impl Client {
949950 Ok ( query)
950951 }
951952
953+ // Build a reasonably efficient query to retrieve all fields for a given
954+ // timeseries. Joins in ClickHouse are expensive, so aggregate all relevant
955+ // fields from each relevant fields table in a single subquery, then join
956+ // the results together. This results in n - 1 joins, where n is the number
957+ // of relevant fields tables. Note that we may be able to improve
958+ // performance in future ClickHouse versions, which have better support for
959+ // Variant types, better support for the merge() table function, and faster
960+ // joins.
952961 fn all_fields_query_raw (
953962 & self ,
954963 schema : & TimeseriesSchema ,
955964 ) -> ( bool , String ) {
956965 match schema. field_schema . len ( ) {
957966 0 => unreachable ! ( ) ,
958- 1 => {
959- let field_schema = schema. field_schema . first ( ) . unwrap ( ) ;
960- (
961- true ,
962- format ! (
963- "SELECT DISTINCT timeseries_key, field_value AS {field_name} \
964- FROM {db_name}.{field_table} \
965- WHERE \
966- timeseries_name = '{timeseries_name}' AND \
967- field_name = '{field_name}'",
968- field_name = field_schema. name,
969- db_name = crate :: DATABASE_NAME ,
970- field_table = field_table_name( field_schema. field_type) ,
971- timeseries_name = schema. timeseries_name,
972- ) ,
973- )
974- }
975967 _ => {
976- let mut top_level_columns =
977- Vec :: with_capacity ( schema. field_schema . len ( ) ) ;
978- let mut field_subqueries =
979- Vec :: with_capacity ( schema. field_schema . len ( ) ) ;
980-
981- // Select each field value, aliasing it to its field name.
968+ // Build a vector of top-level select expressions, as well as a
969+ // map from fields to lists of subquery select expressions.
970+ let mut top_selects: Vec < String > = Vec :: new ( ) ;
971+ let mut select_map: HashMap < oximeter:: FieldType , Vec < String > > =
972+ HashMap :: new ( ) ;
982973 for field_schema in schema. field_schema . iter ( ) {
983- top_level_columns. push ( format ! (
984- "filter_on_{}.field_value AS {}" ,
985- field_schema. name, field_schema. name
986- ) ) ;
987- field_subqueries. push ( (
988- format ! (
989- "SELECT DISTINCT timeseries_key, field_value \
990- FROM {db_name}.{field_table} \
991- WHERE \
992- timeseries_name = '{timeseries_name}' AND \
993- field_name = '{field_name}' \
994- ",
995- db_name = crate :: DATABASE_NAME ,
996- field_table =
997- field_table_name( field_schema. field_type) ,
998- timeseries_name = schema. timeseries_name,
999- field_name = field_schema. name,
1000- ) ,
1001- format ! ( "filter_on_{}" , field_schema. name) ,
974+ select_map
975+ . entry ( field_schema. field_type )
976+ . or_insert_with ( || vec ! [ String :: from( "timeseries_key" ) ] )
977+ . push ( format ! (
978+ "anyIf(field_value, field_name = '{}') AS {}" ,
979+ field_schema. name, field_schema. name
980+ ) ) ;
981+ top_selects. push ( format ! (
982+ "{}_pivot.{} AS {}" ,
983+ field_table_name( field_schema. field_type) ,
984+ field_schema. name,
985+ field_schema. name
1002986 ) ) ;
1003987 }
1004988
1005- // Write the top-level select statement, starting by selecting
1006- // the timeseries key from the first field schema.
1007- let mut out = format ! (
1008- "SELECT {}.timeseries_key AS timeseries_key, {} FROM " ,
1009- field_subqueries[ 0 ] . 1 ,
1010- top_level_columns. join( ", " ) ,
1011- ) ;
989+ // Sort field tables by number of columns, descending.
990+ // ClickHouse recommends joining larger tables to smaller
991+ // tables, and doesn't currently reorder joins automatically.
992+ let mut field_types: Vec < oximeter:: FieldType > =
993+ select_map. keys ( ) . cloned ( ) . collect ( ) ;
994+ field_types. sort_by ( |a, b| {
995+ select_map[ b]
996+ . len ( )
997+ . cmp ( & select_map[ a] . len ( ) )
998+ . then ( field_table_name ( * a) . cmp ( & field_table_name ( * b) ) )
999+ } ) ;
10121000
1013- // Then add all the subqueries selecting each field.
1014- //
1015- // We need to add these, along with their aliases. The first
1016- // such subquery has no join conditions, but the later ones all
1017- // refer to the previous via:
1018- //
1019- // `ON <previous_filter_name>.timeseries_key = <current_filter_name>.timeseries_key`
1020- for ( i, ( subq, alias) ) in field_subqueries. iter ( ) . enumerate ( ) {
1021- // Push the subquery itself, aliased.
1022- out. push ( '(' ) ;
1023- out. push_str ( subq) ;
1024- out. push_str ( ") AS " ) ;
1025- out. push_str ( alias) ;
1026-
1027- // Push the join conditions.
1028- if i > 0 {
1029- let previous_alias = & field_subqueries[ i - 1 ] . 1 ;
1030- out. push_str ( " ON " ) ;
1031- out. push_str ( alias) ;
1032- out. push_str ( ".timeseries_key = " ) ;
1033- out. push_str ( previous_alias) ;
1034- out. push_str ( ".timeseries_key" ) ;
1035- }
1001+ // Build a map from field type to pivot subquery. We filter by
1002+ // timeseries_name, group by timeseries_key, and use anyIf to
1003+ // pivot fields to a wide table. We can use anyIf to take the
1004+ // first matching value because a given timeseries key is
1005+ // always associated with the same set of fields, so all rows
1006+ // with a given (timeseries_key, field_name) will have the same
1007+ // field_value.
1008+ let mut query_map: HashMap < oximeter:: FieldType , String > =
1009+ HashMap :: new ( ) ;
1010+ for field_type in field_types. clone ( ) {
1011+ let selects = & select_map[ & field_type] ;
1012+ let query = format ! (
1013+ "(
1014+ SELECT
1015+ {select}
1016+ FROM {db_name}.{from}
1017+ WHERE timeseries_name = '{timeseries_name}'
1018+ GROUP BY timeseries_key
1019+ ) AS {subquery_name}_pivot" ,
1020+ select = selects. join( ", " ) ,
1021+ db_name = crate :: DATABASE_NAME ,
1022+ from = field_table_name( field_type) ,
1023+ timeseries_name = schema. timeseries_name,
1024+ subquery_name = field_table_name( field_type) ,
1025+ ) ;
1026+ query_map. insert ( field_type, query) ;
1027+ }
10361028
1037- // Push the "INNER JOIN" expression itself, for all but the
1038- // last subquery.
1039- if i < field_subqueries. len ( ) - 1 {
1040- out. push_str ( " INNER JOIN " ) ;
1041- }
1029+ // Assemble the final query.
1030+ let mut from = query_map[ & field_types[ 0 ] ] . clone ( ) ;
1031+ for field_type in field_types. iter ( ) . skip ( 1 ) {
1032+ from = format ! (
1033+ "{from} JOIN {query} ON {source}_pivot.timeseries_key = {dest}_pivot.timeseries_key" ,
1034+ from = from,
1035+ query = query_map[ field_type] ,
1036+ source = field_table_name( field_types[ 0 ] ) ,
1037+ dest = field_table_name( * field_type) ,
1038+ ) ;
10421039 }
1043- ( false , out)
1040+ top_selects. push ( format ! (
1041+ "{}_pivot.timeseries_key AS timeseries_key" ,
1042+ field_table_name( field_types[ 0 ] )
1043+ ) ) ;
1044+ let query =
1045+ format ! ( "SELECT {} FROM {}" , top_selects. join( ", " ) , from) ;
1046+ ( false , query)
10441047 }
10451048 }
10461049 }
@@ -1196,7 +1199,7 @@ mod tests {
11961199 AuthzScope , DatumType , FieldSchema , FieldSource , FieldType , Sample ,
11971200 TimeseriesSchema , Units ,
11981201 } ;
1199- use oximeter:: { FieldValue , types:: Cumulative } ;
1202+ use oximeter:: { FieldValue , TimeseriesName , types:: Cumulative } ;
12001203 use oxql_types:: { Table , Timeseries , point:: Points } ;
12011204 use std:: collections:: { BTreeMap , BTreeSet } ;
12021205 use std:: time:: Duration ;
@@ -1337,6 +1340,126 @@ mod tests {
13371340 TestContext { logctx, clickhouse : db, client, test_data }
13381341 }
13391342
    // Verify the exact SQL text generated for a schema with three fields of
    // distinct types: each fields table should be pivoted to a wide form with
    // `anyIf`, and the pivots joined on `timeseries_key`, largest table first.
    #[tokio::test]
    async fn test_get_fields_query() {
        let ctx = setup_oxql_test("test_get_fields_query").await;

        // Fetch the schema for the timeseries the test context set up.
        let schema = ctx
            .client
            .schema_for_timeseries(
                &TimeseriesName::try_from("some_target:some_metric").unwrap(),
            )
            .await
            .unwrap()
            .unwrap();
        let query = ctx.client.all_fields_query(&schema, None).unwrap();
        // Expected query. The assertion below collapses all whitespace, so
        // this literal is formatted for readability rather than byte-equality.
        let want = "SELECT
             fields_i32_pivot.foo AS foo,
             fields_u32_pivot.index AS index,
             fields_string_pivot.name AS name,
             fields_i32_pivot.timeseries_key AS timeseries_key
         FROM
             (
                 SELECT
                     timeseries_key,
                     anyIf(field_value, field_name = 'foo') AS foo
                 FROM oximeter.fields_i32
                 WHERE timeseries_name = 'some_target:some_metric'
                 GROUP BY timeseries_key
             ) AS fields_i32_pivot
             JOIN
             (
                 SELECT
                     timeseries_key,
                     anyIf(field_value, field_name = 'name') AS name
                 FROM oximeter.fields_string
                 WHERE timeseries_name = 'some_target:some_metric'
                 GROUP BY timeseries_key
             ) AS fields_string_pivot ON fields_i32_pivot.timeseries_key = fields_string_pivot.timeseries_key
             JOIN
             (
                 SELECT
                     timeseries_key,
                     anyIf(field_value, field_name = 'index') AS index
                 FROM oximeter.fields_u32
                 WHERE timeseries_name = 'some_target:some_metric'
                 GROUP BY timeseries_key
             ) AS fields_u32_pivot ON fields_i32_pivot.timeseries_key = fields_u32_pivot.timeseries_key";
        // Compare with all runs of whitespace normalized to single spaces, so
        // the test is insensitive to query formatting.
        assert_eq!(
            want.split_whitespace().collect::<Vec<&str>>().join(" "),
            query.split_whitespace().collect::<Vec<&str>>().join(" ")
        );

        ctx.cleanup_successful().await;
    }
1395+
1396+ #[ tokio:: test]
1397+ async fn test_get_fields ( ) {
1398+ let ctx = setup_oxql_test ( "test_get_fields" ) . await ;
1399+
1400+ #[ derive( Clone , Debug , oximeter:: Metric ) ]
1401+ struct Metric1 {
1402+ foo : i32 ,
1403+ bar : i32 ,
1404+ datum : Cumulative < u64 > ,
1405+ }
1406+ #[ derive( Clone , Debug , oximeter:: Metric ) ]
1407+ struct Metric2 {
1408+ foo : i32 ,
1409+ baz : i32 ,
1410+ datum : Cumulative < u64 > ,
1411+ }
1412+
1413+ // Insert samples for multiple metrics with partially overlapping field
1414+ // names and types. Then we'll query for one of those metrics and
1415+ // assert that we only get the expected fields, and not fields of the
1416+ // same name and type from another metric.
1417+ let samples = [
1418+ Sample :: new (
1419+ & SomeTarget { name : String :: from ( "ts1" ) , index : 1 } ,
1420+ & Metric1 { foo : 1 , bar : 2 , datum : Cumulative :: new ( 5 ) } ,
1421+ )
1422+ . unwrap ( ) ,
1423+ Sample :: new (
1424+ & SomeTarget { name : String :: from ( "ts1" ) , index : 1 } ,
1425+ & Metric1 { foo : 1 , bar : 2 , datum : Cumulative :: new ( 6 ) } ,
1426+ )
1427+ . unwrap ( ) ,
1428+ Sample :: new (
1429+ & SomeTarget { name : String :: from ( "ts2" ) , index : 1 } ,
1430+ & Metric2 { foo : 3 , baz : 4 , datum : Cumulative :: new ( 5 ) } ,
1431+ )
1432+ . unwrap ( ) ,
1433+ Sample :: new (
1434+ & SomeTarget { name : String :: from ( "ts2" ) , index : 1 } ,
1435+ & Metric2 { foo : 3 , baz : 4 , datum : Cumulative :: new ( 6 ) } ,
1436+ )
1437+ . unwrap ( ) ,
1438+ ] ;
1439+ ctx. client
1440+ . insert_samples ( & samples[ ..] )
1441+ . await
1442+ . expect ( "failed to insert samples" ) ;
1443+
1444+ let query = "get some_target:metric2 | filter timestamp > @2020-01-01" ;
1445+ let result = ctx
1446+ . client
1447+ . oxql_query ( query, QueryAuthzScope :: Fleet )
1448+ . await
1449+ . expect ( "failed to run OxQL query" ) ;
1450+
1451+ assert_eq ! ( result. tables. len( ) , 1 , "should be exactly 1 table" ) ;
1452+ let table = result. tables . get ( 0 ) . unwrap ( ) ;
1453+
1454+ assert_eq ! ( table. n_timeseries( ) , 1 , "should be exactly 1 series" ) ;
1455+ let series: Vec < & Timeseries > = table. timeseries ( ) . collect ( ) ;
1456+
1457+ assert_eq ! ( series[ 0 ] . fields. get( "foo" ) . unwrap( ) , & FieldValue :: I32 ( 3 ) ) ;
1458+ assert_eq ! ( series[ 0 ] . fields. get( "baz" ) . unwrap( ) , & FieldValue :: I32 ( 4 ) ) ;
1459+
1460+ ctx. cleanup_successful ( ) . await ;
1461+ }
1462+
13401463 #[ tokio:: test]
13411464 async fn test_get_entire_table ( ) {
13421465 let ctx = setup_oxql_test ( "test_get_entire_table" ) . await ;
0 commit comments