diff --git a/Cargo.lock b/Cargo.lock index ed929d1e017..834c4b26ab6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7517,6 +7517,7 @@ version = "1.6.0" dependencies = [ "anyhow", "bigdecimal", + "bytes", "derive_more 0.99.20", "ethnum", "pretty_assertions", diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index fd426bb154c..2bfcc19c79f 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -72,65 +72,54 @@ impl Host { body: String, ) -> axum::response::Result>> { let module_host = self - .module() + .host_controller + .get_or_launch_module_host(database, self.replica_id) .await .map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?; - let (tx_offset, durable_offset, json) = self - .host_controller - .using_database( - database, - self.replica_id, - move |db| -> axum::response::Result<_, (StatusCode, String)> { - tracing::info!(sql = body); - - // We need a header for query results - let mut header = vec![]; - - let sql_start = std::time::Instant::now(); - let sql_span = - tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered(); - - let result = sql::execute::run( - // Returns an empty result set for mutations - db, - &body, - auth, - Some(&module_host.info().subscriptions), - &mut header, - ) - .map_err(|e| { - log::warn!("{e}"); - if let Some(auth_err) = e.get_auth_error() { - (StatusCode::UNAUTHORIZED, auth_err.to_string()) - } else { - (StatusCode::BAD_REQUEST, e.to_string()) - } - })?; - - let total_duration = sql_start.elapsed(); - sql_span.record("total_duration", tracing::field::debug(total_duration)); - - // Turn the header into a `ProductType` - let schema = header - .into_iter() - .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name))) - .collect(); - - Ok(( - result.tx_offset, - db.durable_tx_offset(), - vec![SqlStmtResult { - schema, - rows: result.rows, - total_duration_micros: total_duration.as_micros() as u64, - stats: SqlStmtStats::from_metrics(&result.metrics), - }], - )) - }, - ) - .await - .map_err(log_and_500)??; + tracing::info!(sql = body); + // We need a header for query results + let mut header = vec![]; + let sql_start = std::time::Instant::now(); + let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty).entered(); + let db = &module_host.module.replica_ctx().relational_db; + + let result = sql::execute::run( + // Returns an empty result set for mutations + &db, + &body, + auth, + Some(&module_host), + auth.caller, + &mut header, + ) + .await + .map_err(|e| { + log::warn!("{e}"); + if let Some(auth_err) = e.get_auth_error() { + (StatusCode::UNAUTHORIZED, auth_err.to_string()) + } else { + (StatusCode::BAD_REQUEST, e.to_string()) + } + })?; + + let total_duration = sql_start.elapsed(); + sql_span.record("total_duration", tracing::field::debug(total_duration)); + + // Turn the header into a `ProductType` + let schema = header + .into_iter() + .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name))) + .collect(); + + let tx_offset = result.tx_offset; + let durable_offset = db.durable_tx_offset(); + let json = vec![SqlStmtResult { + schema, + rows: result.rows, + total_duration_micros: total_duration.as_micros() as u64, + stats: SqlStmtStats::from_metrics(&result.metrics), + }]; if confirmed_read { if let Some(mut durable_offset) = durable_offset { @@ -141,7 +130,6 @@ impl Host { Ok(json) } - pub async fn update( &self, database: Database, diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index c6b2959c710..c9adf084efe 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,5 +1,5 @@ use crate::db::MetricsRecorderQueue; -use crate::error::{DBError, DatabaseError, RestoreSnapshotError, ViewError}; +use crate::error::{DBError, DatabaseError, RestoreSnapshotError}; use crate::host::ArgsTuple; use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; @@ -14,16 +14,16 @@ use spacetimedb_commitlog as commitlog; use spacetimedb_commitlog::repo::OnNewSegmentFn; use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::db_metrics::DB_METRICS; -use spacetimedb_datastore::error::{DatastoreError, TableError}; +use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError}; use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; -use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewArgFields, StViewRow, ST_VIEW_ID}; +use spacetimedb_datastore::system_tables::ST_VIEW_ID; +use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow}; use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; -use spacetimedb_datastore::system_tables::{StViewArgRow, ST_VIEW_ARG_ID}; use spacetimedb_datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, UpdateFlags, @@ -1519,27 +1519,6 @@ impl RelationalDB { }) } - /// Get or insert view argument into `ST_VIEW_ARG_ID`. - pub fn get_or_insert_st_view_arg(&self, tx: &mut MutTxId, args: &Bytes) -> Result { - let bytes_av = AlgebraicValue::Bytes(args.to_vec().into()); - let mut rows = self.iter_by_col_eq_mut(tx, ST_VIEW_ARG_ID, [StViewArgFields::Bytes], &bytes_av)?; - - // Extract the first matching `arg_id`, if any. - if let Some(res) = rows.next() { - let row = StViewArgRow::try_from(res).expect("valid StViewArgRow"); - return Ok(row.id); - } - - let view_arg_bytes = product![0u64, bytes_av] - .to_bsatn_vec() - .map_err(|_| ViewError::SerializeArgs)?; - - let (_, view_arg_row, _) = self.insert(tx, ST_VIEW_ARG_ID, &view_arg_bytes)?; - let StViewArgRow { id: arg_id, .. } = view_arg_row.try_into().expect("valid StViewArgRow"); - - Ok(arg_id) - } - /// Evaluate and update View. /// This involves: /// 1. Serializing the view arguments into `ST_VIEW_ARG_ID` @@ -1569,12 +1548,11 @@ impl RelationalDB { let (table_id, is_anonymous) = ( st_view_row .table_id - .ok_or_else(|| ViewError::ViewNotFound(view.to_string()))?, + .expect("Tables are always created for views upon view creation"), st_view_row.is_anonymous, ); - // Insert the view arguments into ST_VIEW_ARG_ID - let arg_id = self.get_or_insert_st_view_arg(tx, &args.get_bsatn())?; + let arg_id = tx.get_or_insert_st_view_arg(&args.get_bsatn())?; let input_rows = product![ if is_anonymous { @@ -1598,7 +1576,7 @@ impl RelationalDB { let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type); let return_val = seed .deserialize(bsatn::Deserializer::new(&mut &bytes[..])) - .map_err(|e| ViewError::DeserializeReturn(e.to_string()))?; + .map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?; let products: Vec = if return_type.is_array() { let arr = return_val.into_array().expect("return type is array"); @@ -1607,7 +1585,7 @@ impl RelationalDB { let opt = return_val.into_option().expect("return type is option"); Ok(opt.into_iter().map(|v| v.into_product().unwrap()).collect()) } else { - Err(ViewError::InvalidReturnType(return_type.clone())) + Err(DatastoreError::from(ViewError::InvalidReturnType(return_type.clone()))) }?; // Insert all rows from the return value into the view table @@ -1621,7 +1599,9 @@ impl RelationalDB { elements: elements.into_boxed_slice(), } }; - let row_bytes = row.to_bsatn_vec().map_err(|_| ViewError::SerializeRow)?; + let row_bytes = row + .to_bsatn_vec() + .map_err(|_| DatastoreError::from(ViewError::SerializeRow))?; self.insert(tx, table_id, &row_bytes)?; } diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index ece2657b5e8..84e321a261b 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -8,7 +8,7 @@ use hex::FromHexError; use spacetimedb_commitlog::repo::TxOffset; use spacetimedb_durability::DurabilityExited; use spacetimedb_expr::errors::TypingError; -use spacetimedb_lib::{AlgebraicType, Identity}; +use spacetimedb_lib::Identity; use spacetimedb_schema::error::ValidationErrors; use spacetimedb_snapshot::SnapshotError; use spacetimedb_table::table::ReadViaBsatnError; @@ -147,8 +147,6 @@ pub enum DBError { RestoreSnapshot(#[from] RestoreSnapshotError), #[error(transparent)] DurabilityGone(#[from] DurabilityExited), - #[error("ViewError: {0}")] - View(#[from] ViewError), } impl DBError { @@ -192,28 +190,6 @@ impl<'a, T: ?Sized + 'a> From>> for DBE } } -#[derive(Error, Debug)] -pub enum ViewError { - #[error("view '{0}' not found")] - ViewNotFound(String), - #[error("failed to deserialize view arguments from row")] - DeserializeArgs, - #[error("failed to deserialize view return value: {0}")] - DeserializeReturn(String), - #[error("failed to serialize row to BSATN")] - SerializeRow, - #[error("invalid return type: expected Array or Option, got {0:?}")] - InvalidReturnType(AlgebraicType), - #[error("return type is Array but deserialized value is not Array")] - TypeMismatchArray, - #[error("return type is Option but deserialized value is not Option")] - TypeMismatchOption, - #[error("expected ProductValue in view result")] - ExpectedProduct, - #[error("failed to serialize view arguments")] - SerializeArgs, -} - #[derive(Debug, Error)] pub enum LogReplayError { #[error( diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index fdbdf1a1f46..c73f66b3e97 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -27,7 +27,6 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS; use spacetimedb_datastore::db_metrics::DB_METRICS; -use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::Program; use spacetimedb_durability::{self as durability}; use spacetimedb_lib::{hash_bytes, AlgebraicValue, Identity, Timestamp}; @@ -187,13 +186,6 @@ impl From for ViewOutcome { } } -pub struct ViewCallResult { - pub outcome: ViewOutcome, - pub tx: MutTxId, - pub energy_used: EnergyQuanta, - pub execution_duration: Duration, -} - #[derive(Clone, Debug)] pub struct ProcedureCallResult { pub return_val: AlgebraicValue, @@ -893,8 +885,10 @@ impl Host { scheduler_starter.start(&module_host)?; let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle(); + let module = watch::Sender::new(module_host); + replica_ctx.subscriptions.init(module.subscribe()); Ok(Host { - module: watch::Sender::new(module_host), + module, replica_ctx, scheduler, disk_metrics_recorder_task, diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1ce140dc173..7cf269a9edf 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -10,7 +10,7 @@ use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::hash::Hash; -use crate::host::host_controller::ViewCallResult; +use crate::host::host_controller::ViewOutcome; use crate::host::{InvalidFunctionArguments, InvalidViewArguments}; use crate::identity::Identity; use crate::messages::control_db::{Database, HostType}; @@ -37,6 +37,7 @@ use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; +use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID}; @@ -541,7 +542,6 @@ pub struct CallViewParams { pub timestamp: Timestamp, pub caller_identity: Identity, pub caller_connection_id: Option, - pub timer: Option, pub view_id: ViewId, pub args: ArgsTuple, /// The expected return type of the view, used for deserialization. @@ -712,6 +712,13 @@ pub enum ReducerCallError { LifecycleReducer(Lifecycle), } +pub struct ViewCallResult { + pub outcome: ViewOutcome, + pub tx: MutTxId, + pub energy_used: EnergyQuanta, + pub execution_duration: Duration, +} + #[derive(thiserror::Error, Debug)] pub enum ViewCallError { #[error(transparent)] @@ -720,6 +727,10 @@ pub enum ViewCallError { NoSuchModule(#[from] NoSuchModule), #[error("no such view")] NoSuchView, + #[error("missing client connection for view call trigged by subscription")] + MissingClientConnection, + #[error("DB error during view call: {0}")] + DatastoreError(#[from] DatastoreError), } #[derive(thiserror::Error, Debug)] @@ -1453,11 +1464,10 @@ impl ModuleHost { pub async fn call_view( &self, tx: MutTxId, - caller_identity: Identity, - caller_connection_id: Option, - timer: Option, view_name: &str, args: FunctionArgs, + caller_identity: Identity, + caller_connection_id: Option, ) -> Result { let (view_id, view_def) = self .info @@ -1466,12 +1476,45 @@ impl ModuleHost { .ok_or(ViewCallError::NoSuchView)?; let view_seed = ArgsSeed(self.info.module_def.typespace().with_type(view_def)); - //TODO: can we put the args into `ST_VIEW_ARG` table at this point? let args = args.into_tuple(view_seed).map_err(InvalidViewArguments)?; + + let res = self + .call_view_inner( + tx, + view_id, + view_def, + args.clone(), + caller_identity, + caller_connection_id, + ) + .await; + + let log_message = match &res { + Err(ViewCallError::NoSuchView) => Some(no_such_function_log_message("view", view_name)), + Err(ViewCallError::Args(_)) => Some(args_error_log_message("view", view_name)), + _ => None, + }; + + if let Some(log_message) = log_message { + self.inject_logs(LogLevel::Error, view_name, &log_message) + } + + res + } + + async fn call_view_inner( + &self, + tx: MutTxId, + view_id: ViewId, + view_def: &ViewDef, + args: ArgsTuple, + caller_identity: Identity, + caller_connection_id: Option, + ) -> Result { let return_type = view_def.return_type.clone(); let is_anonymous = view_def.is_anonymous; - let res = Ok(self + Ok(self .call(&view_def.name, move |inst| { inst.call_view( tx, @@ -1479,7 +1522,6 @@ impl ModuleHost { timestamp: Timestamp::now(), caller_identity, caller_connection_id, - timer, view_id, args, return_type, @@ -1487,19 +1529,7 @@ impl ModuleHost { }, ) }) - .await?); - - let log_message = match &res { - Err(ViewCallError::NoSuchView) => Some(no_such_function_log_message("view", view_name)), - Err(ViewCallError::Args(_)) => Some(args_error_log_message("view", view_name)), - _ => None, - }; - - if let Some(log_message) = log_message { - self.inject_logs(LogLevel::Error, view_name, &log_message) - } - - res + .await?) } pub fn subscribe_to_logs(&self) -> anyhow::Result> { diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index b90d3d4da7b..03b5021ad3e 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -14,9 +14,10 @@ use super::instrumentation::CallTimes; use crate::client::ClientConnectionSender; use crate::database_logger; use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint}; -use crate::host::host_controller::{ViewCallResult, ViewOutcome}; +use crate::host::host_controller::ViewOutcome; use crate::host::instance_env::InstanceEnv; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; +use crate::host::module_host::ViewCallResult; use crate::host::module_host::{ CallProcedureParams, CallReducerParams, CallViewParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, @@ -725,7 +726,7 @@ impl InstanceCommon { } } - pub(crate) fn call_function( + fn call_function( &mut self, caller_identity: Identity, function_name: &str, diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 352b7381837..7b97a708252 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -10,12 +10,12 @@ use crate::host::AbiCall; use anyhow::Context as _; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::locking_tx_datastore::UniqueView; -use spacetimedb_lib::{ConnectionId, Identity, Timestamp}; +use spacetimedb_lib::{ConnectionId, Timestamp}; use spacetimedb_primitives::{errno, ColId}; use std::future::Future; use std::num::NonZeroU32; use std::time::Instant; -use wasmtime::{AsContext, Caller, FuncType, StoreContextMut}; +use wasmtime::{AsContext, Caller, StoreContextMut}; /// A stream of bytes which the WASM module can read from /// using [`WasmInstanceEnv::bytes_source_read`]. diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 089148cdd2e..0ded6d8eb64 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -7,7 +7,7 @@ use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; -use crate::host::ArgsTuple; +use crate::host::{ArgsTuple, ModuleHost}; use crate::subscription::module_subscription_actor::{ModuleSubscriptions, WriteConflict}; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::subscription::tx::DeltaTx; @@ -20,8 +20,8 @@ use spacetimedb_datastore::traits::IsolationLevel; use spacetimedb_expr::statement::Statement; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; -use spacetimedb_lib::Timestamp; use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue}; +use spacetimedb_lib::{Identity, Timestamp}; use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt}; use spacetimedb_schema::relation::FieldName; use spacetimedb_vm::eval::run_ast; @@ -186,21 +186,50 @@ pub struct SqlResult { } /// Run the `SQL` string using the `auth` credentials -pub fn run( +pub async fn run( db: &RelationalDB, sql_text: &str, auth: AuthCtx, - subs: Option<&ModuleSubscriptions>, + module: Option<&ModuleHost>, + caller_identity: Identity, head: &mut Vec<(Box, AlgebraicType)>, ) -> Result { // We parse the sql statement in a mutable transaction. // If it turns out to be a query, we downgrade the tx. - let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| { + let (mut tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| { compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth) })?; let mut metrics = ExecutionMetrics::default(); + for (view_name, args) in stmt.views() { + let (is_memoized, args) = tx + .is_materialized(view_name, args, caller_identity) + .map_err(|e| DBError::Other(anyhow!("Failed to check memoized view: {e}")))?; + + // Skip if already memoized + if is_memoized { + continue; + } + + let module = module + .as_ref() + .ok_or_else(|| anyhow!("Cannot execute view `{view_name}` without module context"))?; + + let res = module + .call_view( + tx, + view_name, + crate::host::FunctionArgs::Bsatn(args), + caller_identity, + None, + ) + .await + .map_err(|e| DBError::Other(anyhow!("Failed to execute view `{view_name}`: {e}")))?; + + tx = res.tx; + } + match stmt { Statement::Select(stmt) => { // Up to this point, the tx has been read-only, @@ -260,7 +289,7 @@ pub fn run( tx.metrics.merge(metrics); // Commit the tx if there are no deltas to process - if subs.is_none() { + if module.is_none() { let metrics = tx.metrics; return db.commit_tx(tx).map(|tx_opt| { let (tx_offset, tx_data, tx_metrics, reducer) = tx_opt.unwrap(); @@ -281,8 +310,10 @@ pub fn run( // Note, we get the delta by downgrading the tx. // Hence we just pass a default `DatabaseUpdate` here. // It will ultimately be replaced with the correct one. - match subs + match module .unwrap() + .info + .subscriptions .commit_and_broadcast_event( None, ModuleEvent { diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 1c1f492b285..9014e10022e 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -15,6 +15,7 @@ use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent}; +use crate::host::{FunctionArgs, ModuleHost}; use crate::messages::websocket::Subscribe; use crate::subscription::execute_plans; use crate::subscription::query::is_subscribe_to_all_tables; @@ -31,15 +32,16 @@ use spacetimedb_client_api_messages::websocket::{ use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; -use spacetimedb_datastore::locking_tx_datastore::TxId; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; use spacetimedb_datastore::traits::TxData; use spacetimedb_durability::TxOffset; use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; +use std::sync::OnceLock; use std::{sync::Arc, time::Instant}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, watch}; type Subscriptions = Arc>; @@ -52,6 +54,7 @@ pub struct ModuleSubscriptions { broadcast_queue: BroadcastQueue, owner_identity: Identity, stats: Arc, + module_rx: OnceLock>, } #[derive(Debug, Clone)] @@ -190,9 +193,37 @@ impl ModuleSubscriptions { broadcast_queue, owner_identity, stats, + module_rx: OnceLock::new(), } } + pub fn init(&self, module_host: watch::Receiver) { + self.module_rx + .set(module_host) + .expect("ModuleSubscriptions::init called twice"); + } + + #[allow(dead_code)] + async fn call_view( + &self, + tx: MutTxId, + view_name: &str, + args: FunctionArgs, + sender: Arc, + ) -> Result<(), DBError> { + let module_host_rx = self + .module_rx + .get() + .expect("ModuleSubscriptions::init not called before call_view"); + let module_host = module_host_rx.borrow(); + + let _result = module_host + .call_view(tx, view_name, args, sender.id.identity, Some(sender.id.connection_id)) + .await; + + // TODO: Handle result + Ok(()) + } /// Construct a new [`ModuleSubscriptions`] for use in testing, /// creating a new [`tokio::runtime::Runtime`] to run its send worker. pub fn for_test_new_runtime(db: Arc) -> (ModuleSubscriptions, tokio::runtime::Runtime) { diff --git a/crates/datastore/src/error.rs b/crates/datastore/src/error.rs index 3a571f7d579..936e077fd28 100644 --- a/crates/datastore/src/error.rs +++ b/crates/datastore/src/error.rs @@ -29,10 +29,36 @@ pub enum DatastoreError { // TODO(cloutiertyler): should this be a TableError? I couldn't get it to compile #[error("Error reading a value from a table through BSATN: {0}")] ReadViaBsatnError(#[from] ReadViaBsatnError), + + #[error("ViewError: {0}")] + View(#[from] ViewError), + #[error(transparent)] Other(#[from] anyhow::Error), } +#[derive(Error, Debug)] +pub enum ViewError { + #[error("view '{0}' not found")] + ViewNotFound(String), + #[error("failed to deserialize view arguments from row")] + DeserializeArgs, + #[error("failed to deserialize view return value: {0}")] + DeserializeReturn(String), + #[error("failed to serialize row to BSATN")] + SerializeRow, + #[error("invalid return type: expected Array or Option, got {0:?}")] + InvalidReturnType(AlgebraicType), + #[error("return type is Array but deserialized value is not Array")] + TypeMismatchArray, + #[error("return type is Option but deserialized value is not Option")] + TypeMismatchOption, + #[error("expected ProductValue in view result")] + ExpectedProduct, + #[error("failed to serialize view arguments")] + SerializeArgs, +} + #[derive(Error, Debug, EnumAsInner)] pub enum TableError { #[error("Table with name `{0}` start with 'st_' and that is reserved for internal system tables.")] diff --git a/crates/datastore/src/execution_context.rs b/crates/datastore/src/execution_context.rs index fc7647143d1..f363a5aa8f5 100644 --- a/crates/datastore/src/execution_context.rs +++ b/crates/datastore/src/execution_context.rs @@ -97,7 +97,6 @@ pub enum Workload { #[cfg(any(test, feature = "test"))] ForTests, Reducer(ReducerContext), - View, Sql, Subscribe, Unsubscribe, @@ -116,7 +115,6 @@ impl Workload { Self::Unsubscribe => WorkloadType::Unsubscribe, Self::Update => WorkloadType::Update, Self::Internal => WorkloadType::Internal, - Self::View => WorkloadType::View, } } } @@ -164,7 +162,6 @@ impl ExecutionContext { Workload::Subscribe => Self::new(database, None, WorkloadType::Subscribe), Workload::Unsubscribe => Self::new(database, None, WorkloadType::Unsubscribe), Workload::Update => Self::new(database, None, WorkloadType::Update), - Workload::View => Self::new(database, None, WorkloadType::View), } } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 3aaae6d4098..8a6580ab583 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -98,7 +98,7 @@ impl CommittedReadSets { /// Returns true if the given view exists in any read set. /// This is used to determine whether a view needs to be re-evaluated. - fn has_memoized_view(&self, view: &UniqueView) -> bool { + fn is_materialized(&self, view: &UniqueView) -> bool { self.tables.values().any(|views| views.contains(view)) || self.index_keys.values().any(|col_map| { col_map @@ -1061,8 +1061,8 @@ impl CommittedState { .set(self.blob_store.bytes_used_by_blobs() as _); } - pub(super) fn has_memoized_view(&self, view: &UniqueView) -> bool { - self.read_sets.has_memoized_view(view) + pub(super) fn is_materialized(&self, view: &UniqueView) -> bool { + self.read_sets.is_materialized(view) } } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index a7b9096d7d8..c6656a8452f 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -8,12 +8,16 @@ use super::{ tx_state::{IndexIdMap, PendingSchemaChange, TxState, TxTableForInsertion}, SharedMutexGuard, SharedWriteGuard, }; -use crate::system_tables::{ - system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, - StViewClientFields, StViewClientRow, StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, - ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, -}; use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}; +use crate::{ + error::ViewError, + system_tables::{ + system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, + StViewArgFields, StViewArgRow, StViewClientFields, StViewClientRow, StViewColumnFields, StViewFields, + StViewParamFields, StViewParamRow, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_ARG_ID, ST_VIEW_CLIENT_ID, + ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, + }, +}; use crate::{ error::{IndexError, SequenceError, TableError}, system_tables::{ @@ -34,7 +38,7 @@ use smallvec::SmallVec; use spacetimedb_data_structures::map::{IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row}; -use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics}; +use spacetimedb_lib::{bsatn::ToBsatn as _, db::raw_def::v9::RawSql, metrics::ExecutionMetrics}; use spacetimedb_lib::{ db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}, ConnectionId, Identity, @@ -45,6 +49,7 @@ use spacetimedb_primitives::{ use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, de::{DeserializeSeed, WithBound}, + product, ser::Serialize, AlgebraicType, AlgebraicValue, ProductType, ProductValue, WithTypespace, }; @@ -141,6 +146,10 @@ impl UniqueView { args, } } + + pub fn into_args(self) -> Bytes { + self.args + } } pub type ViewReadSets = HashMap; @@ -731,24 +740,29 @@ impl MutTxId { Ok((tx, commit)) } - /// Check if a memoized view exists for the given view name, args, and sender identity. - /// if not, [`RelationalDB::evaluate_view`] should be called to compute and store it. - fn has_memoized_view(&self, view_name: &str, args: Bytes, sender: Identity) -> Result<(bool, Bytes)> { + /// Checks whether a memoized view exists for the given view name, arguments, and sender identity. + /// + /// If view is not materialized, [`RelationalDB::evaluate_view`] should be called to compute and store it. + /// + /// - `view_name`: The name of the view to look up. + /// - `args`: The serialized (bastn-encoded) arguments for the view. + /// - `sender`: The identity of the sender requesting the view. + pub fn is_materialized(&self, view_name: &str, args: Bytes, sender: Identity) -> Result<(bool, Bytes)> { let (view_id, is_anonymous) = self .view_from_name(view_name)? .map(|view_row| (view_row.view_id, view_row.is_anonymous)) .ok_or_else(|| anyhow::anyhow!("view `{view_name}` not found"))?; let unique_view = if is_anonymous { - UniqueView::anonymous(view_id, args.clone()) + UniqueView::anonymous(view_id, args) } else { - UniqueView::with_identity(sender, view_id, args.clone()) + UniqueView::with_identity(sender, view_id, args) }; - let has_memoized = self.read_sets.contains_key(&unique_view) - || self.committed_state_write_lock.has_memoized_view(&unique_view); + let is_materialized = + self.read_sets.contains_key(&unique_view) || self.committed_state_write_lock.is_materialized(&unique_view); - Ok((has_memoized, args)) + Ok((is_materialized, unique_view.into_args())) } } @@ -1859,6 +1873,51 @@ impl MutTxId { .collect() } + /// Get or insert view argument into `ST_VIEW_ARG_ID`. + pub fn get_or_insert_st_view_arg(&mut self, args: &Bytes) -> Result { + let bytes_av = AlgebraicValue::Bytes(args.to_vec().into()); + let mut rows = self.iter_by_col_eq(ST_VIEW_ARG_ID, [StViewArgFields::Bytes], &bytes_av)?; + + // Extract the first matching `arg_id`, if any. + if let Some(res) = rows.next() { + let row = StViewArgRow::try_from(res).expect("valid StViewArgRow"); + return Ok(row.id); + } + + let view_arg_bytes = product![0u64, bytes_av] + .to_bsatn_vec() + .expect("StViewArgRow serialization to never fail"); + + let (_, view_arg_row, _) = self.insert_via_serialize_bsatn(ST_VIEW_ARG_ID, &view_arg_bytes)?; + let StViewArgRow { id: arg_id, .. } = view_arg_row.collapse().try_into().expect("valid StViewArgRow"); + + Ok(arg_id) + } + + pub fn insert_st_view_client( + &mut self, + view: &str, + args: &Bytes, + sender: Identity, + connection_id: ConnectionId, + ) -> Result { + let view_id = self + .view_id_from_name(view)? + .ok_or_else(|| ViewError::ViewNotFound(view.to_string()))?; + + let arg_id = self.get_or_insert_st_view_arg(args)?; + + let row = &StViewClientRow { + view_id, + arg_id, + identity: IdentityViaU256(sender), + connection_id: ConnectionIdViaU128(connection_id), + }; + self.insert_via_serialize_bsatn(ST_VIEW_CLIENT_ID, row)?; + + Ok(arg_id) + } + /// Is anyone is subscribed to the view arguments identified by `arg_id` fn is_identity_subscribed_to_view_args(&self, view_id: ViewId, arg_id: u64, sender: Identity) -> Result { Ok(self diff --git a/crates/expr/Cargo.toml b/crates/expr/Cargo.toml index afc5fb9f401..4997aa60382 100644 --- a/crates/expr/Cargo.toml +++ b/crates/expr/Cargo.toml @@ -17,6 +17,7 @@ spacetimedb-primitives.workspace = true spacetimedb-sats.workspace = true spacetimedb-schema.workspace = true spacetimedb-sql-parser.workspace = true +bytes.workspace = true [dev-dependencies] pretty_assertions.workspace = true diff --git a/crates/expr/src/statement.rs b/crates/expr/src/statement.rs index 50e9bdb4c22..b265432995e 100644 --- a/crates/expr/src/statement.rs +++ b/crates/expr/src/statement.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use bytes::Bytes; use spacetimedb_lib::{identity::AuthCtx, st_var::StVarValue, AlgebraicType, AlgebraicValue, ProductValue}; use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_schema::schema::{ColumnSchema, TableSchema}; @@ -31,6 +32,13 @@ pub enum Statement { DML(DML), } +impl Statement { + pub fn views(&self) -> Vec<(&str, Bytes)> { + //TODO: implement view name extraction + todo!() + } +} + pub enum DML { Insert(TableInsert), Update(TableUpdate),