Skip to content

Commit d533f9c

Browse files
authored
Shub/view interface (#3561)
# Description of Changes Addresses - #3487 (comment). Can be reviewed commit wise. 1. Provide `ModuleSubscriptions` a interface to invoke `ModuleHost::call_view`. 2. Handle `st_view_client table` insertion. 3. Make views available form sql.
1 parent ce0d15a commit d533f9c

File tree

16 files changed

+299
-176
lines changed

16 files changed

+299
-176
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client-api/src/lib.rs

Lines changed: 45 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -72,65 +72,54 @@ impl Host {
7272
body: String,
7373
) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>> {
7474
let module_host = self
75-
.module()
75+
.host_controller
76+
.get_or_launch_module_host(database, self.replica_id)
7677
.await
7778
.map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;
7879

79-
let (tx_offset, durable_offset, json) = self
80-
.host_controller
81-
.using_database(
82-
database,
83-
self.replica_id,
84-
move |db| -> axum::response::Result<_, (StatusCode, String)> {
85-
tracing::info!(sql = body);
86-
87-
// We need a header for query results
88-
let mut header = vec![];
89-
90-
let sql_start = std::time::Instant::now();
91-
let sql_span =
92-
tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered();
93-
94-
let result = sql::execute::run(
95-
// Returns an empty result set for mutations
96-
db,
97-
&body,
98-
auth,
99-
Some(&module_host.info().subscriptions),
100-
&mut header,
101-
)
102-
.map_err(|e| {
103-
log::warn!("{e}");
104-
if let Some(auth_err) = e.get_auth_error() {
105-
(StatusCode::UNAUTHORIZED, auth_err.to_string())
106-
} else {
107-
(StatusCode::BAD_REQUEST, e.to_string())
108-
}
109-
})?;
110-
111-
let total_duration = sql_start.elapsed();
112-
sql_span.record("total_duration", tracing::field::debug(total_duration));
113-
114-
// Turn the header into a `ProductType`
115-
let schema = header
116-
.into_iter()
117-
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
118-
.collect();
119-
120-
Ok((
121-
result.tx_offset,
122-
db.durable_tx_offset(),
123-
vec![SqlStmtResult {
124-
schema,
125-
rows: result.rows,
126-
total_duration_micros: total_duration.as_micros() as u64,
127-
stats: SqlStmtStats::from_metrics(&result.metrics),
128-
}],
129-
))
130-
},
131-
)
132-
.await
133-
.map_err(log_and_500)??;
80+
tracing::info!(sql = body);
81+
// We need a header for query results
82+
let mut header = vec![];
83+
let sql_start = std::time::Instant::now();
84+
let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty).entered();
85+
let db = &module_host.module.replica_ctx().relational_db;
86+
87+
let result = sql::execute::run(
88+
// Returns an empty result set for mutations
89+
&db,
90+
&body,
91+
auth,
92+
Some(&module_host),
93+
auth.caller,
94+
&mut header,
95+
)
96+
.await
97+
.map_err(|e| {
98+
log::warn!("{e}");
99+
if let Some(auth_err) = e.get_auth_error() {
100+
(StatusCode::UNAUTHORIZED, auth_err.to_string())
101+
} else {
102+
(StatusCode::BAD_REQUEST, e.to_string())
103+
}
104+
})?;
105+
106+
let total_duration = sql_start.elapsed();
107+
sql_span.record("total_duration", tracing::field::debug(total_duration));
108+
109+
// Turn the header into a `ProductType`
110+
let schema = header
111+
.into_iter()
112+
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
113+
.collect();
114+
115+
let tx_offset = result.tx_offset;
116+
let durable_offset = db.durable_tx_offset();
117+
let json = vec![SqlStmtResult {
118+
schema,
119+
rows: result.rows,
120+
total_duration_micros: total_duration.as_micros() as u64,
121+
stats: SqlStmtStats::from_metrics(&result.metrics),
122+
}];
134123

135124
if confirmed_read {
136125
if let Some(mut durable_offset) = durable_offset {
@@ -141,7 +130,6 @@ impl Host {
141130

142131
Ok(json)
143132
}
144-
145133
pub async fn update(
146134
&self,
147135
database: Database,

crates/core/src/db/relational_db.rs

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::db::MetricsRecorderQueue;
2-
use crate::error::{DBError, DatabaseError, RestoreSnapshotError, ViewError};
2+
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
33
use crate::host::ArgsTuple;
44
use crate::messages::control_db::HostType;
55
use crate::subscription::ExecutionCounters;
@@ -14,16 +14,16 @@ use spacetimedb_commitlog as commitlog;
1414
use spacetimedb_commitlog::repo::OnNewSegmentFn;
1515
use spacetimedb_data_structures::map::IntSet;
1616
use spacetimedb_datastore::db_metrics::DB_METRICS;
17-
use spacetimedb_datastore::error::{DatastoreError, TableError};
17+
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
1818
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
1919
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
2020
use spacetimedb_datastore::locking_tx_datastore::state_view::{
2121
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
2222
};
2323
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
24-
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewArgFields, StViewRow, ST_VIEW_ID};
24+
use spacetimedb_datastore::system_tables::ST_VIEW_ID;
25+
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow};
2526
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
26-
use spacetimedb_datastore::system_tables::{StViewArgRow, ST_VIEW_ARG_ID};
2727
use spacetimedb_datastore::traits::{
2828
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
2929
UpdateFlags,
@@ -1519,27 +1519,6 @@ impl RelationalDB {
15191519
})
15201520
}
15211521

1522-
/// Get or insert view argument into `ST_VIEW_ARG_ID`.
1523-
pub fn get_or_insert_st_view_arg(&self, tx: &mut MutTxId, args: &Bytes) -> Result<u64, DBError> {
1524-
let bytes_av = AlgebraicValue::Bytes(args.to_vec().into());
1525-
let mut rows = self.iter_by_col_eq_mut(tx, ST_VIEW_ARG_ID, [StViewArgFields::Bytes], &bytes_av)?;
1526-
1527-
// Extract the first matching `arg_id`, if any.
1528-
if let Some(res) = rows.next() {
1529-
let row = StViewArgRow::try_from(res).expect("valid StViewArgRow");
1530-
return Ok(row.id);
1531-
}
1532-
1533-
let view_arg_bytes = product![0u64, bytes_av]
1534-
.to_bsatn_vec()
1535-
.map_err(|_| ViewError::SerializeArgs)?;
1536-
1537-
let (_, view_arg_row, _) = self.insert(tx, ST_VIEW_ARG_ID, &view_arg_bytes)?;
1538-
let StViewArgRow { id: arg_id, .. } = view_arg_row.try_into().expect("valid StViewArgRow");
1539-
1540-
Ok(arg_id)
1541-
}
1542-
15431522
/// Evaluate and update View.
15441523
/// This involves:
15451524
/// 1. Serializing the view arguments into `ST_VIEW_ARG_ID`
@@ -1569,12 +1548,11 @@ impl RelationalDB {
15691548
let (table_id, is_anonymous) = (
15701549
st_view_row
15711550
.table_id
1572-
.ok_or_else(|| ViewError::ViewNotFound(view.to_string()))?,
1551+
.expect("Tables are always created for views upon view creation"),
15731552
st_view_row.is_anonymous,
15741553
);
15751554

1576-
// Insert the view arguments into ST_VIEW_ARG_ID
1577-
let arg_id = self.get_or_insert_st_view_arg(tx, &args.get_bsatn())?;
1555+
let arg_id = tx.get_or_insert_st_view_arg(&args.get_bsatn())?;
15781556

15791557
let input_rows = product![
15801558
if is_anonymous {
@@ -1598,7 +1576,7 @@ impl RelationalDB {
15981576
let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type);
15991577
let return_val = seed
16001578
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
1601-
.map_err(|e| ViewError::DeserializeReturn(e.to_string()))?;
1579+
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;
16021580

16031581
let products: Vec<ProductValue> = if return_type.is_array() {
16041582
let arr = return_val.into_array().expect("return type is array");
@@ -1607,7 +1585,7 @@ impl RelationalDB {
16071585
let opt = return_val.into_option().expect("return type is option");
16081586
Ok(opt.into_iter().map(|v| v.into_product().unwrap()).collect())
16091587
} else {
1610-
Err(ViewError::InvalidReturnType(return_type.clone()))
1588+
Err(DatastoreError::from(ViewError::InvalidReturnType(return_type.clone())))
16111589
}?;
16121590

16131591
// Insert all rows from the return value into the view table
@@ -1621,7 +1599,9 @@ impl RelationalDB {
16211599
elements: elements.into_boxed_slice(),
16221600
}
16231601
};
1624-
let row_bytes = row.to_bsatn_vec().map_err(|_| ViewError::SerializeRow)?;
1602+
let row_bytes = row
1603+
.to_bsatn_vec()
1604+
.map_err(|_| DatastoreError::from(ViewError::SerializeRow))?;
16251605
self.insert(tx, table_id, &row_bytes)?;
16261606
}
16271607

crates/core/src/error.rs

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use hex::FromHexError;
88
use spacetimedb_commitlog::repo::TxOffset;
99
use spacetimedb_durability::DurabilityExited;
1010
use spacetimedb_expr::errors::TypingError;
11-
use spacetimedb_lib::{AlgebraicType, Identity};
11+
use spacetimedb_lib::Identity;
1212
use spacetimedb_schema::error::ValidationErrors;
1313
use spacetimedb_snapshot::SnapshotError;
1414
use spacetimedb_table::table::ReadViaBsatnError;
@@ -147,8 +147,6 @@ pub enum DBError {
147147
RestoreSnapshot(#[from] RestoreSnapshotError),
148148
#[error(transparent)]
149149
DurabilityGone(#[from] DurabilityExited),
150-
#[error("ViewError: {0}")]
151-
View(#[from] ViewError),
152150
}
153151

154152
impl DBError {
@@ -192,28 +190,6 @@ impl<'a, T: ?Sized + 'a> From<PoisonError<std::sync::MutexGuard<'a, T>>> for DBE
192190
}
193191
}
194192

195-
#[derive(Error, Debug)]
196-
pub enum ViewError {
197-
#[error("view '{0}' not found")]
198-
ViewNotFound(String),
199-
#[error("failed to deserialize view arguments from row")]
200-
DeserializeArgs,
201-
#[error("failed to deserialize view return value: {0}")]
202-
DeserializeReturn(String),
203-
#[error("failed to serialize row to BSATN")]
204-
SerializeRow,
205-
#[error("invalid return type: expected Array or Option, got {0:?}")]
206-
InvalidReturnType(AlgebraicType),
207-
#[error("return type is Array but deserialized value is not Array")]
208-
TypeMismatchArray,
209-
#[error("return type is Option but deserialized value is not Option")]
210-
TypeMismatchOption,
211-
#[error("expected ProductValue in view result")]
212-
ExpectedProduct,
213-
#[error("failed to serialize view arguments")]
214-
SerializeArgs,
215-
}
216-
217193
#[derive(Debug, Error)]
218194
pub enum LogReplayError {
219195
#[error(

crates/core/src/host/host_controller.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
2727
use spacetimedb_data_structures::map::IntMap;
2828
use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
2929
use spacetimedb_datastore::db_metrics::DB_METRICS;
30-
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
3130
use spacetimedb_datastore::traits::Program;
3231
use spacetimedb_durability::{self as durability};
3332
use spacetimedb_lib::{hash_bytes, AlgebraicValue, Identity, Timestamp};
@@ -187,13 +186,6 @@ impl From<EventStatus> for ViewOutcome {
187186
}
188187
}
189188

190-
pub struct ViewCallResult {
191-
pub outcome: ViewOutcome,
192-
pub tx: MutTxId,
193-
pub energy_used: EnergyQuanta,
194-
pub execution_duration: Duration,
195-
}
196-
197189
#[derive(Clone, Debug)]
198190
pub struct ProcedureCallResult {
199191
pub return_val: AlgebraicValue,
@@ -893,8 +885,10 @@ impl Host {
893885
scheduler_starter.start(&module_host)?;
894886
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
895887

888+
let module = watch::Sender::new(module_host);
889+
replica_ctx.subscriptions.init(module.subscribe());
896890
Ok(Host {
897-
module: watch::Sender::new(module_host),
891+
module,
898892
replica_ctx,
899893
scheduler,
900894
disk_metrics_recorder_task,

0 commit comments

Comments
 (0)