Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 45 additions & 57 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,65 +72,54 @@ impl Host {
body: String,
) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>> {
let module_host = self
.module()
.host_controller
.get_or_launch_module_host(database, self.replica_id)
.await
.map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;

let (tx_offset, durable_offset, json) = self
.host_controller
.using_database(
database,
self.replica_id,
move |db| -> axum::response::Result<_, (StatusCode, String)> {
tracing::info!(sql = body);

// We need a header for query results
let mut header = vec![];

let sql_start = std::time::Instant::now();
let sql_span =
tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered();

let result = sql::execute::run(
// Returns an empty result set for mutations
db,
&body,
auth,
Some(&module_host.info().subscriptions),
&mut header,
)
.map_err(|e| {
log::warn!("{e}");
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

let total_duration = sql_start.elapsed();
sql_span.record("total_duration", tracing::field::debug(total_duration));

// Turn the header into a `ProductType`
let schema = header
.into_iter()
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
.collect();

Ok((
result.tx_offset,
db.durable_tx_offset(),
vec![SqlStmtResult {
schema,
rows: result.rows,
total_duration_micros: total_duration.as_micros() as u64,
stats: SqlStmtStats::from_metrics(&result.metrics),
}],
))
},
)
.await
.map_err(log_and_500)??;
tracing::info!(sql = body);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sql::execute::run is async now, hence made this code fuctional from closure.

// We need a header for query results
let mut header = vec![];
let sql_start = std::time::Instant::now();
let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty).entered();
let db = &module_host.module.replica_ctx().relational_db;

let result = sql::execute::run(
// Returns an empty result set for mutations
&db,
&body,
auth,
Some(&module_host),
auth.caller,
&mut header,
)
.await
.map_err(|e| {
log::warn!("{e}");
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

let total_duration = sql_start.elapsed();
sql_span.record("total_duration", tracing::field::debug(total_duration));

// Turn the header into a `ProductType`
let schema = header
.into_iter()
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
.collect();

let tx_offset = result.tx_offset;
let durable_offset = db.durable_tx_offset();
let json = vec![SqlStmtResult {
schema,
rows: result.rows,
total_duration_micros: total_duration.as_micros() as u64,
stats: SqlStmtStats::from_metrics(&result.metrics),
}];

if confirmed_read {
if let Some(mut durable_offset) = durable_offset {
Expand All @@ -141,7 +130,6 @@ impl Host {

Ok(json)
}

pub async fn update(
&self,
database: Database,
Expand Down
42 changes: 11 additions & 31 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::db::MetricsRecorderQueue;
use crate::error::{DBError, DatabaseError, RestoreSnapshotError, ViewError};
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
use crate::host::ArgsTuple;
use crate::messages::control_db::HostType;
use crate::subscription::ExecutionCounters;
Expand All @@ -14,16 +14,16 @@ use spacetimedb_commitlog as commitlog;
use spacetimedb_commitlog::repo::OnNewSegmentFn;
use spacetimedb_data_structures::map::IntSet;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError};
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewArgFields, StViewRow, ST_VIEW_ID};
use spacetimedb_datastore::system_tables::ST_VIEW_ID;
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow};
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
use spacetimedb_datastore::system_tables::{StViewArgRow, ST_VIEW_ARG_ID};
use spacetimedb_datastore::traits::{
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
UpdateFlags,
Expand Down Expand Up @@ -1519,27 +1519,6 @@ impl RelationalDB {
})
}

/// Get or insert view argument into `ST_VIEW_ARG_ID`.
Copy link
Contributor Author

@Shubham8287 Shubham8287 Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to mut_tx.rs

pub fn get_or_insert_st_view_arg(&self, tx: &mut MutTxId, args: &Bytes) -> Result<u64, DBError> {
let bytes_av = AlgebraicValue::Bytes(args.to_vec().into());
let mut rows = self.iter_by_col_eq_mut(tx, ST_VIEW_ARG_ID, [StViewArgFields::Bytes], &bytes_av)?;

// Extract the first matching `arg_id`, if any.
if let Some(res) = rows.next() {
let row = StViewArgRow::try_from(res).expect("valid StViewArgRow");
return Ok(row.id);
}

let view_arg_bytes = product![0u64, bytes_av]
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeArgs)?;

let (_, view_arg_row, _) = self.insert(tx, ST_VIEW_ARG_ID, &view_arg_bytes)?;
let StViewArgRow { id: arg_id, .. } = view_arg_row.try_into().expect("valid StViewArgRow");

Ok(arg_id)
}

/// Evaluate and update View.
/// This involves:
/// 1. Serializing the view arguments into `ST_VIEW_ARG_ID`
Expand Down Expand Up @@ -1569,12 +1548,11 @@ impl RelationalDB {
let (table_id, is_anonymous) = (
st_view_row
.table_id
.ok_or_else(|| ViewError::ViewNotFound(view.to_string()))?,
.expect("Tables are always created for views upon view creation"),
st_view_row.is_anonymous,
);

// Insert the view arguments into ST_VIEW_ARG_ID
let arg_id = self.get_or_insert_st_view_arg(tx, &args.get_bsatn())?;
let arg_id = tx.get_or_insert_st_view_arg(&args.get_bsatn())?;

let input_rows = product![
if is_anonymous {
Expand All @@ -1598,7 +1576,7 @@ impl RelationalDB {
let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type);
let return_val = seed
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
.map_err(|e| ViewError::DeserializeReturn(e.to_string()))?;
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;

let products: Vec<ProductValue> = if return_type.is_array() {
let arr = return_val.into_array().expect("return type is array");
Expand All @@ -1607,7 +1585,7 @@ impl RelationalDB {
let opt = return_val.into_option().expect("return type is option");
Ok(opt.into_iter().map(|v| v.into_product().unwrap()).collect())
} else {
Err(ViewError::InvalidReturnType(return_type.clone()))
Err(DatastoreError::from(ViewError::InvalidReturnType(return_type.clone())))
}?;

// Insert all rows from the return value into the view table
Expand All @@ -1621,7 +1599,9 @@ impl RelationalDB {
elements: elements.into_boxed_slice(),
}
};
let row_bytes = row.to_bsatn_vec().map_err(|_| ViewError::SerializeRow)?;
let row_bytes = row
.to_bsatn_vec()
.map_err(|_| DatastoreError::from(ViewError::SerializeRow))?;
self.insert(tx, table_id, &row_bytes)?;
}

Expand Down
26 changes: 1 addition & 25 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hex::FromHexError;
use spacetimedb_commitlog::repo::TxOffset;
use spacetimedb_durability::DurabilityExited;
use spacetimedb_expr::errors::TypingError;
use spacetimedb_lib::{AlgebraicType, Identity};
use spacetimedb_lib::Identity;
use spacetimedb_schema::error::ValidationErrors;
use spacetimedb_snapshot::SnapshotError;
use spacetimedb_table::table::ReadViaBsatnError;
Expand Down Expand Up @@ -147,8 +147,6 @@ pub enum DBError {
RestoreSnapshot(#[from] RestoreSnapshotError),
#[error(transparent)]
DurabilityGone(#[from] DurabilityExited),
#[error("ViewError: {0}")]
View(#[from] ViewError),
}

impl DBError {
Expand Down Expand Up @@ -192,28 +190,6 @@ impl<'a, T: ?Sized + 'a> From<PoisonError<std::sync::MutexGuard<'a, T>>> for DBE
}
}

#[derive(Error, Debug)]
pub enum ViewError {
#[error("view '{0}' not found")]
ViewNotFound(String),
#[error("failed to deserialize view arguments from row")]
DeserializeArgs,
#[error("failed to deserialize view return value: {0}")]
DeserializeReturn(String),
#[error("failed to serialize row to BSATN")]
SerializeRow,
#[error("invalid return type: expected Array or Option, got {0:?}")]
InvalidReturnType(AlgebraicType),
#[error("return type is Array but deserialized value is not Array")]
TypeMismatchArray,
#[error("return type is Option but deserialized value is not Option")]
TypeMismatchOption,
#[error("expected ProductValue in view result")]
ExpectedProduct,
#[error("failed to serialize view arguments")]
SerializeArgs,
}

#[derive(Debug, Error)]
pub enum LogReplayError {
#[error(
Expand Down
12 changes: 3 additions & 9 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::traits::Program;
use spacetimedb_durability::{self as durability};
use spacetimedb_lib::{hash_bytes, AlgebraicValue, Identity, Timestamp};
Expand Down Expand Up @@ -187,13 +186,6 @@ impl From<EventStatus> for ViewOutcome {
}
}

pub struct ViewCallResult {
pub outcome: ViewOutcome,
pub tx: MutTxId,
pub energy_used: EnergyQuanta,
pub execution_duration: Duration,
}

#[derive(Clone, Debug)]
pub struct ProcedureCallResult {
pub return_val: AlgebraicValue,
Expand Down Expand Up @@ -893,8 +885,10 @@ impl Host {
scheduler_starter.start(&module_host)?;
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();

let module = watch::Sender::new(module_host);
replica_ctx.subscriptions.init(module.subscribe());
Ok(Host {
module: watch::Sender::new(module_host),
module,
replica_ctx,
scheduler,
disk_metrics_recorder_task,
Expand Down
Loading
Loading