Skip to content

Commit 31a8d5f

Browse files
Remove the middle man thread the JS worker thread (#3577)
# Description of Changes This makes us go from 3 threads to 2. The next step is to core pin the V8 worker thread. # API and ABI breaking changes None # Expected complexity level and risk 4 # Testing Existing tests should cover this. --------- Co-authored-by: Noa <[email protected]>
1 parent dedbfb4 commit 31a8d5f

File tree

10 files changed

+871
-534
lines changed

10 files changed

+871
-534
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ enum-map = "2.6.3"
175175
env_logger = "0.10"
176176
ethnum = { version = "1.5.0", features = ["serde"] }
177177
flate2 = "1.0.24"
178+
flume = { version = "0.11", default-features = false, features = ["async"] }
178179
foldhash = "0.1.4"
179180
fs-err = "2.9.0"
180181
fs_extra = "1.3.0"

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ dirs.workspace = true
5656
enum-as-inner.workspace = true
5757
enum-map.workspace = true
5858
flate2.workspace = true
59+
flume.workspace = true
5960
fs2.workspace = true
6061
futures.workspace = true
6162
futures-util.workspace = true

crates/core/src/db/relational_db.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{
1919
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
2020
};
2121
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
22-
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow};
22+
use spacetimedb_datastore::system_tables::{
23+
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
24+
};
2325
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
2426
use spacetimedb_datastore::traits::{
2527
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
@@ -1406,7 +1408,7 @@ impl RelationalDB {
14061408
self.inner.delete_by_rel_mut_tx(tx, table_id, relation)
14071409
}
14081410

1409-
/// Clear all rows from a table without dropping it.
1411+
/// Clears all rows from a table without dropping it.
14101412
pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<usize, DBError> {
14111413
let rows_deleted = tx.clear_table(table_id)?;
14121414
Ok(rows_deleted)
@@ -1417,6 +1419,17 @@ impl RelationalDB {
14171419
Ok(tx.clear_all_views()?)
14181420
}
14191421

1422+
/// Empties the system tables tracking clients.
1423+
pub fn clear_all_clients(&self) -> Result<(), DBError> {
1424+
self.with_auto_commit(Workload::Internal, |mut_tx| {
1425+
self.clear_all_views(mut_tx)?;
1426+
self.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?;
1427+
self.clear_table(mut_tx, ST_CLIENT_ID)?;
1428+
self.clear_table(mut_tx, ST_VIEW_SUB_ID)?;
1429+
Ok(())
1430+
})
1431+
}
1432+
14201433
pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
14211434
Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?)
14221435
}
@@ -3049,43 +3062,37 @@ mod tests {
30493062
let stdb = TestDB::durable().expect("failed to create TestDB");
30503063

30513064
let timestamp = Timestamp::now();
3052-
let ctx = ReducerContext {
3053-
name: "abstract_concrete_proxy_factory_impl".into(),
3054-
caller_identity: Identity::__dummy(),
3055-
caller_connection_id: ConnectionId::ZERO,
3056-
timestamp,
3057-
arg_bsatn: Bytes::new(),
3065+
let workload = |name: &str| {
3066+
Workload::Reducer(ReducerContext {
3067+
name: name.into(),
3068+
caller_identity: Identity::__dummy(),
3069+
caller_connection_id: ConnectionId::ZERO,
3070+
timestamp,
3071+
arg_bsatn: Bytes::new(),
3072+
})
30583073
};
3074+
let workload1 = workload("abstract_concrete_proxy_factory_impl");
30593075

30603076
let row_ty = ProductType::from([("le_boeuf", AlgebraicType::I32)]);
30613077
let schema = table("test_table", row_ty.clone(), |builder| builder);
30623078

30633079
// Create an empty transaction
30643080
{
3065-
let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Reducer(ctx.clone()));
3081+
let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload1.clone());
30663082
stdb.commit_tx(tx).expect("failed to commit empty transaction");
30673083
}
30683084

30693085
// Create an empty transaction pretending to be an
30703086
// `__identity_connected__` call.
30713087
{
3072-
let tx = stdb.begin_mut_tx(
3073-
IsolationLevel::Serializable,
3074-
Workload::Reducer(ReducerContext {
3075-
name: "__identity_connected__".into(),
3076-
caller_identity: Identity::__dummy(),
3077-
caller_connection_id: ConnectionId::ZERO,
3078-
timestamp,
3079-
arg_bsatn: Bytes::new(),
3080-
}),
3081-
);
3088+
let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload("__identity_connected__"));
30823089
stdb.commit_tx(tx)
30833090
.expect("failed to commit empty __identity_connected__ transaction");
30843091
}
30853092

30863093
// Create a non-empty transaction including reducer info
30873094
let table_id = {
3088-
let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Reducer(ctx));
3095+
let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload1);
30893096
let table_id = stdb.create_table(&mut tx, schema).expect("failed to create table");
30903097
insert(&stdb, &mut tx, table_id, &product!(AlgebraicValue::I32(0))).expect("failed to insert row");
30913098
stdb.commit_tx(tx).expect("failed to commit tx");

0 commit comments

Comments
 (0)