Skip to content

Commit f2cf5d1

Browse files
Compute read sets for MutTxId via InstanceEnv (#3520)
# Description of Changes <!-- Please describe your change, mention any related tickets, and so on here. --> The `InstanceEnv` will now compute read sets when executing a view and store them in the `MutTxId`. These read sets track table scans as well as singular index key scans. Index key ranges will be tracked in the future, but for now an index range scan is treated as a full table scan. These read sets are maintained as part of the `CommittedState`. TODO: Check write sets against read sets. Re-evaluate views in the case of overlap and update the read sets. # API and ABI breaking changes <!-- If this is an API or ABI breaking change, please apply the corresponding GitHub label. --> None # Expected complexity level and risk <!-- How complicated do you think these changes are? Grade on a scale from 1 to 5, where 1 is a trivial change, and 5 is a deep-reaching and complex change. This complexity rating applies not only to the complexity apparent in the diff, but also to its interactions with existing and future code. If you answered more than a 2, explain what is complex about the PR, and what other components it interacts with in potentially concerning ways. --> 1 # Testing <!-- Describe any testing you've done, and any testing you'd like your reviewers to do, so that you're confident that all the changes work as expected! --> - [ ] Unit tests
1 parent 3db42a7 commit f2cf5d1

File tree

5 files changed

+202
-19
lines changed

5 files changed

+202
-19
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use std::collections::HashSet;
5757
use std::fmt;
5858
use std::fs::File;
5959
use std::io;
60-
use std::ops::RangeBounds;
60+
use std::ops::{Bound, RangeBounds};
6161
use std::path::Path;
6262
use std::sync::Arc;
6363
use tokio::sync::watch;
@@ -1353,7 +1353,15 @@ impl RelationalDB {
13531353
prefix_elems: ColId,
13541354
rstart: &[u8],
13551355
rend: &[u8],
1356-
) -> Result<(TableId, impl Iterator<Item = RowRef<'a>>), DBError> {
1356+
) -> Result<
1357+
(
1358+
TableId,
1359+
Bound<AlgebraicValue>,
1360+
Bound<AlgebraicValue>,
1361+
impl Iterator<Item = RowRef<'a>>,
1362+
),
1363+
DBError,
1364+
> {
13571365
Ok(tx.index_scan_range(index_id, prefix, prefix_elems, rstart, rend)?)
13581366
}
13591367

crates/core/src/host/instance_env.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use smallvec::SmallVec;
1111
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
1212
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
1313
use spacetimedb_lib::{ConnectionId, Identity, Timestamp};
14-
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
14+
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId, ViewId};
1515
use spacetimedb_sats::{
1616
bsatn::{self, ToBsatn},
1717
buffer::{CountWriter, TeeWriter},
@@ -31,6 +31,7 @@ pub struct InstanceEnv {
3131
pub tx: TxSlot,
3232
/// The timestamp the current reducer began running.
3333
pub start_time: Timestamp,
34+
pub view_id: Option<ViewId>,
3435
}
3536

3637
#[derive(Clone, Default)]
@@ -172,6 +173,7 @@ impl InstanceEnv {
172173
scheduler,
173174
tx: TxSlot::default(),
174175
start_time: Timestamp::now(),
176+
view_id: None,
175177
}
176178
}
177179

@@ -183,6 +185,12 @@ impl InstanceEnv {
183185
/// Signal to this `InstanceEnv` that a reducer call is beginning.
184186
pub fn start_reducer(&mut self, ts: Timestamp) {
185187
self.start_time = ts;
188+
self.view_id = None;
189+
}
190+
191+
/// Signal to this `InstanceEnv` that we're about to execute a view and compute its read set.
192+
pub fn start_view(&mut self, view_id: ViewId) {
193+
self.view_id = Some(view_id);
186194
}
187195

188196
fn get_tx(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
@@ -375,7 +383,7 @@ impl InstanceEnv {
375383
let tx = &mut *self.tx.get()?;
376384

377385
// Find all rows in the table to delete.
378-
let (table_id, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
386+
let (table_id, _, _, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
379387
// Re. `SmallVec`, `delete_by_field` only cares about 1 element, so optimize for that.
380388
let rows_to_delete = iter.map(|row_ref| row_ref.pointer()).collect::<SmallVec<[_; 1]>>();
381389

@@ -460,7 +468,11 @@ impl InstanceEnv {
460468
let tx = &mut *self.get_tx()?;
461469

462470
// Query the row count for id.
463-
stdb.table_row_count_mut(tx, table_id).ok_or(NodesError::TableNotFound)
471+
stdb.table_row_count_mut(tx, table_id)
472+
.ok_or(NodesError::TableNotFound)
473+
.inspect(|_| {
474+
tx.record_table_scan(self.view_id, table_id);
475+
})
464476
}
465477

466478
#[tracing::instrument(level = "trace", skip_all)]
@@ -484,6 +496,8 @@ impl InstanceEnv {
484496
&mut bytes_scanned,
485497
);
486498

499+
tx.record_table_scan(self.view_id, table_id);
500+
487501
tx.metrics.rows_scanned += rows_scanned;
488502
tx.metrics.bytes_scanned += bytes_scanned;
489503

@@ -508,11 +522,13 @@ impl InstanceEnv {
508522
let mut bytes_scanned = 0;
509523

510524
// Open index iterator
511-
let (_, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
525+
let (table_id, lower, upper, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
512526

513527
// Scan the index and serialize rows to bsatn
514528
let chunks = ChunkedWriter::collect_iter(pool, iter, &mut rows_scanned, &mut bytes_scanned);
515529

530+
tx.record_index_scan(self.view_id, table_id, index_id, lower, upper);
531+
516532
tx.metrics.index_seeks += 1;
517533
tx.metrics.rows_scanned += rows_scanned;
518534
tx.metrics.bytes_scanned += bytes_scanned;
@@ -648,6 +664,7 @@ mod test {
648664
scheduler,
649665
tx: TxSlot::default(),
650666
start_time: Timestamp::now(),
667+
view_id: None,
651668
},
652669
runtime,
653670
))

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,11 @@ use super::{
66
tx_state::{IndexIdMap, PendingSchemaChange, TxState},
77
IterByColEqTx,
88
};
9-
use crate::system_tables::{
10-
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
11-
ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
12-
};
139
use crate::{
1410
db_metrics::DB_METRICS,
1511
error::{DatastoreError, IndexError, TableError},
1612
execution_context::ExecutionContext,
17-
locking_tx_datastore::state_view::iter_st_column_for_table,
13+
locking_tx_datastore::{mut_tx::ViewReadSets, state_view::iter_st_column_for_table},
1814
system_tables::{
1915
system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow, StTableFields,
2016
StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME,
@@ -25,12 +21,19 @@ use crate::{
2521
},
2622
traits::TxData,
2723
};
24+
use crate::{
25+
locking_tx_datastore::mut_tx::ReadSet,
26+
system_tables::{
27+
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
28+
ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
29+
},
30+
};
2831
use anyhow::anyhow;
2932
use core::{convert::Infallible, ops::RangeBounds};
30-
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
33+
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet};
3134
use spacetimedb_durability::TxOffset;
3235
use spacetimedb_lib::{db::auth::StTableType, Identity};
33-
use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId};
36+
use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId, ViewId};
3437
use spacetimedb_sats::{algebraic_value::de::ValueDeserializer, memory_usage::MemoryUsage, Deserialize};
3538
use spacetimedb_sats::{AlgebraicValue, ProductValue};
3639
use spacetimedb_schema::{
@@ -47,6 +50,40 @@ use std::collections::BTreeMap;
4750
use std::sync::Arc;
4851
use thin_vec::ThinVec;
4952

53+
type IndexKeyReadSet = HashMap<AlgebraicValue, IntSet<ViewId>>;
54+
type IndexColReadSet = HashMap<ColList, IndexKeyReadSet>;
55+
56+
#[derive(Default)]
57+
struct CommittedReadSets {
58+
tables: IntMap<TableId, IntSet<ViewId>>,
59+
index_keys: IntMap<TableId, IndexColReadSet>,
60+
}
61+
62+
impl MemoryUsage for CommittedReadSets {
63+
fn heap_usage(&self) -> usize {
64+
self.tables.heap_usage() + self.index_keys.heap_usage()
65+
}
66+
}
67+
68+
impl CommittedReadSets {
69+
/// Record in the [`CommittedState`] that this view scans this table
70+
fn view_scans_table(&mut self, view_id: ViewId, table_id: TableId) {
71+
self.tables.entry(table_id).or_default().insert(view_id);
72+
}
73+
74+
/// Record in the [`CommittedState`] that this view reads this index `key` for these table `cols`
75+
fn view_reads_index_key(&mut self, view_id: ViewId, table_id: TableId, cols: ColList, key: &AlgebraicValue) {
76+
self.index_keys
77+
.entry(table_id)
78+
.or_default()
79+
.entry(cols)
80+
.or_default()
81+
.entry(key.clone())
82+
.or_default()
83+
.insert(view_id);
84+
}
85+
}
86+
5087
/// Contains the live, in-memory snapshot of a database. This structure
5188
/// is exposed in order to support tools wanting to process the commit
5289
/// logs directly. For normal usage, see the RelationalDB struct instead.
@@ -72,6 +109,11 @@ pub struct CommittedState {
72109
/// We should split `CommittedState` into two types
73110
/// where one, e.g., `ReplayCommittedState`, has this field.
74111
table_dropped: IntSet<TableId>,
112+
/// We track the read sets for each view in the committed state.
113+
/// Each reducer's write set will be checked against these read sets (TODO: not yet implemented).
114+
/// Any overlap will trigger a re-evaluation of the affected view,
115+
/// and its read set will be updated accordingly.
116+
read_sets: CommittedReadSets,
75117
}
76118

77119
impl MemoryUsage for CommittedState {
@@ -83,13 +125,15 @@ impl MemoryUsage for CommittedState {
83125
index_id_map,
84126
page_pool: _,
85127
table_dropped,
128+
read_sets,
86129
} = self;
87130
// NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
88131
next_tx_offset.heap_usage()
89132
+ tables.heap_usage()
90133
+ blob_store.heap_usage()
91134
+ index_id_map.heap_usage()
92135
+ table_dropped.heap_usage()
136+
+ read_sets.heap_usage()
93137
}
94138
}
95139

@@ -152,6 +196,7 @@ impl CommittedState {
152196
blob_store: <_>::default(),
153197
index_id_map: <_>::default(),
154198
table_dropped: <_>::default(),
199+
read_sets: <_>::default(),
155200
page_pool,
156201
}
157202
}
@@ -614,10 +659,13 @@ impl CommittedState {
614659
tx_data.has_rows_or_connect_disconnect(ctx.reducer_context())
615660
}
616661

617-
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
662+
pub(super) fn merge(&mut self, tx_state: TxState, read_sets: ViewReadSets, ctx: &ExecutionContext) -> TxData {
618663
let mut tx_data = TxData::default();
619664
let mut truncates = IntSet::default();
620665

666+
// Merge read sets from the `MutTxId` into the `CommittedState`
667+
self.merge_read_sets(read_sets);
668+
621669
// First, apply deletes. This will free up space in the committed tables.
622670
self.merge_apply_deletes(
623671
&mut tx_data,
@@ -648,6 +696,26 @@ impl CommittedState {
648696
tx_data
649697
}
650698

699+
fn merge_read_set(&mut self, view_id: ViewId, read_set: ReadSet) {
700+
for table_id in read_set.tables_scanned() {
701+
self.read_sets.view_scans_table(view_id, *table_id);
702+
}
703+
for (table_id, index_id, key) in read_set.index_keys_scanned() {
704+
if let Some(cols) = self
705+
.get_schema(*table_id)
706+
.map(|table_schema| table_schema.col_list_for_index_id(*index_id))
707+
{
708+
self.read_sets.view_reads_index_key(view_id, *table_id, cols, key);
709+
}
710+
}
711+
}
712+
713+
fn merge_read_sets(&mut self, read_sets: ViewReadSets) {
714+
for (view_id, read_set) in read_sets {
715+
self.merge_read_set(view_id, read_set);
716+
}
717+
}
718+
651719
fn merge_apply_deletes(
652720
&mut self,
653721
tx_data: &mut TxData,

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,7 @@ impl MutTx for Locking {
926926
sequence_state_lock,
927927
tx_state: TxState::default(),
928928
lock_wait_time,
929+
read_sets: <_>::default(),
929930
timer,
930931
ctx,
931932
metrics,

0 commit comments

Comments
 (0)