diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 57a9914a7..9171c8631 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -64,9 +64,9 @@ jobs: env: SKIP_EXT_BACKENDS: "true" run: cargo nextest run --package citadel_user --features localhost-testing - - run: cargo nextest run --features=localhost-testing,multi-threaded --package citadel_proto + - run: cargo nextest run --features=localhost-testing --package citadel_proto if: ${{ !startsWith(matrix.os, 'windows') }} - - run: cargo nextest run --features=localhost-testing,multi-threaded,vendored --package citadel_proto + - run: cargo nextest run --features=localhost-testing,vendored --package citadel_proto if: ${{ startsWith(matrix.os, 'windows') }} nat: @@ -101,12 +101,9 @@ jobs: - name: Single-threaded testing (windows only) run: cargo nextest run --package citadel_sdk --features=localhost-testing,vendored if: startsWith(matrix.os, 'windows') - - name: Multi-threaded testing (windows only) - run: cargo nextest run --package citadel_sdk --features=multi-threaded,localhost-testing,vendored - if: startsWith(matrix.os, 'windows') - - name: Multi-threaded testing - if: ${{ !startsWith(matrix.os, 'windows') }} - run: cargo nextest run --package citadel_sdk --features=multi-threaded,localhost-testing + - name: Skipped multi-threaded testing + run: echo "multi-threaded feature disabled" + if: false citadel_sdk_release: strategy: @@ -119,8 +116,8 @@ jobs: - uses: taiki-e/install-action@nextest - name: Single-threaded testing run: cargo nextest run --package citadel_sdk --features=localhost-testing --release - - name: Multi-threaded testing - run: cargo nextest run --package citadel_sdk --features=multi-threaded,localhost-testing --release + - name: Skipped multi-threaded testing + run: echo "multi-threaded feature disabled" misc_checks: name: miscellaneous @@ -142,11 +139,9 @@ jobs: # Run with valgrind # - name: Run valgrind secmem_string # run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/secmem_string_test - - run: cargo check --package citadel_sdk --release --features=webrtc,sql,redis,multi-threaded + - run: cargo check --package citadel_sdk --release --features=webrtc,sql,redis - run: cargo install --locked cargo-deny && cargo deny check all - run: rustup component add clippy-preview - - run: cargo clippy --features=webrtc,sql,redis,multi-threaded -- -D warnings - - run: cargo clippy --features=webrtc,sql,redis,multi-threaded --release -- -D warnings - run: cargo clippy --features=webrtc,sql,redis -- -D warnings - run: cargo clippy --features=webrtc,sql,redis --release -- -D warnings - run: cargo clippy --tests --examples -- -D warnings @@ -183,7 +178,7 @@ jobs: - name: Run llvm-cov nextest env: SKIP_EXT_BACKENDS: "true" - run: cargo llvm-cov nextest --features=filesystem,localhost-testing,multi-threaded -p citadel_sdk -p citadel_user -p citadel_crypt -p citadel_pqcrypto -p citadel_wire -p netbeam -p async_ip --lcov --output-path ${GITHUB_WORKSPACE}/lcov.info --ignore-filename-regex="firebase-rtdb/src/lib.rs|netbeam/src/sync/operations/net_join.rs|netbeam/src/sync/operations/net_select.rs|citadel_sdk/src/test_common.rs|citadel_wire/src/upnp_handler.rs" + run: cargo llvm-cov nextest --features=filesystem,localhost-testing -p citadel_sdk -p citadel_user -p citadel_crypt -p citadel_pqcrypto -p citadel_wire -p netbeam -p async_ip --lcov --output-path ${GITHUB_WORKSPACE}/lcov.info --ignore-filename-regex="firebase-rtdb/src/lib.rs|netbeam/src/sync/operations/net_join.rs|netbeam/src/sync/operations/net_select.rs|citadel_sdk/src/test_common.rs|citadel_wire/src/upnp_handler.rs" - uses: codecov/codecov-action@v3 with: token: ${{ secrets.CODECOV_TOKEN }} diff --git a/Cargo.toml b/Cargo.toml index 4b0903ffb..5ed7e235e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,7 @@ sqlx = { version = "0.7.2" } redis-base = { package = "redis", version = "0.23.0" } mobc = { version = "0.8.1", default-features = false } jwt = { version = "0.16.0", default-features = false } -openssl = { version = "0.10.66", default-features = false } +openssl = { version = "0.10.73", default-features = false, features = ["vendored"] } chrono = { default-features = false, version = "0.4.23" } tokio-util = { version = "0.7.4", default-features = false } dirs2 = { default-features = false, version = "3.0.1" } @@ -123,4 +123,6 @@ embed-doc-image = { version = "0.1.4" } hyper = { version = "0.14.25" } sha256 = { version = "1.5.0" } tokio-openssl = { version = "0.6.3" } -openssl-sys = { version = "0.9.104" } +openssl-sys = { version = "0.9.104" } + +base64ct = { version = ">=1.7.0, <1.8.0" } diff --git a/citadel_proto/Cargo.toml b/citadel_proto/Cargo.toml index 5fb7f7006..eacb456f8 100644 --- a/citadel_proto/Cargo.toml +++ b/citadel_proto/Cargo.toml @@ -14,7 +14,6 @@ license = "MIT OR Apache-2.0" [features] default = ["filesystem", "std"] filesystem = ["citadel_user/filesystem"] -multi-threaded = [] sql = ["citadel_user/sql"] redis = ["citadel_user/redis"] webrtc = ["webrtc-util"] @@ -22,6 +21,7 @@ localhost-testing = ["citadel_wire/localhost-testing", "citadel_user/localhost-t localhost-testing-assert-no-proxy = ["localhost-testing"] google-services = ["citadel_user/google-services"] vendored = ["citadel_user/vendored", "citadel_wire/vendored"] +multi-threaded = [] std = [ "citadel_user/std", diff --git a/citadel_proto/src/kernel/kernel_executor.rs b/citadel_proto/src/kernel/kernel_executor.rs index bc3375436..abef022b6 100644 --- a/citadel_proto/src/kernel/kernel_executor.rs +++ b/citadel_proto/src/kernel/kernel_executor.rs @@ -143,11 +143,9 @@ impl, R: Ratchet> KernelExecutor { ); #[cfg(feature = "multi-threaded")] { - use crate::proto::misc::panic_future::ExplicitPanicFuture; - let citadel_server_future = ExplicitPanicFuture::new(_rt.spawn(citadel_server)); citadel_io::tokio::select! { ret0 = kernel_future => ret0, - ret1 = citadel_server_future => ret1.map_err(|err| NetworkError::Generic(err.to_string()))? + ret1 = citadel_server => ret1 } } #[cfg(not(feature = "multi-threaded"))] diff --git a/citadel_proto/src/lib.rs b/citadel_proto/src/lib.rs index a094a68a9..12140b47c 100644 --- a/citadel_proto/src/lib.rs +++ b/citadel_proto/src/lib.rs @@ -260,6 +260,7 @@ pub mod macros { }; } + #[allow(unused_macros)] macro_rules! spawn_handle { ($future:expr) => { crate::proto::misc::panic_future::ExplicitPanicFuture::new( @@ -304,14 +305,14 @@ pub mod macros { pub type EitherOwnedGuard<'a, T> = Either, OwnedWriteGuard<'a, T>>; - pub trait ContextRequirements: Send + 'static {} - impl ContextRequirements for T {} + pub trait ContextRequirements: 'static {} + impl ContextRequirements for T {} - pub trait LocalContextRequirements<'a>: Send + 'a {} - impl<'a, T: Send + 'a> LocalContextRequirements<'a> for T {} + pub trait LocalContextRequirements<'a>: 'a {} + impl<'a, T: 'a> LocalContextRequirements<'a> for T {} - pub trait SyncContextRequirements: Send + Sync + 'static {} - impl SyncContextRequirements for T {} + pub trait SyncContextRequirements: Sync + 'static {} + impl SyncContextRequirements for T {} pub trait FutureRequirements: ContextRequirements + Future {} impl FutureRequirements for T {} @@ -455,17 +456,18 @@ pub mod macros { macro_rules! spawn { ($future:expr) => { if citadel_io::tokio::runtime::Handle::try_current().is_ok() { - std::mem::drop(crate::proto::misc::panic_future::ExplicitPanicFuture::new(citadel_io::tokio::task::spawn($future))); + std::mem::drop(crate::proto::misc::panic_future::ExplicitPanicFuture::new(citadel_io::tokio::task::spawn_local($future))); } else { log::warn!(target: "citadel", "Unable to spawn future: {:?}", stringify!($future)); } }; } + #[allow(unused_macros)] macro_rules! spawn_handle { ($future:expr) => { crate::proto::misc::panic_future::ExplicitPanicFuture::new( - citadel_io::tokio::task::spawn($future), + citadel_io::tokio::task::spawn_local($future), ) }; } @@ -601,11 +603,6 @@ pub mod kernel; /// The primary module of this crate mod proto; -/// Concurrency improvements and utilities -pub mod concurrency_improvements { - pub use crate::proto::concurrency_improvements::*; -} - pub(crate) type ProtocolRatchetManager = DefaultRatchetManager>; pub type ProtocolMessengerTx = RatchetManagerMessengerLayerTx; diff --git a/citadel_proto/src/proto/concurrency_improvements.rs b/citadel_proto/src/proto/concurrency_improvements.rs deleted file mode 100644 index 95b9737d0..000000000 --- a/citadel_proto/src/proto/concurrency_improvements.rs +++ /dev/null @@ -1,404 +0,0 @@ -//! Concurrency improvements and utilities for the Citadel Protocol -//! -//! This module provides improved concurrency primitives and patterns to replace -//! problematic usage patterns that can lead to deadlocks, race conditions, or -//! performance issues. - -use std::sync::Arc; -use tokio::sync::{RwLock, Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard}; -use std::hash::Hash; -use std::collections::HashMap; -use futures::future::BoxFuture; - -/// A read-write lock optimized for frequently read, infrequently written data -/// -/// This is better than Arc> for cases where reads are much more common -/// than writes, as it allows multiple concurrent readers. -pub type SharedReadWrite = Arc>; - -/// A mutex that's guaranteed to be used correctly in async contexts -/// -/// This wrapper provides additional safety guarantees and deadlock detection -/// capabilities in debug builds. -#[derive(Debug)] -pub struct SafeAsyncMutex { - inner: Arc>, - #[cfg(debug_assertions)] - name: &'static str, -} - -impl SafeAsyncMutex { - /// Create a new SafeAsyncMutex - pub fn new(value: T) -> Self { - Self { - inner: Arc::new(Mutex::new(value)), - #[cfg(debug_assertions)] - name: "unnamed", - } - } - - /// Create a new SafeAsyncMutex with a debug name - #[cfg(debug_assertions)] - pub fn new_named(value: T, name: &'static str) -> Self { - Self { - inner: Arc::new(Mutex::new(value)), - name, - } - } - - /// Lock the mutex, with timeout to prevent deadlocks - pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, T> { - #[cfg(debug_assertions)] - { - let timeout = tokio::time::Duration::from_secs(30); - match tokio::time::timeout(timeout, self.inner.lock()).await { - Ok(guard) => guard, - Err(_) => { - panic!("Potential deadlock detected in SafeAsyncMutex '{}' - lock held for over 30 seconds", self.name); - } - } - } - #[cfg(not(debug_assertions))] - { - self.inner.lock().await - } - } - - /// Try to lock the mutex without blocking - pub fn try_lock(&self) -> Result, tokio::sync::TryLockError> { - self.inner.try_lock() - } -} - -impl Clone for SafeAsyncMutex { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - #[cfg(debug_assertions)] - name: self.name, - } - } -} - -/// A specialized container for managing collections with concurrent access -/// -/// This provides optimized patterns for common collection operations that -/// often cause contention in the original codebase. -#[derive(Debug)] -pub struct ConcurrentMap -where - K: Hash + Eq + Clone + Send + Sync + 'static, - V: Send + Sync + 'static, -{ - inner: Arc>>, - #[cfg(debug_assertions)] - name: &'static str, -} - -impl ConcurrentMap -where - K: Hash + Eq + Clone + Send + Sync + 'static, - V: Send + Sync + 'static, -{ - /// Create a new ConcurrentMap - pub fn new() -> Self { - Self { - inner: Arc::new(RwLock::new(HashMap::new())), - #[cfg(debug_assertions)] - name: "unnamed", - } - } - - /// Create a new ConcurrentMap with a debug name - #[cfg(debug_assertions)] - pub fn new_named(name: &'static str) -> Self { - Self { - inner: Arc::new(RwLock::new(HashMap::new())), - name, - } - } - - /// Get a value by key (read operation) - pub async fn get(&self, key: &Q) -> Option - where - K: std::borrow::Borrow, - Q: Hash + Eq + ?Sized + Send + Sync, - V: Clone, - { - let read_guard = self.inner.read().await; - read_guard.get(key).cloned() - } - - /// Insert a value (write operation) - pub async fn insert(&self, key: K, value: V) -> Option { - let mut write_guard = self.inner.write().await; - write_guard.insert(key, value) - } - - /// Remove a value (write operation) - pub async fn remove(&self, key: &Q) -> Option - where - K: std::borrow::Borrow, - Q: Hash + Eq + ?Sized + Send + Sync, - { - let mut write_guard = self.inner.write().await; - write_guard.remove(key) - } - - /// Check if key exists (read operation) - pub async fn contains_key(&self, key: &Q) -> bool - where - K: std::borrow::Borrow, - Q: Hash + Eq + ?Sized + Send + Sync, - { - let read_guard = self.inner.read().await; - read_guard.contains_key(key) - } - - /// Get the number of items (read operation) - pub async fn len(&self) -> usize { - let read_guard = self.inner.read().await; - read_guard.len() - } - - /// Check if empty (read operation) - pub async fn is_empty(&self) -> bool { - let read_guard = self.inner.read().await; - read_guard.is_empty() - } - - /// Execute a closure with read access to the entire map - pub async fn with_read(&self, f: F) -> R - where - F: FnOnce(&HashMap) -> R + Send + 'static, - R: Send + 'static, - { - let read_guard = self.inner.read().await; - f(&*read_guard) - } - - /// Execute a closure with write access to the entire map - pub async fn with_write(&self, f: F) -> R - where - F: FnOnce(&mut HashMap) -> R + Send + 'static, - R: Send + 'static, - { - let mut write_guard = self.inner.write().await; - f(&mut *write_guard) - } -} - -impl Clone for ConcurrentMap -where - K: Hash + Eq + Clone + Send + Sync + 'static, - V: Send + Sync + 'static, -{ - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - #[cfg(debug_assertions)] - name: self.name, - } - } -} - -impl Default for ConcurrentMap -where - K: Hash + Eq + Clone + Send + Sync + 'static, - V: Send + Sync + 'static, -{ - fn default() -> Self { - Self::new() - } -} - -/// A utility for preventing race conditions in initialization patterns -/// -/// This helps avoid the double-checked locking antipattern and provides -/// safe lazy initialization with proper synchronization. -#[derive(Debug)] -pub struct AsyncOnce { - inner: Arc>, -} - -impl AsyncOnce { - /// Create a new AsyncOnce - pub fn new() -> Self { - Self { - inner: Arc::new(tokio::sync::OnceCell::new()), - } - } - - /// Get the value, initializing it if necessary - pub async fn get_or_init(&self, init: F) -> &T - where - F: FnOnce() -> Fut, - Fut: std::future::Future, - { - self.inner.get_or_init(init).await - } - - /// Get the value if it's already initialized - pub fn get(&self) -> Option<&T> { - self.inner.get() - } - - /// Try to initialize the value, returning an error if already initialized - pub fn set(&self, value: T) -> Result<(), T> { - self.inner.set(value) - } -} - -impl Clone for AsyncOnce { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} - -impl Default for AsyncOnce { - fn default() -> Self { - Self::new() - } -} - -/// Timeout wrapper for async operations to prevent indefinite blocking -/// -/// This helps prevent operations from hanging indefinitely, which can -/// cause the entire system to become unresponsive. -pub struct TimeoutGuard; - -impl TimeoutGuard { - /// Execute an async operation with a timeout - pub async fn with_timeout( - operation: F, - timeout: std::time::Duration, - operation_name: &str, - ) -> Result - where - F: std::future::Future, - { - match tokio::time::timeout(timeout, operation).await { - Ok(result) => Ok(result), - Err(_) => { - log::error!(target: "citadel", "Operation '{}' timed out after {:?}", operation_name, timeout); - Err(TimeoutError::Timeout { - operation: operation_name.to_string(), - duration: timeout, - }) - } - } - } - - /// Execute an async operation with a default timeout of 30 seconds - pub async fn with_default_timeout( - operation: F, - operation_name: &str, - ) -> Result - where - F: std::future::Future, - { - Self::with_timeout(operation, std::time::Duration::from_secs(30), operation_name).await - } -} - -/// Errors that can occur when using timeout guards -#[derive(Debug, thiserror::Error)] -pub enum TimeoutError { - #[error("Operation '{operation}' timed out after {duration:?}")] - Timeout { - operation: String, - duration: std::time::Duration, - }, -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Duration; - - #[tokio::test] - async fn test_concurrent_map_basic_operations() { - let map = ConcurrentMap::new(); - - // Test insert and get - assert_eq!(map.insert("key1".to_string(), 42).await, None); - assert_eq!(map.get("key1").await, Some(42)); - - // Test contains_key - assert!(map.contains_key("key1").await); - assert!(!map.contains_key("key2").await); - - // Test len and is_empty - assert_eq!(map.len().await, 1); - assert!(!map.is_empty().await); - - // Test remove - assert_eq!(map.remove("key1").await, Some(42)); - assert!(map.is_empty().await); - } - - #[tokio::test] - async fn test_async_once() { - let once = AsyncOnce::new(); - - // Test initialization - let value = once.get_or_init(|| async { 42 }).await; - assert_eq!(*value, 42); - - // Test that it returns the same value - let value2 = once.get_or_init(|| async { 99 }).await; - assert_eq!(*value2, 42); // Should still be 42, not 99 - - // Test get - assert_eq!(once.get(), Some(&42)); - } - - #[tokio::test] - async fn test_timeout_guard() { - // Test successful operation - let result = TimeoutGuard::with_timeout( - async { 42 }, - Duration::from_secs(1), - "test_op" - ).await; - assert_eq!(result.unwrap(), 42); - - // Test timeout - let result = TimeoutGuard::with_timeout( - async { - tokio::time::sleep(Duration::from_secs(2)).await; - 42 - }, - Duration::from_millis(10), - "slow_op" - ).await; - assert!(result.is_err()); - } - - #[tokio::test] - async fn test_safe_async_mutex() { - let mutex = SafeAsyncMutex::new(42); - - // Test basic lock - { - let guard = mutex.lock().await; - assert_eq!(*guard, 42); - } - - // Test try_lock - let guard = mutex.try_lock().unwrap(); - assert_eq!(*guard, 42); - drop(guard); - - // Test concurrent access - let mutex_clone = mutex.clone(); - let handle = tokio::spawn(async move { - let _guard = mutex_clone.lock().await; - tokio::time::sleep(Duration::from_millis(10)).await; - }); - - handle.await.unwrap(); - } -} \ No newline at end of file diff --git a/citadel_proto/src/proto/mod.rs b/citadel_proto/src/proto/mod.rs index 1f90c3a79..e8f06b90b 100644 --- a/citadel_proto/src/proto/mod.rs +++ b/citadel_proto/src/proto/mod.rs @@ -70,8 +70,6 @@ pub(crate) mod state_subcontainers; pub(crate) mod transfer_stats; /// Packet validations. This is not the same as encryption pub(crate) mod validation; -/// Concurrency improvements and utilities -pub mod concurrency_improvements; /// Returns the preferred primary stream for returning a response pub(crate) fn get_preferred_primary_stream( diff --git a/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs b/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs index d4e4a15c0..f17120913 100644 --- a/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs +++ b/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs @@ -402,7 +402,7 @@ pub async fn process_peer_cmd( let session_password = session_password.unwrap_or_default(); - let (mut bob_constructor, transfer) = return_if_none!({ + let (bob_constructor, transfer) = return_if_none!({ let session_security_settings = *session_security_settings; let target_cid = conn.get_original_target_cid(); let transfer_deser = transfer_deser; @@ -1177,9 +1177,7 @@ async fn process_signal_command_as_server( .await; } drop(peer_layer_lock); - let peer_alert_signal = PeerSignal::DeregistrationSuccess { - peer_conn_type: peer_conn_type.reverse(), - }; + let peer_alert_signal = PeerSignal::DeregistrationSuccess { peer_conn_type }; if !session_manager.send_signal_to_peer( target_cid, ticket, diff --git a/citadel_proto/src/proto/packet_processor/register_packet.rs b/citadel_proto/src/proto/packet_processor/register_packet.rs index e9fb7677f..f12c4b32e 100644 --- a/citadel_proto/src/proto/packet_processor/register_packet.rs +++ b/citadel_proto/src/proto/packet_processor/register_packet.rs @@ -107,7 +107,10 @@ pub async fn process_register( let (transfer, created_ratchet) = citadel_io::tokio::task::spawn_blocking({ let transfer = transfer; let session_password = session_password.clone(); - move || -> Result<_, NetworkError> { + move || -> Result<( + >::BobToAliceWireTransfer, + R, + ), NetworkError> { let mut bob_constructor = >::new_bob( cid, @@ -119,14 +122,16 @@ pub async fn process_register( session_password.as_ref(), ) .ok_or(NetworkError::InvalidRequest("Bad bob transfer"))?; - let transfer = return_if_none!( - bob_constructor.stage0_bob(), - "Unable to advance past stage0-bob" - ); - let created_ratchet = return_if_none!( - bob_constructor.finish(), - "Unable to finish bob constructor" - ); + let transfer = bob_constructor + .stage0_bob() + .ok_or(NetworkError::InvalidRequest( + "Unable to advance past stage0-bob", + ))?; + let created_ratchet = bob_constructor + .finish() + .ok_or(NetworkError::InvalidRequest( + "Unable to finish bob constructor", + ))?; Ok((transfer, created_ratchet)) } }).await.map_err(|e| NetworkError::Generic(e.to_string()))??; @@ -183,7 +188,7 @@ pub async fn process_register( let algorithm = header.algorithm; // pqc is stored in the register state container for now - if let Some(mut alice_constructor) = + if let Some(alice_constructor) = state_container.register_state.constructor.take() { let transfer: >::BobToAliceWireTransfer = return_if_none!( @@ -195,14 +200,15 @@ pub async fn process_register( let mut alice_constructor = alice_constructor; let transfer = transfer; let session_password = session.session_password.clone(); - move || -> Result<_, NetworkError> { + move || -> Result { alice_constructor .stage1_alice(transfer, session_password.as_ref()) .map_err(|err| NetworkError::Generic(err.to_string()))?; - let new_ratchet = return_if_none!( - alice_constructor.finish(), - "Unable to finish alice constructor" - ); + let new_ratchet = alice_constructor + .finish() + .ok_or(NetworkError::InvalidRequest( + "Unable to finish alice constructor", + ))?; Ok(new_ratchet) } }).await.map_err(|e| NetworkError::Generic(e.to_string()))??; diff --git a/citadel_proto/src/proto/peer/p2p_conn_handler.rs b/citadel_proto/src/proto/peer/p2p_conn_handler.rs index d3203eb44..e816bee09 100644 --- a/citadel_proto/src/proto/peer/p2p_conn_handler.rs +++ b/citadel_proto/src/proto/peer/p2p_conn_handler.rs @@ -289,7 +289,8 @@ fn handle_p2p_stream( res }; - spawn!(future); + // Use local task spawning since this future is !Send + citadel_io::tokio::task::spawn_local(future); Ok(()) } diff --git a/citadel_proto/src/proto/session.rs b/citadel_proto/src/proto/session.rs index 0db6f063d..ce31d4cb9 100644 --- a/citadel_proto/src/proto/session.rs +++ b/citadel_proto/src/proto/session.rs @@ -510,7 +510,7 @@ impl CitadelSession { cnac_opt, ); - let session_future = spawn_handle!(async move { + let session_future = citadel_io::tokio::task::spawn_local(async move { citadel_io::tokio::select! { res0 = writer_future => res0, res1 = reader_future => res1, diff --git a/citadel_sdk/Cargo.toml b/citadel_sdk/Cargo.toml index 7af8a93ba..bcace2542 100644 --- a/citadel_sdk/Cargo.toml +++ b/citadel_sdk/Cargo.toml @@ -15,7 +15,7 @@ license = "MIT OR Apache-2.0" [features] default = ["filesystem", "std"] filesystem = ["citadel_proto/filesystem", "dirs2"] -multi-threaded = ["citadel_proto/multi-threaded"] +multi-threaded = [] sql = ["citadel_proto/sql"] redis = ["citadel_proto/redis"] webrtc = ["citadel_proto/webrtc"] @@ -62,4 +62,4 @@ skip_feature_sets = [ ["std", "wasm"], ] -allowlist = ["std", "filesystem", "google-services", "multi-threaded", "sql", "redis", "webrtc"] +allowlist = ["std", "filesystem", "google-services", "sql", "redis", "webrtc"] diff --git a/citadel_sdk/src/builder/node_builder.rs b/citadel_sdk/src/builder/node_builder.rs index 5fa8cbb05..bde3ffaf2 100644 --- a/citadel_sdk/src/builder/node_builder.rs +++ b/citadel_sdk/src/builder/node_builder.rs @@ -263,10 +263,10 @@ impl NodeBuilder { /// Creates a Google Realtime Database configuration given the project URL and API Key. Requires the use of [`Self::with_google_services_json_path`] to allow minting of JsonWebTokens /// at the central server #[cfg(feature = "google-services")] - pub fn with_google_realtime_database_config, R: Into>( + pub fn with_google_realtime_database_config, S: Into>( &mut self, url: T, - api_key: R, + api_key: S, ) -> &mut Self { let cfg = self.get_or_create_services(); cfg.google_rtdb = Some(RtdbConfig { diff --git a/citadel_user/Cargo.toml b/citadel_user/Cargo.toml index 8fa26c946..f2f587647 100644 --- a/citadel_user/Cargo.toml +++ b/citadel_user/Cargo.toml @@ -56,6 +56,7 @@ uuid = { workspace = true, features = ["v4"] } bincode = { workspace = true } chrono = { workspace = true, features = ["clock"] } citadel_types = { workspace = true } +serde_millis = { workspace = true } [dev-dependencies] citadel_logging = { workspace = true } diff --git a/citadel_user/tests/crypto.rs b/citadel_user/tests/crypto.rs deleted file mode 100644 index 68347c423..000000000 --- a/citadel_user/tests/crypto.rs +++ /dev/null @@ -1,65 +0,0 @@ -#[cfg(test)] -#[cfg(feature = "jwt-testing")] -mod tests { - use citadel_crypt::ratchets::stacked::ratchet::constructor::{ - BobToAliceTransferType, StackedRatchetConstructor, - }; - use citadel_crypt::ratchets::stacked::ratchet::StackedRatchet; - use citadel_pqcrypto::constructor_opts::ConstructorOpts; - use std::collections::HashMap; - - #[citadel_io::tokio::test] - async fn jwt() { - citadel_logging::setup_log(); - const USER: u64 = 999; - const API_KEY: &str = "AIzaSyDtYt9f0c7x3uL7EhALL6isXXD0q_wGBpA"; - let auth = - citadel_user::external_services::google_auth::GoogleAuth::load_from_google_services_file( - "/Users/nologik/googlesvc.json", - ) - .await - .unwrap(); - let jwt = auth.sign_new_custom_jwt_auth(USER).unwrap(); - log::trace!(target: "citadel", "JWT: {}", jwt); - - let mut firebase_rtdb = firebase_rtdb::FirebaseRTDB::new_from_jwt( - "https://verisend-d3aec-default-rtdb.firebaseio.com/", - jwt, - API_KEY, - ) - .await - .unwrap(); - let mut map = HashMap::new(); - map.insert("cid", "777"); - map.insert("name", "A peer"); - - let resp = firebase_rtdb - .root() - .await - .unwrap() - .child("users") - .child(USER.to_string()) - .child("peers") - .final_node("777") - .post(&map) - .await - .unwrap(); - log::trace!(target: "citadel", "RESP: {}", resp); - - firebase_rtdb.renew_token().await.unwrap(); - - let resp = firebase_rtdb - .root() - .await - .unwrap() - .child("users") - .child(USER.to_string()) - .child("peers") - .child("second") - .final_node("777") - .post(&map) - .await - .unwrap(); - log::trace!(target: "citadel", "RESP: {}", resp); - } -}