Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 202 additions & 79 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use slog::Logger;
use slog::debug;
use slog::trace;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::time::Duration;
use std::time::Instant;
use uuid::Uuid;
Expand Down Expand Up @@ -949,98 +950,100 @@ impl Client {
Ok(query)
}

// Build a reasonably efficient query to retrieve all fields for a given
// timeseries. Joins in ClickHouse are expensive, so aggregate all relevant
// fields from each relevant fields table in a single subquery, then join
// the results together. This results in n - 1 joins, where n is the number
// of relevant fields tables. Note that we may be able to improve
// performance in future ClickHouse versions, which have better support for
// Variant types, better support for the merge() table function, and faster
// joins.
fn all_fields_query_raw(
&self,
schema: &TimeseriesSchema,
) -> (bool, String) {
match schema.field_schema.len() {
0 => unreachable!(),
1 => {
let field_schema = schema.field_schema.first().unwrap();
(
true,
format!(
"SELECT DISTINCT timeseries_key, field_value AS {field_name} \
FROM {db_name}.{field_table} \
WHERE \
timeseries_name = '{timeseries_name}' AND \
field_name = '{field_name}'",
field_name = field_schema.name,
db_name = crate::DATABASE_NAME,
field_table = field_table_name(field_schema.field_type),
timeseries_name = schema.timeseries_name,
),
)
}
_ => {
let mut top_level_columns =
Vec::with_capacity(schema.field_schema.len());
let mut field_subqueries =
Vec::with_capacity(schema.field_schema.len());

// Select each field value, aliasing it to its field name.
// Build a vector of top-level select expressions, as well as a
// map from fields to lists of subquery select expressions.
let mut top_selects: Vec<String> = Vec::new();
let mut select_map: HashMap<oximeter::FieldType, Vec<String>> =
HashMap::new();
for field_schema in schema.field_schema.iter() {
top_level_columns.push(format!(
"filter_on_{}.field_value AS {}",
field_schema.name, field_schema.name
));
field_subqueries.push((
format!(
"SELECT DISTINCT timeseries_key, field_value \
FROM {db_name}.{field_table} \
WHERE \
timeseries_name = '{timeseries_name}' AND \
field_name = '{field_name}' \
",
db_name = crate::DATABASE_NAME,
field_table =
field_table_name(field_schema.field_type),
timeseries_name = schema.timeseries_name,
field_name = field_schema.name,
),
format!("filter_on_{}", field_schema.name),
select_map
.entry(field_schema.field_type)
.or_insert_with(|| vec![String::from("timeseries_key")])
.push(format!(
"anyIf(field_value, field_name = '{}') AS {}",
field_schema.name, field_schema.name
));
top_selects.push(format!(
"{}_pivot.{} AS {}",
field_table_name(field_schema.field_type),
field_schema.name,
field_schema.name
));
}

// Write the top-level select statement, starting by selecting
// the timeseries key from the first field schema.
let mut out = format!(
"SELECT {}.timeseries_key AS timeseries_key, {} FROM ",
field_subqueries[0].1,
top_level_columns.join(", "),
);
// Sort field tables by number of columns, descending.
// ClickHouse recommends joining larger tables to smaller
// tables, and doesn't currently reorder joins automatically.
let mut field_types: Vec<oximeter::FieldType> =
select_map.keys().cloned().collect();
field_types.sort_by(|a, b| {
select_map[b]
.len()
.cmp(&select_map[a].len())
.then(field_table_name(*a).cmp(&field_table_name(*b)))
});

// Then add all the subqueries selecting each field.
//
// We need to add these, along with their aliases. The first
// such subquery has no join conditions, but the later ones all
// refer to the previous via:
//
// `ON <previous_filter_name>.timeseries_key = <current_filter_name>.timeseries_key`
for (i, (subq, alias)) in field_subqueries.iter().enumerate() {
// Push the subquery itself, aliased.
out.push('(');
out.push_str(subq);
out.push_str(") AS ");
out.push_str(alias);

// Push the join conditions.
if i > 0 {
let previous_alias = &field_subqueries[i - 1].1;
out.push_str(" ON ");
out.push_str(alias);
out.push_str(".timeseries_key = ");
out.push_str(previous_alias);
out.push_str(".timeseries_key");
}
// Build a map from field type to pivot subquery. We filter by
// timeseries_name, group by timeseries_key, and use anyIf to
// pivot fields to a wide table. We can use anyIf to take the
// first matching value because a given timeseries key is
// always associated with the same set of fields, so all rows
// with a given (timeseries_key, field_name) will have the same
// field_value.
let mut query_map: HashMap<oximeter::FieldType, String> =
HashMap::new();
for field_type in field_types.clone() {
let selects = &select_map[&field_type];
let query = format!(
"(
SELECT
{select}
FROM {db_name}.{from}
WHERE timeseries_name = '{timeseries_name}'
GROUP BY timeseries_key
) AS {subquery_name}_pivot",
select = selects.join(", "),
db_name = crate::DATABASE_NAME,
from = field_table_name(field_type),
timeseries_name = schema.timeseries_name,
subquery_name = field_table_name(field_type),
);
query_map.insert(field_type, query);
}

// Push the "INNER JOIN" expression itself, for all but the
// last subquery.
if i < field_subqueries.len() - 1 {
out.push_str(" INNER JOIN ");
}
// Assemble the final query.
let mut from = query_map[&field_types[0]].clone();
for field_type in field_types.iter().skip(1) {
from = format!(
"{from} JOIN {query} ON {source}_pivot.timeseries_key = {dest}_pivot.timeseries_key",
from = from,
query = query_map[field_type],
source = field_table_name(field_types[0]),
dest = field_table_name(*field_type),
);
}
(false, out)
top_selects.push(format!(
"{}_pivot.timeseries_key AS timeseries_key",
field_table_name(field_types[0])
));
let query =
format!("SELECT {} FROM {}", top_selects.join(", "), from);
(false, query)
}
}
}
Expand Down Expand Up @@ -1196,7 +1199,7 @@ mod tests {
AuthzScope, DatumType, FieldSchema, FieldSource, FieldType, Sample,
TimeseriesSchema, Units,
};
use oximeter::{FieldValue, types::Cumulative};
use oximeter::{FieldValue, TimeseriesName, types::Cumulative};
use oxql_types::{Table, Timeseries, point::Points};
use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;
Expand Down Expand Up @@ -1337,6 +1340,126 @@ mod tests {
TestContext { logctx, clickhouse: db, client, test_data }
}

#[tokio::test]
async fn test_get_fields_query() {
let ctx = setup_oxql_test("test_get_fields_query").await;

let schema = ctx
.client
.schema_for_timeseries(
&TimeseriesName::try_from("some_target:some_metric").unwrap(),
)
.await
.unwrap()
.unwrap();
let query = ctx.client.all_fields_query(&schema, None).unwrap();
let want = "SELECT
fields_i32_pivot.foo AS foo,
fields_u32_pivot.index AS index,
fields_string_pivot.name AS name,
fields_i32_pivot.timeseries_key AS timeseries_key
FROM
(
SELECT
timeseries_key,
anyIf(field_value, field_name = 'foo') AS foo
FROM oximeter.fields_i32
WHERE timeseries_name = 'some_target:some_metric'
GROUP BY timeseries_key
) AS fields_i32_pivot
JOIN
(
SELECT
timeseries_key,
anyIf(field_value, field_name = 'name') AS name
FROM oximeter.fields_string
WHERE timeseries_name = 'some_target:some_metric'
GROUP BY timeseries_key
) AS fields_string_pivot ON fields_i32_pivot.timeseries_key = fields_string_pivot.timeseries_key
JOIN
(
SELECT
timeseries_key,
anyIf(field_value, field_name = 'index') AS index
FROM oximeter.fields_u32
WHERE timeseries_name = 'some_target:some_metric'
GROUP BY timeseries_key
) AS fields_u32_pivot ON fields_i32_pivot.timeseries_key = fields_u32_pivot.timeseries_key";
assert_eq!(
want.split_whitespace().collect::<Vec<&str>>().join(" "),
query.split_whitespace().collect::<Vec<&str>>().join(" ")
);

ctx.cleanup_successful().await;
}

#[tokio::test]
async fn test_get_fields() {
let ctx = setup_oxql_test("test_get_fields").await;

#[derive(Clone, Debug, oximeter::Metric)]
struct Metric1 {
foo: i32,
bar: i32,
datum: Cumulative<u64>,
}
#[derive(Clone, Debug, oximeter::Metric)]
struct Metric2 {
foo: i32,
baz: i32,
datum: Cumulative<u64>,
}

// Insert samples for multiple metrics with partially overlapping field
// names and types. Then we'll query for one of those metrics and
// assert that we only get the expected fields, and not fields of the
// same name and type from another metric.
let samples = [
Sample::new(
&SomeTarget { name: String::from("ts1"), index: 1 },
&Metric1 { foo: 1, bar: 2, datum: Cumulative::new(5) },
)
.unwrap(),
Sample::new(
&SomeTarget { name: String::from("ts1"), index: 1 },
&Metric1 { foo: 1, bar: 2, datum: Cumulative::new(6) },
)
.unwrap(),
Sample::new(
&SomeTarget { name: String::from("ts2"), index: 1 },
&Metric2 { foo: 3, baz: 4, datum: Cumulative::new(5) },
)
.unwrap(),
Sample::new(
&SomeTarget { name: String::from("ts2"), index: 1 },
&Metric2 { foo: 3, baz: 4, datum: Cumulative::new(6) },
)
.unwrap(),
];
ctx.client
.insert_samples(&samples[..])
.await
.expect("failed to insert samples");

let query = "get some_target:metric2 | filter timestamp > @2020-01-01";
let result = ctx
.client
.oxql_query(query, QueryAuthzScope::Fleet)
.await
.expect("failed to run OxQL query");

assert_eq!(result.tables.len(), 1, "should be exactly 1 table");
let table = result.tables.get(0).unwrap();

assert_eq!(table.n_timeseries(), 1, "should be exactly 1 series");
let series: Vec<&Timeseries> = table.timeseries().collect();

assert_eq!(series[0].fields.get("foo").unwrap(), &FieldValue::I32(3));
assert_eq!(series[0].fields.get("baz").unwrap(), &FieldValue::I32(4));

ctx.cleanup_successful().await;
}

#[tokio::test]
async fn test_get_entire_table() {
let ctx = setup_oxql_test("test_get_entire_table").await;
Expand Down
Loading