Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 103 additions & 24 deletions extension/src/heartbeat_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,85 @@ pub fn uptime(agg: HeartbeatAgg<'static>) -> Interval {
agg.sum_live_intervals().into()
}

#[pg_extern]
pub fn average_downtime(agg: HeartbeatAgg<'static>) -> f64 {
let num_intervals = agg.num_intervals;
(agg.end_time - agg.start_time - agg.sum_live_intervals()) as f64 / num_intervals as f64
}

#[pg_extern]
pub fn average_uptime(agg: HeartbeatAgg<'static>) -> f64 {
let num_intervals = agg.num_intervals;
agg.sum_live_intervals() as f64 / num_intervals as f64
}

fn mean(data: &Vec<i64>) -> Option<f64> {
let sum = data.iter().sum::<i64>() as f64;
let count = data.len();

match count {
positive if positive > 0 => Some(sum / count as f64),
_ => None,
}
}

fn std_deviation(data: &Vec<i64>) -> Option<f64> {
match (mean(data), data.len()) {
(Some(data_mean), count) if count > 0 => {
let variance = data
.iter()
.map(|value| {
let diff = data_mean - (*value as f64);

diff * diff
})
.sum::<f64>()
/ count as f64;

Some(variance.sqrt())
}
_ => None,
}
}

#[pg_extern]
pub fn stddev_downtime(agg: HeartbeatAgg<'static>) -> Option<f64> {
// Dead ranges are the opposite of the intervals stored in the aggregate
let mut starts = agg.interval_ends.clone().into_vec();
let mut ends = agg.interval_starts.clone().into_vec();

// Fix the first point depending on whether the aggregate starts in a live or dead range
if ends[0] == agg.start_time {
ends.remove(0);
} else {
starts.insert(0, agg.start_time);
}

// Fix the last point depending on whether the aggregate starts in a live or dead range
if *starts.last().unwrap() == agg.end_time {
starts.pop();
} else {
ends.push(agg.end_time);
}

let mut data: Vec<i64> = vec![];
for i in 0..agg.num_intervals as usize {
data.push(ends[i] - starts[i]);
}
std_deviation(&data)
}

#[pg_extern]
pub fn stddev_uptime(agg: HeartbeatAgg<'static>) -> Option<f64> {
let starts = agg.interval_starts.as_slice();
let ends = agg.interval_ends.as_slice();
let mut data: Vec<i64> = vec![];
for i in 0..agg.num_intervals as usize {
data.push(ends[i] - starts[i]);
}
std_deviation(&data)
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_heartbeat_agg_uptime(
Expand Down Expand Up @@ -1014,7 +1093,7 @@ mod tests {
let (result1, result2, result3) =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness)
SELECT live_at(agg, '01-01-2020 00:01:00 UTC')::TEXT,
SELECT live_at(agg, '01-01-2020 00:01:00 UTC')::TEXT,
live_at(agg, '01-01-2020 00:05:00 UTC')::TEXT,
live_at(agg, '01-01-2020 00:30:00 UTC')::TEXT FROM agg", None, None)
.unwrap().first()
Expand All @@ -1035,7 +1114,7 @@ mod tests {
let (result1, result2, result3) =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness)
SELECT (agg -> live_at('01-01-2020 00:01:00 UTC'))::TEXT,
SELECT (agg -> live_at('01-01-2020 00:01:00 UTC'))::TEXT,
(agg -> live_at('01-01-2020 00:05:00 UTC'))::TEXT,
(agg -> live_at('01-01-2020 00:30:00 UTC'))::TEXT FROM agg", None, None)
.unwrap().first()
Expand Down Expand Up @@ -1110,7 +1189,7 @@ mod tests {
.update(
"WITH aggs AS (
SELECT heartbeat_agg(time, batch, '1h', '1m')
FROM heartbeats
FROM heartbeats
GROUP BY batch
) SELECT rollup(heartbeat_agg)::TEXT FROM aggs",
None,
Expand Down Expand Up @@ -1348,12 +1427,12 @@ mod tests {
.update(
"WITH s AS (
SELECT start,
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
GROUP BY start),
t AS (
SELECT start,
interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg
interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg
FROM s)
SELECT downtime(agg)::TEXT FROM t;",
None,
Expand Down Expand Up @@ -1395,12 +1474,12 @@ mod tests {
.update(
"WITH s AS (
SELECT start,
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
GROUP BY start),
t AS (
SELECT start,
interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg
interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg
FROM s)
SELECT live_ranges(agg)::TEXT FROM t;",
None,
Expand Down Expand Up @@ -1455,12 +1534,12 @@ mod tests {
.update(
"WITH s AS (
SELECT start,
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
GROUP BY start),
t AS (
SELECT start,
agg -> interpolate(LAG (agg) OVER (ORDER BY start)) AS agg
agg -> interpolate(LAG (agg) OVER (ORDER BY start)) AS agg
FROM s)
SELECT live_ranges(agg)::TEXT FROM t;",
None,
Expand Down Expand Up @@ -1515,8 +1594,8 @@ mod tests {
.update(
"WITH s AS (
SELECT start,
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
GROUP BY start)
SELECT interpolated_uptime(agg, LAG (agg) OVER (ORDER BY start))::TEXT
FROM s",
Expand Down Expand Up @@ -1559,8 +1638,8 @@ mod tests {
.update(
"WITH s AS (
SELECT start,
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
GROUP BY start)
SELECT (agg -> interpolated_uptime(LAG (agg) OVER (ORDER BY start)))::TEXT
FROM s",
Expand Down Expand Up @@ -1603,8 +1682,8 @@ mod tests {
.update(
"WITH s AS (
SELECT start,
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
GROUP BY start)
SELECT interpolated_downtime(agg, LAG (agg) OVER (ORDER BY start))::TEXT
FROM s",
Expand Down Expand Up @@ -1647,8 +1726,8 @@ mod tests {
.update(
"WITH s AS (
SELECT start,
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
heartbeat_agg(heartbeat, start, '30m', '10m') AS agg
FROM liveness
GROUP BY start)
SELECT (agg -> interpolated_downtime(LAG (agg) OVER (ORDER BY start)))::TEXT
FROM s",
Expand Down Expand Up @@ -1836,12 +1915,12 @@ mod tests {
let output = client
.update(
"WITH rollups AS (
SELECT heartbeat_agg(ts, batch, '2h', '20m')
FROM poc
GROUP BY batch
SELECT heartbeat_agg(ts, batch, '2h', '20m')
FROM poc
GROUP BY batch
ORDER BY batch
)
SELECT live_ranges(rollup(heartbeat_agg))::TEXT
SELECT live_ranges(rollup(heartbeat_agg))::TEXT
FROM rollups",
None,
None,
Expand Down
5 changes: 5 additions & 0 deletions extension/src/stabilization_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ crate::functions_stabilized_at! {
num_live_ranges(heartbeatagg),
trim_to(heartbeatagg,timestamp with time zone,interval),
trim_to(timestamp with time zone,interval),
average_downtime(heartbeatagg),
average_uptime(heartbeatagg),
stddev_downtime(heartbeatagg),
stddev_uptime(heartbeatagg),
accessorpercentilearray_in(cstring),
accessorpercentilearray_out(accessorpercentilearray),
arrow_uddsketch_approx_percentile_array(uddsketch,accessorpercentilearray),
Expand Down Expand Up @@ -274,6 +278,7 @@ crate::functions_stabilized_at! {
live_ranges(heartbeatagg),
rollup(heartbeatagg),
uptime(heartbeatagg),

accessordeadranges_in(cstring),
accessordeadranges_out(accessordeadranges),
accessordowntime_in(cstring),
Expand Down