Skip to content

Commit 8118dd8

Browse files
Add query engine support for mutable transactions (#3524)
# Description of Changes <!-- Please describe your change, mention any related tickets, and so on here. --> The query engine can now execute queries using mutable transactions because this patch implements the `Datastore` trait for `MutTxId`. Note this is both a refactor and a feature patch. It is a refactor because the `Datastore` trait was updated to allow for mutable transactions. # API and ABI breaking changes <!-- If this is an API or ABI breaking change, please apply the corresponding GitHub label. --> None # Expected complexity level and risk 1.5 <!-- How complicated do you think these changes are? Grade on a scale from 1 to 5, where 1 is a trivial change, and 5 is a deep-reaching and complex change. This complexity rating applies not only to the complexity apparent in the diff, but also to its interactions with existing and future code. If you answered more than a 2, explain what is complex about the PR, and what other components it interacts with in potentially concerning ways. --> # Testing <!-- Describe any testing you've done, and any testing you'd like your reviewers to do, so that you're confident that all the changes work as expected! --> - [x] Current tests pass after refactor - [ ] Use `MutTxId` for query execution
1 parent 798852e commit 8118dd8

File tree

8 files changed

+145
-1166
lines changed

8 files changed

+145
-1166
lines changed

crates/core/src/subscription/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,7 @@ where
174174
.filter(|(_, plan)| {
175175
// Since subscriptions only support selects and inner joins,
176176
// we filter out any plans that read from an empty table.
177-
plan.table_ids()
178-
.all(|table_id| tx.table(table_id).is_some_and(|t| t.row_count > 0))
177+
plan.table_ids().all(|table_id| tx.row_count(table_id) > 0)
179178
})
180179
.map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
181180
.map(|(sql, plan, table_id, table_name)| {

crates/core/src/subscription/tx.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use smallvec::SmallVec;
1010
use spacetimedb_execution::{Datastore, DeltaStore, Row};
1111
use spacetimedb_lib::{query::Delta, AlgebraicValue, ProductValue};
1212
use spacetimedb_primitives::{IndexId, TableId};
13-
use spacetimedb_table::{blob_store::BlobStore, table::Table};
13+
use spacetimedb_table::table::{IndexScanRangeIter, TableScanIter};
1414

1515
use spacetimedb_datastore::{
1616
locking_tx_datastore::{state_view::StateView, TxId},
@@ -119,12 +119,31 @@ impl<'a> From<&'a TxId> for DeltaTx<'a> {
119119
}
120120

121121
impl Datastore for DeltaTx<'_> {
122-
fn table(&self, table_id: TableId) -> Option<&Table> {
123-
self.tx.table(table_id)
122+
type TableIter<'a>
123+
= TableScanIter<'a>
124+
where
125+
Self: 'a;
126+
127+
type IndexIter<'a>
128+
= IndexScanRangeIter<'a>
129+
where
130+
Self: 'a;
131+
132+
fn row_count(&self, table_id: TableId) -> u64 {
133+
self.tx.row_count(table_id)
124134
}
125135

126-
fn blob_store(&self) -> &dyn BlobStore {
127-
self.tx.blob_store()
136+
fn table_scan<'a>(&'a self, table_id: TableId) -> anyhow::Result<Self::TableIter<'a>> {
137+
self.tx.table_scan(table_id)
138+
}
139+
140+
fn index_scan<'a>(
141+
&'a self,
142+
table_id: TableId,
143+
index_id: IndexId,
144+
range: &impl RangeBounds<AlgebraicValue>,
145+
) -> anyhow::Result<Self::IndexIter<'a>> {
146+
self.tx.index_scan(table_id, index_id, range)
128147
}
129148
}
130149

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2662,7 +2662,8 @@ mod tests {
26622662
let table_id = datastore.create_table_mut_tx(&mut tx, table_schema)?;
26632663
let index_id = datastore.index_id_from_name_mut_tx(&tx, "index")?.unwrap();
26642664
let find_row_by_key = |tx: &MutTxId, key: u32| {
2665-
tx.index_scan_point(table_id, index_id, &key.into())
2665+
let key: AlgebraicValue = key.into();
2666+
tx.index_scan(table_id, index_id, &key)
26662667
.unwrap()
26672668
.map(|row| row.pointer())
26682669
.collect::<Vec<_>>()
@@ -3441,7 +3442,8 @@ mod tests {
34413442
let mut tx = begin_mut_tx(&datastore);
34423443
assert_eq!(tx.get_schema(table_id).unwrap().columns, columns_original);
34433444
let index_key_types = |tx: &MutTxId| {
3444-
tx.table(table_id)
3445+
tx.committed_state_write_lock
3446+
.get_table(table_id)
34453447
.unwrap()
34463448
.indexes
34473449
.values()
@@ -3570,7 +3572,7 @@ mod tests {
35703572
.map(|row| row.to_product_value())
35713573
.collect::<Vec<_>>();
35723574
assert_eq!(rows, old_rows, "Rows shouldn't be changed if rolledback");
3573-
let table = tx.table(rollback_table_id);
3575+
let table = tx.table_name(rollback_table_id);
35743576
assert!(table.is_none(), "new table shouldn't be created if rolledback");
35753577

35763578
// Add column and actually commit this time.

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,43 @@ impl MutTxId {
165165
}
166166

167167
impl Datastore for MutTxId {
168-
fn blob_store(&self) -> &dyn BlobStore {
169-
&self.committed_state_write_lock.blob_store
168+
type TableIter<'a>
169+
= IterMutTx<'a>
170+
where
171+
Self: 'a;
172+
173+
type IndexIter<'a>
174+
= IndexScanRanged<'a>
175+
where
176+
Self: 'a;
177+
178+
fn row_count(&self, table_id: TableId) -> u64 {
179+
self.table_row_count(table_id).unwrap_or_default()
180+
}
181+
182+
fn table_scan<'a>(&'a self, table_id: TableId) -> anyhow::Result<Self::TableIter<'a>> {
183+
Ok(self.iter(table_id)?)
170184
}
171185

172-
fn table(&self, table_id: TableId) -> Option<&Table> {
173-
self.committed_state_write_lock.get_table(table_id)
186+
fn index_scan<'a>(
187+
&'a self,
188+
table_id: TableId,
189+
index_id: IndexId,
190+
range: &impl RangeBounds<AlgebraicValue>,
191+
) -> anyhow::Result<Self::IndexIter<'a>> {
192+
// Extract the table id, and commit/tx indices.
193+
let (_, commit_index, tx_index) = self
194+
.get_table_and_index(index_id)
195+
.ok_or_else(|| IndexError::NotFound(index_id))?;
196+
197+
// Get an index seek iterator for the tx and committed state.
198+
let tx_iter = tx_index.map(|i| i.seek_range(range));
199+
let commit_iter = commit_index.seek_range(range);
200+
201+
let dt = self.tx_state.get_delete_table(table_id);
202+
let iter = combine_range_index_iters(dt, tx_iter, commit_iter);
203+
204+
Ok(iter)
174205
}
175206
}
176207

crates/datastore/src/locking_tx_datastore/tx.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ use crate::locking_tx_datastore::state_view::IterTx;
99
use spacetimedb_durability::TxOffset;
1010
use spacetimedb_execution::Datastore;
1111
use spacetimedb_lib::metrics::ExecutionMetrics;
12-
use spacetimedb_primitives::{ColList, TableId};
12+
use spacetimedb_primitives::{ColList, IndexId, TableId};
1313
use spacetimedb_sats::AlgebraicValue;
1414
use spacetimedb_schema::schema::TableSchema;
15-
use spacetimedb_table::blob_store::BlobStore;
16-
use spacetimedb_table::table::Table;
15+
use spacetimedb_table::table::{IndexScanRangeIter, TableScanIter};
1716
use std::sync::Arc;
1817
use std::{future, num::NonZeroU64};
1918
use std::{
@@ -33,12 +32,44 @@ pub struct TxId {
3332
}
3433

3534
impl Datastore for TxId {
36-
fn blob_store(&self) -> &dyn BlobStore {
37-
&self.committed_state_shared_lock.blob_store
35+
type TableIter<'a>
36+
= TableScanIter<'a>
37+
where
38+
Self: 'a;
39+
40+
type IndexIter<'a>
41+
= IndexScanRangeIter<'a>
42+
where
43+
Self: 'a;
44+
45+
fn row_count(&self, table_id: TableId) -> u64 {
46+
self.committed_state_shared_lock
47+
.table_row_count(table_id)
48+
.unwrap_or_default()
3849
}
3950

40-
fn table(&self, table_id: TableId) -> Option<&Table> {
41-
self.committed_state_shared_lock.get_table(table_id)
51+
fn table_scan<'a>(&'a self, table_id: TableId) -> anyhow::Result<Self::TableIter<'a>> {
52+
self.committed_state_shared_lock
53+
.get_table(table_id)
54+
.map(|table| table.scan_rows(&self.committed_state_shared_lock.blob_store))
55+
.ok_or_else(|| anyhow::anyhow!("TableId `{table_id}` does not exist"))
56+
}
57+
58+
fn index_scan<'a>(
59+
&'a self,
60+
table_id: TableId,
61+
index_id: IndexId,
62+
range: &impl RangeBounds<AlgebraicValue>,
63+
) -> anyhow::Result<Self::IndexIter<'a>> {
64+
self.committed_state_shared_lock
65+
.get_table(table_id)
66+
.ok_or_else(|| anyhow::anyhow!("TableId `{table_id}` does not exist"))
67+
.and_then(|table| {
68+
table
69+
.get_index_by_id_with_table(&self.committed_state_shared_lock.blob_store, index_id)
70+
.map(|i| i.seek_range(range))
71+
.ok_or_else(|| anyhow::anyhow!("IndexId `{index_id}` does not exist"))
72+
})
4273
}
4374
}
4475

0 commit comments

Comments
 (0)