Commit c871610

Record transaction metrics off the main thread (#2910)
1 parent: e4d5c18

9 files changed, +197 -41 lines

crates/core/src/db/mod.rs

Lines changed: 94 additions & 0 deletions
@@ -1,3 +1,14 @@
+use std::sync::Arc;
+
+use enum_map::EnumMap;
+use tokio::sync::mpsc;
+
+use crate::{
+    db::datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData},
+    execution_context::WorkloadType,
+    subscription::ExecutionCounters,
+};
+
 pub mod datastore;
 pub mod db_metrics;
 pub mod relational_db;
@@ -22,3 +33,86 @@ pub struct Config {
     /// Specifies the page pool max size in bytes.
     pub page_pool_max_size: Option<usize>,
 }
+
+/// A message that is processed by the [`spawn_tx_metrics_recorder`] actor.
+/// We use a separate task to record metrics to avoid blocking transactions.
+pub struct MetricsMessage {
+    /// The reducer that produced these metrics.
+    reducer: String,
+    /// Metrics from a mutable transaction.
+    metrics_for_writer: Option<TxMetrics>,
+    /// Metrics from a read-only transaction.
+    /// A message may have metrics for both types of transactions,
+    /// because metrics for a reducer and its subscription updates are recorded together.
+    metrics_for_reader: Option<TxMetrics>,
+    /// The row updates for a mutable transaction.
+    /// Needed for insert and delete counters.
+    tx_data: Option<Arc<TxData>>,
+    /// Cached metrics counters for each workload type.
+    counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
+}
+
+/// The handle used to send work to the tx metrics recorder.
+#[derive(Clone)]
+pub struct MetricsRecorderQueue {
+    tx: mpsc::UnboundedSender<MetricsMessage>,
+}
+
+impl MetricsRecorderQueue {
+    pub fn send_metrics(
+        &self,
+        reducer: String,
+        metrics_for_writer: Option<TxMetrics>,
+        metrics_for_reader: Option<TxMetrics>,
+        tx_data: Option<Arc<TxData>>,
+        counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
+    ) {
+        if let Err(err) = self.tx.send(MetricsMessage {
+            reducer,
+            metrics_for_writer,
+            metrics_for_reader,
+            tx_data,
+            counters,
+        }) {
+            log::warn!("failed to send metrics: {err}");
+        }
+    }
+}
+
+/// Spawns a task for recording transaction metrics.
+/// Returns the handle for pushing metrics to the recorder.
+pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
+    let (tx, mut rx) = mpsc::unbounded_channel();
+    let abort_handle = tokio::spawn(async move {
+        while let Some(MetricsMessage {
+            reducer,
+            metrics_for_writer,
+            metrics_for_reader,
+            tx_data,
+            counters,
+        }) = rx.recv().await
+        {
+            if let Some(tx_metrics) = metrics_for_writer {
+                tx_metrics.report(
+                    // If row updates are present,
+                    // they will always belong to the writer transaction.
+                    tx_data.as_deref(),
+                    &reducer,
+                    |wl| &counters[wl],
+                );
+            }
+            if let Some(tx_metrics) = metrics_for_reader {
+                tx_metrics.report(
+                    // If row updates are present,
+                    // they will never belong to the reader transaction.
+                    // Passing row updates here will most likely panic.
+                    None,
+                    &reducer,
+                    |wl| &counters[wl],
+                );
+            }
+        }
+    })
+    .abort_handle();
+    (MetricsRecorderQueue { tx }, abort_handle)
+}
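
For readers who want to try the pattern outside the repository, here is a minimal, self-contained sketch of the same idea: the transaction path pushes a message onto an unbounded channel, and a background tokio task drains the channel and does the reporting, so committing a transaction never blocks on metrics work. The names below (FakeTxMetrics, report, spawn_recorder, "move_player") are stand-ins rather than SpacetimeDB types; only the tokio crate is assumed.

use tokio::sync::mpsc;
use tokio::task::AbortHandle;

// Stand-in for `TxMetrics`; the real type carries timings and per-table counters.
struct FakeTxMetrics {
    reducer: String,
    rows_inserted: u64,
}

// Stand-in for `TxMetrics::report`; the real code updates Prometheus counters per workload type.
fn report(m: &FakeTxMetrics) {
    println!("{}: {} rows inserted", m.reducer, m.rows_inserted);
}

// Same shape as `spawn_tx_metrics_recorder`: an unbounded channel feeding a background task.
fn spawn_recorder() -> (mpsc::UnboundedSender<FakeTxMetrics>, AbortHandle) {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let abort_handle = tokio::spawn(async move {
        // Drain the queue until every sender has been dropped.
        while let Some(metrics) = rx.recv().await {
            report(&metrics);
        }
    })
    .abort_handle();
    (tx, abort_handle)
}

#[tokio::main]
async fn main() {
    let (queue, abort_handle) = spawn_recorder();
    // The hot path only performs a non-blocking, in-memory send.
    queue
        .send(FakeTxMetrics {
            reducer: "move_player".into(),
            rows_inserted: 3,
        })
        .ok();
    // Let the recorder run, then shut it down.
    tokio::task::yield_now().await;
    abort_handle.abort();
}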

crates/core/src/db/relational_db.rs

Lines changed: 53 additions & 21 deletions
@@ -18,6 +18,7 @@ use super::datastore::{
 };
 use super::db_metrics::DB_METRICS;
 use crate::db::datastore::system_tables::StModuleRow;
+use crate::db::MetricsRecorderQueue;
 use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
 use crate::execution_context::{ReducerContext, Workload, WorkloadType};
 use crate::messages::control_db::HostType;
@@ -110,14 +111,17 @@ pub struct RelationalDB {
     /// `Some` if `durability` is `Some`, `None` otherwise.
     disk_size_fn: Option<DiskSizeFn>,
 
+    /// A map from workload types to their cached prometheus counters.
+    workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
+
+    /// An async queue for recording transaction metrics off the main thread
+    metrics_recorder_queue: Option<MetricsRecorderQueue>,
+
     // DO NOT ADD FIELDS AFTER THIS.
     // By default, fields are dropped in declaration order.
     // We want to release the file lock last.
     // TODO(noa): is this lockfile still necessary now that we have data-dir?
     _lock: LockFile,
-
-    /// A map from workload types to their cached prometheus counters.
-    workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
 }
 
 #[derive(Clone)]
@@ -231,6 +235,7 @@ impl RelationalDB {
         inner: Locking,
         durability: Option<(Arc<Durability>, DiskSizeFn)>,
         snapshot_repo: Option<Arc<SnapshotRepository>>,
+        metrics_recorder_queue: Option<MetricsRecorderQueue>,
     ) -> Self {
         let (durability, disk_size_fn) = durability.unzip();
         let snapshot_worker =
@@ -249,8 +254,10 @@ impl RelationalDB {
             row_count_fn: default_row_count_fn(database_identity),
             disk_size_fn,
 
-            _lock: lock,
             workload_type_to_exec_counters,
+            metrics_recorder_queue,
+
+            _lock: lock,
         }
     }
 
@@ -324,6 +331,10 @@ impl RelationalDB {
     /// If restoring from an existing database, the `snapshot_repo` must
     /// store views of the same sequence of TXes as the `history`.
     ///
+    /// - `metrics_recorder_queue`
+    ///
+    /// The send side of a queue for recording transaction metrics.
+    ///
     /// # Return values
     ///
     /// Alongside `Self`, [`ConnectedClients`] is returned, which is the set of
@@ -333,13 +344,15 @@ impl RelationalDB {
     /// gracefully. The caller is responsible for disconnecting the clients.
     ///
     /// [ModuleHost]: crate::host::module_host::ModuleHost
+    #[allow(clippy::too_many_arguments)]
     pub fn open(
         root: &ReplicaDir,
         database_identity: Identity,
         owner_identity: Identity,
         history: impl durability::History<TxData = Txdata>,
         durability: Option<(Arc<Durability>, DiskSizeFn)>,
         snapshot_repo: Option<Arc<SnapshotRepository>>,
+        metrics_recorder_queue: Option<MetricsRecorderQueue>,
         page_pool: PagePool,
     ) -> Result<(Self, ConnectedClients), DBError> {
         log::trace!("[{}] DATABASE: OPEN", database_identity);
@@ -373,6 +386,7 @@ impl RelationalDB {
             inner,
             durability,
             snapshot_repo,
+            metrics_recorder_queue,
         );
 
         if let Some(meta) = db.metadata()? {
@@ -749,6 +763,11 @@ impl RelationalDB {
         Ok(AlgebraicValue::decode(col_ty, &mut &*bytes)?)
     }
 
+    /// Returns the execution counters for this database.
+    pub fn exec_counter_map(&self) -> Arc<EnumMap<WorkloadType, ExecutionCounters>> {
+        self.workload_type_to_exec_counters.clone()
+    }
+
     /// Returns the execution counters for `workload_type` for this database.
     pub fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters {
         &self.workload_type_to_exec_counters[workload_type]
9881007
let mut tx = self.begin_tx(workload);
9891008
let res = f(&mut tx);
9901009
let (tx_metrics, reducer) = self.release_tx(tx);
991-
self.report_tx_metricses(&reducer, None, None, &tx_metrics);
1010+
self.report_read_tx_metrics(reducer, tx_metrics);
9921011
res
9931012
}
9941013

@@ -999,11 +1018,11 @@ impl RelationalDB {
     {
         if res.is_err() {
             let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
-            self.report(&reducer, &tx_metrics, None);
+            self.report_mut_tx_metrics(reducer, tx_metrics, None);
         } else {
             match self.commit_tx(tx).map_err(E::from)? {
                 Some((tx_data, tx_metrics, reducer)) => {
-                    self.report(&reducer, &tx_metrics, Some(&tx_data));
+                    self.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
                 }
                 None => panic!("TODO: retry?"),
             }
@@ -1018,7 +1037,7 @@ impl RelationalDB {
         match res {
             Err(e) => {
                 let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
-                self.report(&reducer, &tx_metrics, None);
+                self.report_mut_tx_metrics(reducer, tx_metrics, None);
 
                 Err(e)
             }
@@ -1042,17 +1061,22 @@ impl RelationalDB {
     /// Reports the `TxMetrics`s passed.
     ///
     /// Should only be called after the tx lock has been fully released.
-    pub(crate) fn report_tx_metricses(
+    pub(crate) fn report_tx_metrics(
         &self,
-        reducer: &str,
-        tx_data: Option<&TxData>,
-        metrics_mut: Option<&TxMetrics>,
-        metrics_read: &TxMetrics,
+        reducer: String,
+        tx_data: Option<Arc<TxData>>,
+        metrics_for_writer: Option<TxMetrics>,
+        metrics_for_reader: Option<TxMetrics>,
     ) {
-        if let Some(metrics_mut) = metrics_mut {
-            self.report(reducer, metrics_mut, tx_data);
+        if let Some(recorder) = &self.metrics_recorder_queue {
+            recorder.send_metrics(
+                reducer,
+                metrics_for_writer,
+                metrics_for_reader,
+                tx_data,
+                self.exec_counter_map(),
+            );
         }
-        self.report(reducer, metrics_read, None);
     }
 }
 
@@ -1403,8 +1427,13 @@ impl RelationalDB {
     }
 
     /// Reports the metrics for `reducer`, using counters provided by `db`.
-    pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) {
-        metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl));
+    pub fn report_mut_tx_metrics(&self, reducer: String, metrics: TxMetrics, tx_data: Option<TxData>) {
+        self.report_tx_metrics(reducer, tx_data.map(Arc::new), Some(metrics), None);
+    }
+
+    /// Reports subscription metrics for `reducer`, using counters provided by `db`.
+    pub fn report_read_tx_metrics(&self, reducer: String, metrics: TxMetrics) {
+        self.report_tx_metrics(reducer, None, None, Some(metrics));
     }
 
     /// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var`
@@ -1779,7 +1808,7 @@ pub mod tests_utils {
             expected_num_clients: usize,
         ) -> Result<Self, DBError> {
             let dir = TempReplicaDir::new()?;
-            let db = Self::open_db(&dir, history, None, None, expected_num_clients)?;
+            let db = Self::open_db(&dir, history, None, None, None, expected_num_clients)?;
             Ok(Self {
                 db,
                 durable: None,
@@ -1870,7 +1899,7 @@ pub mod tests_utils {
         }
 
         fn in_memory_internal(root: &ReplicaDir) -> Result<RelationalDB, DBError> {
-            Self::open_db(root, EmptyHistory::new(), None, None, 0)
+            Self::open_db(root, EmptyHistory::new(), None, None, None, 0)
         }
 
         fn durable_internal(
@@ -1884,7 +1913,7 @@ pub mod tests_utils {
             let snapshot_repo = want_snapshot_repo
                 .then(|| open_snapshot_repo(root.snapshots(), Identity::ZERO, 0))
                 .transpose()?;
-            let db = Self::open_db(root, history, Some((durability, disk_size_fn)), snapshot_repo, 0)?;
+            let db = Self::open_db(root, history, Some((durability, disk_size_fn)), snapshot_repo, None, 0)?;
 
             Ok((db, local))
         }
@@ -1894,6 +1923,7 @@ pub mod tests_utils {
             history: impl durability::History<TxData = Txdata>,
             durability: Option<(Arc<Durability>, DiskSizeFn)>,
             snapshot_repo: Option<Arc<SnapshotRepository>>,
+            metrics_recorder_queue: Option<MetricsRecorderQueue>,
             expected_num_clients: usize,
         ) -> Result<RelationalDB, DBError> {
             let (db, connected_clients) = RelationalDB::open(
@@ -1903,6 +1933,7 @@ pub mod tests_utils {
                 history,
                 durability,
                 snapshot_repo,
+                metrics_recorder_queue,
                 PagePool::new_for_test(),
             )?;
             assert_eq!(connected_clients.len(), expected_num_clients);
@@ -2151,6 +2182,7 @@ mod tests {
             EmptyHistory::new(),
             None,
             None,
+            None,
             PagePool::new_for_test(),
         ) {
             Ok(_) => {
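
One point the diff leaves to the caller: spawn_tx_metrics_recorder hands back a tokio::task::AbortHandle, so whoever owns the queue handle is responsible for stopping the recorder task on shutdown. Below is a hedged sketch of one way a caller might do that; RecorderGuard is hypothetical, and the actual lifecycle handling lives in the host code among the other changed files not shown here.

use tokio::task::AbortHandle;

/// Hypothetical RAII guard: aborts the metrics recorder task when its owner is dropped.
struct RecorderGuard {
    abort_handle: AbortHandle,
}

impl Drop for RecorderGuard {
    fn drop(&mut self) {
        // Stop the background task; any metrics still queued are simply discarded.
        self.abort_handle.abort();
    }
}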
