diff --git a/Cargo.lock b/Cargo.lock index adb7ff3bd..19f8fc41f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1798,7 +1798,7 @@ dependencies = [ [[package]] name = "fastn" -version = "0.4.112" +version = "0.4.113" dependencies = [ "actix-web", "camino", @@ -1832,6 +1832,8 @@ dependencies = [ [[package]] name = "fastn-context" version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb4d708d15b187e30d12f75dc5aedc4691074f056f12b3536a69effcea9dee4" dependencies = [ "fastn-context-macros", "tokio", @@ -1841,6 +1843,8 @@ dependencies = [ [[package]] name = "fastn-context-macros" version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee801bf528c8d05900d55f1e3f3d6e3d04cf9286c876515c2fbaadda63315c88" dependencies = [ "proc-macro2", "quote", @@ -2039,6 +2043,7 @@ version = "0.1.0" dependencies = [ "async-stream", "eyre", + "fastn-context", "fastn-id52", "fastn-net", "fastn-p2p-macros", diff --git a/Cargo.toml b/Cargo.toml index 69e5b9b18..60d635c3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,8 +3,6 @@ members = [ "clift", "fastn", "fastn-builtins", - "fastn-context", - "fastn-context-macros", "fastn-core", "fastn-daemon", "fastn-ds", diff --git a/Changelog.md b/Changelog.md index 357051a0c..597d46151 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,5 +1,11 @@ # `fastn` Change Log +## 17 September 2025 + +### fastn: 0.4.113 + +- fix: Do not override query params of http processor's target url. PR #2209. + ## 20 August 2025 ### fastn: 0.4.112 diff --git a/fastn-context-macros/Cargo.toml b/fastn-context-macros/Cargo.toml deleted file mode 100644 index 984941ed9..000000000 --- a/fastn-context-macros/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "fastn-context-macros" -version = "0.1.0" -authors.workspace = true -edition.workspace = true -description.workspace = true -license.workspace = true -repository.workspace = true -homepage.workspace = true -rust-version.workspace = true - -[lib] -proc-macro = true - -[dependencies] -proc-macro2 = "1" -quote = "1" -syn = { version = "2", features = ["full", "extra-traits"] } \ No newline at end of file diff --git a/fastn-context-macros/src/lib.rs b/fastn-context-macros/src/lib.rs deleted file mode 100644 index 9bb31b100..000000000 --- a/fastn-context-macros/src/lib.rs +++ /dev/null @@ -1,35 +0,0 @@ -use proc_macro::TokenStream; -use quote::quote; -use syn::{ItemFn, parse_macro_input}; - -/// Main function attribute macro for fastn applications with context support -#[proc_macro_attribute] -pub fn main(_args: TokenStream, input: TokenStream) -> TokenStream { - let input_fn = parse_macro_input!(input as ItemFn); - - let user_fn_name = syn::Ident::new("__fastn_user_main", proc_macro2::Span::call_site()); - let fn_block = &input_fn.block; - let fn_attrs = &input_fn.attrs; - let fn_vis = &input_fn.vis; - - quote! { - #(#fn_attrs)* - #fn_vis fn main() -> std::result::Result<(), Box> { - // Initialize tokio runtime - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build()? - .block_on(async { - // Global context automatically created - - // Call user's main function - let result = #user_fn_name().await; - - result - }) - } - - async fn #user_fn_name() -> std::result::Result<(), Box> #fn_block - } - .into() -} diff --git a/fastn-context/Cargo.toml b/fastn-context/Cargo.toml deleted file mode 100644 index 6278095d5..000000000 --- a/fastn-context/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "fastn-context" -version = "0.1.0" -authors.workspace = true -edition.workspace = true -description.workspace = true -license.workspace = true -repository.workspace = true -homepage.workspace = true -rust-version.workspace = true - -[dependencies] -tokio.workspace = true -tokio-util.workspace = true -fastn-context-macros = { path = "../fastn-context-macros" } \ No newline at end of file diff --git a/fastn-context/NEXT-complete-design.md b/fastn-context/NEXT-complete-design.md deleted file mode 100644 index eb48306d9..000000000 --- a/fastn-context/NEXT-complete-design.md +++ /dev/null @@ -1,844 +0,0 @@ -# fastn-context: Hierarchical Application Context for Debugging and Operations - -This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. - -> **Note**: This README documents the complete design iteratively. Some sections may overlap as features build on each other. The design is internally consistent - later sections refine and extend earlier concepts. - -## Design Philosophy - -- **Hierarchical Structure**: Applications naturally form trees of operations -- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents -- **Zero Boilerplate**: Context trees build themselves as applications run -- **Production Ready**: Status trees enable debugging of stuck/slow operations -- **Bounded Complexity**: Simple spawn vs detailed child creation as needed - -## Core Concepts - -### Context Tree Structure - -Every fastn application forms a natural hierarchy: - -``` -Global Context (application level) -├── Service Context (e.g., "remote-access-listener") -│ ├── Session Context (e.g., "alice@bv478gen") -│ │ ├── Task Context (e.g., "stdout-handler") -│ │ └── Task Context (e.g., "stderr-stream") -│ └── Session Context (e.g., "bob@p2nd7avq") -├── Service Context (e.g., "http-proxy") -└── Service Context (e.g., "chat-service") -``` - -### Automatic Context Creation - -fastn-context integrates seamlessly with fastn ecosystem: - -```rust -// 1. Global context created by main macro -#[fastn_context::main] -async fn main() -> eyre::Result<()> { - // Global context automatically available -} - -// 2. Service contexts created by operations -let listener = fastn_p2p::server::listen(key, protocols).await?; -// Creates child context: "p2p-listener" under global - -// 3. Session contexts created per connection -// Each incoming connection gets child context: "session-{peer_id}" - -// 4. Task contexts created by spawn operations -session_ctx.child("shell-handler").spawn(handle_shell); -``` - -## API Reference - -### Core Context - -```rust -pub struct Context { - /// Context name for debugging/status - pub name: String, - - /// When this context was created - pub created_at: std::time::Instant, - - // Private: parent, children, cancellation, metrics, data -} - -impl Context { - /// Create new root context (typically only used by main macro) - pub fn new(name: &str) -> std::sync::Arc; - - /// Create child context with given name - pub fn child(&self, name: &str) -> ContextBuilder; - - /// Simple spawn (inherits current context, no child creation) - pub fn spawn(&self, task: F) -> tokio::task::JoinHandle - where F: std::future::Future + Send + 'static; - - /// Wait for cancellation signal - pub async fn wait(&self); - - /// Cancel this context and all children recursively - pub fn cancel(&self); - - /// Add metric data for status reporting - pub fn add_metric(&self, key: &str, value: MetricValue); - - /// Store arbitrary data on this context - pub fn set_data(&self, key: &str, value: serde_json::Value); - - /// Get stored data - pub fn get_data(&self, key: &str) -> Option; - - /// Increment total counter (historical count) - pub fn increment_total(&self, counter: &str); - - /// Increment live counter (current active count) - pub fn increment_live(&self, counter: &str); - - /// Decrement live counter (when operation completes) - pub fn decrement_live(&self, counter: &str); - - /// Get counter values - pub fn get_total(&self, counter: &str) -> u64; - pub fn get_live(&self, counter: &str) -> u64; -} -``` - -### Context Builder - -```rust -pub struct ContextBuilder { - // Pre-created child context ready for configuration -} - -impl ContextBuilder { - /// Add initial data to context - pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; - - /// Add initial metric to context - pub fn with_metric(self, key: &str, value: MetricValue) -> Self; - - /// Spawn task with this configured child context - pub fn spawn(self, task: F) -> tokio::task::JoinHandle - where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; -} -``` - -### Global Access - -```rust -/// Get the global application context -pub fn global() -> std::sync::Arc; - -/// Get current task's context (thread-local or task-local) -pub fn current() -> std::sync::Arc; - -/// Print status tree for debugging -pub fn status() -> StatusTree; -``` - -### Metric Types - -```rust -#[derive(Debug, Clone)] -pub enum MetricValue { - Counter(u64), - Gauge(f64), - Duration(std::time::Duration), - Text(String), - Bytes(u64), -} -``` - -## Usage Patterns - -### Simple Task Spawning - -```rust -// Inherit current context (no child creation) -let ctx = fastn_context::current(); -ctx.spawn(async { - // Simple background task -}); -``` - -### Detailed Task Spawning - -```rust -// Create child context with debugging info -ctx.child("remote-shell-handler") - .with_data("peer", alice_id52) - .with_data("shell", "bash") - .with_metric("commands_executed", 0) - .spawn(|task_ctx| async move { - // Task can update its own context - task_ctx.add_metric("commands_executed", cmd_count); - task_ctx.set_data("last_command", "ls -la"); - - // Task waits for its own cancellation - tokio::select! { - _ = task_ctx.wait() => { - println!("Shell handler cancelled"); - } - _ = handle_shell_session() => { - println!("Shell session completed"); - } - } - }); -``` - -### Status Tree Output - -``` -$ fastn status -Global Context (2h 15m 32s uptime) -├── Remote Access Listener (1h 45m active) -│ ├── alice@bv478gen (23m 12s, bash shell) -│ │ ├── stdout-handler (23m 12s, 15.2MB processed) -│ │ └── stderr-stream (18m 45s, 2.1KB processed) -│ └── bob@p2nd7avq (8m 33s, ls command) -│ └── command-executor (8m 33s, exit pending) -├── HTTP Proxy (2h 15m active) -│ ├── connection-pool (45 active, 1,234 requests) -│ └── request-handler-pool (12 workers active) -└── Chat Service (35m active) - ├── presence-monitor (35m, 15 users tracked) - └── message-relay (35m, 4,567 messages) -``` - -## Integration with fastn-p2p - -fastn-p2p depends on fastn-context and automatically creates context hierarchies: - -```rust -// fastn-p2p sessions provide access to their context -async fn handle_remote_shell(session: fastn_p2p::server::Session) { - let ctx = session.context(); // Auto-created by fastn-p2p - - // Simple spawn (inherits session context) - ctx.spawn(pipe_stdout(session.send)); - - // Detailed spawn (creates child for debugging) - ctx.child("command-executor") - .with_data("command", session.protocol.command) - .spawn(|task_ctx| async move { - let result = execute_command(&session.protocol.command).await; - task_ctx.set_data("exit_code", result.code); - }); -} -``` - -## Main Function Integration - -The main macro moves to fastn-context and sets up the global context: - -```rust -#[fastn_context::main] -async fn main() -> eyre::Result<()> { - // Global context automatically created and available - - let ctx = fastn_context::global(); - ctx.child("startup") - .with_data("version", env!("CARGO_PKG_VERSION")) - .spawn(|_| async { - // Application initialization - }); -} -``` - -## Design Benefits - -1. **Names Required for Debugging** - Every important operation has a name in status tree -2. **Selective Complexity** - Simple spawn vs detailed child creation as needed -3. **Automatic Tree Building** - Context hierarchy builds as application runs -4. **Production Debugging** - `fastn status` shows exactly where system is stuck -5. **Clean Separation** - Context concerns separate from networking concerns -6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure - -**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. - -## Comprehensive Timing and Lock Monitoring - -Every context and operation tracks detailed timing for real-time debugging, including named lock monitoring for deadlock detection. - -### Timing Integration - -```rust -pub struct Context { - pub name: String, - pub created_at: std::time::Instant, // When context started - pub last_activity: std::sync::Arc>, // Last activity - // ... other fields -} - -impl Context { - /// Update last activity timestamp (called automatically by operations) - pub fn touch(&self); - - /// Get how long this context has been alive - pub fn duration(&self) -> std::time::Duration; - - /// Get how long since last activity - pub fn idle_duration(&self) -> std::time::Duration; - - /// Create named mutex within this context - pub fn mutex(&self, name: &str, data: T) -> ContextMutex; - - /// Create named RwLock within this context - pub fn rwlock(&self, name: &str, data: T) -> ContextRwLock; - - /// Create named semaphore within this context - pub fn semaphore(&self, name: &str, permits: usize) -> ContextSemaphore; -} -``` - -### Named Lock Types - -```rust -pub struct ContextMutex { - name: String, - context: std::sync::Arc, - inner: tokio::sync::Mutex, -} - -impl ContextMutex { - /// Lock with automatic status tracking - pub async fn lock(&self) -> ContextMutexGuard; -} - -pub struct ContextMutexGuard { - acquired_at: std::time::Instant, // When lock was acquired - context_name: String, // Which context holds it - lock_name: String, // Lock identifier - // Auto-reports to context status system - // Auto-cleanup on drop -} -``` - -### Detailed Status Output with Comprehensive Timing - -``` -$ fastn status -Global Context (2h 15m 32s uptime, active 0.1s ago) -├── Remote Access Listener (1h 45m active, last activity 2.3s ago) -│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) -│ │ ├── stdout-handler (23m 12s running, CPU active) -│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) -│ │ └── stderr-stream (18m 45s running, idle 8.1s) -│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK -│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) -│ └── command-executor (8m 33s running, exit pending) -├── HTTP Proxy (2h 15m active, last request 0.8s ago) -│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) -│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) -└── Chat Service (35m active, last message 1.2s ago) - ├── presence-monitor (35m running, heartbeat 30s ago) - └── message-relay (35m running, processing queue) - -🔒 Active Locks (3): - - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD - - "user-table-write-lock" held by user-service/db-writer (0.1s) - - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) - -⏳ Lock Waiters (1): - - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK - -⚠️ Potential Issues: - - Long-held lock "session-output-lock" (12.3s) may indicate deadlock - - stderr-stream stuck waiting (8.1s) suggests blocked I/O -``` - -### Automatic Activity Tracking - -```rust -// All operations automatically maintain timing -ctx.spawn(async { - // ctx.touch() called when task starts - loop { - do_work().await; - ctx.touch(); // Update activity timestamp - } -}); - -// Lock operations update timing automatically -let guard = ctx.mutex("data-lock", data).lock().await; -// Updates: context last_activity, tracks lock hold time - -// Long operations should periodically touch -async fn long_running_task(ctx: std::sync::Arc) { - loop { - process_batch().await; - ctx.touch(); // Show we're still active, not stuck - - tokio::select! { - _ = ctx.wait() => break, // Cancelled - _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {} - } - } -} -``` - -This provides **real-time operational debugging** - administrators can instantly identify stuck operations, deadlocked tasks, and performance bottlenecks with precise timing information. - -## Counter Management System - -Every context can track both historical totals and live counts for detailed operational metrics. - -### Global Counter Storage with Dotted Paths - -```rust -pub struct Context { - pub name: String, - pub full_path: String, // "global.remote-access.alice@bv478gen.stdout-handler" - // ... other fields -} - -impl Context { - /// Get full dotted path for this context - pub fn path(&self) -> &str; - - /// Increment total counter (stored in global hashmap by full path) - pub fn increment_total(&self, counter: &str); - - /// Increment live counter (stored in global hashmap by full path) - pub fn increment_live(&self, counter: &str); - - /// Decrement live counter (stored in global hashmap by full path) - pub fn decrement_live(&self, counter: &str); - - /// Get counter values (retrieved from global storage) - pub fn get_total(&self, counter: &str) -> u64; - pub fn get_live(&self, counter: &str) -> u64; -} - -// Global counter storage (persists beyond context lifetimes) -static GLOBAL_COUNTERS: LazyLock>> = ...; - -// Counter keys format: "{context_path}.{counter_name}" -// Examples: -// "global.connections" -> 1,247 -// "global.remote-access.connections" -> 234 -// "global.remote-access.alice@bv478gen.commands" -> 45 -// "global.http-proxy.requests" -> 1,013 -``` - -### Automatic Counter Integration - -```rust -// fastn-p2p automatically maintains connection counters -async fn handle_incoming_connection(session: fastn_p2p::server::Session) { - let ctx = session.context(); - - // Automatically tracked by fastn-p2p: - ctx.increment_total("connections"); // Total connections ever - ctx.increment_live("connections"); // Current active connections - - // Your handler code... - - // When session ends: - ctx.decrement_live("connections"); // Automatically called -} - -// Custom counters for application logic -async fn handle_remote_command(session: server::Session) { - let ctx = session.context(); - - ctx.increment_total("commands"); // Total commands executed - ctx.increment_live("commands"); // Currently executing commands - - let result = execute_command(&session.protocol.command).await; - - ctx.decrement_live("commands"); // Command completed - - if result.success { - ctx.increment_total("successful_commands"); - } else { - ctx.increment_total("failed_commands"); - } -} -``` - -### Enhanced Status Display with Counters - -``` -$ fastn status -fastn Status Dashboard -System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 -Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m - -Global Context (2h 15m 32s uptime, active 0.1s ago) -├── Total: 1,247 connections, 15,432 requests | Live: 47 connections, 12 active requests -├── Remote Access Listener (1h 45m active, last activity 2.3s ago) -│ ├── Total: 234 connections, 2,156 commands | Live: 2 connections, 3 commands -│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) -│ │ ├── Total: 45 commands (42 success, 3 failed) | Live: 1 command -│ │ ├── stdout-handler (23m 12s running, CPU active) -│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) -│ │ └── stderr-stream (18m 45s running, idle 8.1s) -│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK -│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) -│ ├── Total: 12 commands (12 success) | Live: 1 command -│ └── command-executor (8m 33s running, exit pending) -├── HTTP Proxy (2h 15m active, last request 0.8s ago) -│ ├── Total: 1,013 requests (987 success, 26 failed) | Live: 45 connections, 8 requests -│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) -│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) -└── Chat Service (35m active, last message 1.2s ago) - ├── Total: 4,567 messages, 89 users joined | Live: 15 users, 3 conversations - ├── presence-monitor (35m running, heartbeat 30s ago) - └── message-relay (35m running, processing queue) - -🔒 Active Locks (3): ... -⏳ Lock Waiters (1): ... -``` - -### Counter Storage and Paths - -```rust -// Counter keys are automatically generated from context paths: - -// Global level counters -// "global.connections" -> 1,247 total -// "global.live_connections" -> 47 current - -// Service level counters -// "global.remote-access.connections" -> 234 total -// "global.remote-access.live_connections" -> 2 current - -// Session level counters -// "global.remote-access.alice@bv478gen.commands" -> 45 total -// "global.remote-access.alice@bv478gen.live_commands" -> 1 current - -// Task level counters -// "global.remote-access.alice@bv478gen.stdout-handler.bytes_processed" -> 1,234,567 - -// Examples in code: -async fn handle_connection(session: server::Session) { - let ctx = session.context(); // Path: "global.remote-access.alice@bv478gen" - - // These create global entries: - ctx.increment_total("commands"); // Key: "global.remote-access.alice@bv478gen.commands" - ctx.increment_live("commands"); // Key: "global.remote-access.alice@bv478gen.live_commands" - - // Nested task context - ctx.child("stdout-handler").spawn(|task_ctx| async move { - // task_ctx.path() -> "global.remote-access.alice@bv478gen.stdout-handler" - task_ctx.increment_total("bytes_processed"); - }); -} -``` - -### Persistent Counter Benefits - -- **✅ Survives context drops** - Counters stored globally, persist after contexts end -- **✅ Hierarchical aggregation** - Can sum all child counters for parent totals -- **✅ Path-based queries** - Easy to find counters by context path -- **✅ Historical tracking** - Total counters accumulate across all context instances -- **✅ Live tracking** - Live counters automatically decremented when contexts drop - -**Live counters** show current activity (auto-decremented on context drop). -**Total counters** show historical activity (persist forever for trending). -**Global storage** ensures metrics survive context lifecycles. - -## Status Monitoring and HTTP Dashboard - -fastn-context automatically provides multiple ways to access real-time status information for debugging and monitoring. - -### P2P Status Access - -```rust -#[fastn_context::main] -async fn main() -> eyre::Result<()> { - // Status automatically available over P2P for remote access - // No HTTP server needed - uses secure P2P connections - - // Your application code... -} -``` - -Status is accessible over the P2P network using the remote access system. - -### Status API Functions - -```rust -/// Get current status snapshot with ANSI formatting -pub fn status() -> Status; - -/// Stream of status updates (max once per second) -pub fn status_stream() -> impl futures_core::stream::Stream; - -/// Get raw status data as structured JSON -pub fn status_json() -> serde_json::Value; -``` - -### Status Type with ANSI Display - -```rust -#[derive(Debug, Clone, serde::Serialize)] -pub struct Status { - pub global_context: ContextStatus, - pub active_locks: Vec, - pub lock_waiters: Vec, - pub warnings: Vec, - pub timestamp: std::time::SystemTime, -} - -#[derive(Debug, Clone, serde::Serialize)] -pub struct ContextStatus { - pub name: String, - pub duration: std::time::Duration, - pub last_activity: std::time::Duration, // Time since last activity - pub children: Vec, - pub metrics: std::collections::HashMap, - pub data: std::collections::HashMap, - pub total_counters: std::collections::HashMap, // Historical counts - pub live_counters: std::collections::HashMap, // Current active counts -} - -#[derive(Debug, Clone, serde::Serialize)] -pub struct LockStatus { - pub name: String, - pub held_by_context: String, - pub held_duration: std::time::Duration, - pub lock_type: LockType, // Mutex, RwLock, Semaphore -} - -#[derive(Debug, Clone, serde::Serialize)] -pub struct StatusWarning { - pub message: String, - pub context_path: String, - pub severity: WarningSeverity, -} - -#[derive(Debug, Clone, serde::Serialize)] -pub enum WarningSeverity { - Info, // FYI information - Warning, // Potential issue - Critical, // Likely problem -} -``` - -### ANSI-Formatted Display - -```rust -impl std::fmt::Display for Status { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use colored::*; // For ANSI colors - - // Header with timestamp - writeln!(f, "{}", "fastn Status Dashboard".bold().blue())?; - writeln!(f, "{}", format!("Snapshot: {}", - humantime::format_rfc3339(self.timestamp)).dimmed())?; - writeln!(f)?; - - // Context tree with colors and timing - self.display_context_tree(f, &self.global_context, 0)?; - - // Active locks section - if !self.active_locks.is_empty() { - writeln!(f, "\n{} Active Locks ({}):", "🔒".yellow(), self.active_locks.len())?; - for lock in &self.active_locks { - let duration_str = humantime::format_duration(lock.held_duration); - let color = if lock.held_duration.as_secs() > 10 { - "red" - } else { - "white" - }; - writeln!(f, " - \"{}\" held by {} ({})", - lock.name.cyan(), - lock.held_by_context.white(), - duration_str.color(color))?; - } - } - - // Lock waiters section - if !self.lock_waiters.is_empty() { - writeln!(f, "\n{} Lock Waiters ({}):", "⏳".yellow(), self.lock_waiters.len())?; - for waiter in &self.lock_waiters { - let duration_str = humantime::format_duration(waiter.waiting_duration); - writeln!(f, " - {} waiting for \"{}\" ({})", - waiter.context_name.white(), - waiter.lock_name.cyan(), - duration_str.red())?; - } - } - - // Warnings section - if !self.warnings.is_empty() { - writeln!(f, "\n{} Warnings:", "⚠️".red())?; - for warning in &self.warnings { - let icon = match warning.severity { - WarningSeverity::Info => "ℹ️", - WarningSeverity::Warning => "⚠️", - WarningSeverity::Critical => "🚨", - }; - writeln!(f, " {} {}", icon, warning.message.yellow())?; - } - } - - Ok(()) - } -} -``` - -### Status Stream (Event-Driven Updates) - -```rust -/// Stream provides updates only when context tree actually changes -/// No polling - efficient for long-running monitoring -let mut status_stream = fastn_context::status_stream(); -while let Some(status) = status_stream.next().await { - // Only prints when something actually changes - print!("\x1B[2J\x1B[H"); // Clear screen - println!("{}", status); // Display with colors -} -``` - -### CLI Integration with P2P Status Access - -fastn-context integrates with the main fastn CLI to provide both local and remote status access: - -```bash -# Local machine status -fastn status # One-time snapshot with ANSI colors -fastn status -w # Watch mode (event-driven, no polling) -fastn status --json # JSON output for programmatic use - -# Remote machine status over P2P (requires remote access) -fastn status alice # Status from machine with alias "alice" -fastn status bv478gen... # Status from machine with ID52 -fastn status alice -w # Watch remote machine's status in real-time -fastn status alice --json # Remote machine status as JSON - -# Multiple machines -fastn status alice,bob,prod # Status from multiple machines -``` - -**P2P Status Protocol:** -- Uses secure fastn remote access (same as `fastn rshell`) -- Requires target machine in your `remote-access/config.toml` -- Status data transmitted over encrypted P2P connection -- Real-time streaming for remote watch mode - -### Status Protocol Integration - -Status access integrates seamlessly with fastn's remote access system: - -```rust -// Status is available as a built-in remote command -// When fastn-daemon receives status requests, fastn-context provides the data - -// Server side - automatic status command handling -// fastn-daemon automatically handles: -// - StatusRequest -> returns current Status -// - StatusStreamRequest -> returns real-time Status stream - -// Client side - transparent remote access -fastn status alice // Translates to fastn_p2p::client::call(alice, StatusRequest) -fastn status alice -w // Translates to fastn_p2p::client::connect(alice, StatusStreamProtocol) -``` - -This gives **comprehensive status access** - terminal, HTTP, streaming, and programmatic - all from the same underlying Status structure with rich ANSI formatting for human consumption. - -## System Metrics Monitoring - -fastn-context automatically monitors system resources and integrates them into the status display. - -### Automatic System Monitoring - -```rust -#[derive(Debug, Clone, serde::Serialize)] -pub struct SystemMetrics { - pub cpu_usage_percent: f32, // Current CPU usage - pub memory_used_bytes: u64, // RAM usage - pub memory_total_bytes: u64, // Total RAM - pub disk_used_bytes: u64, // Disk usage - pub disk_total_bytes: u64, // Total disk - pub network_rx_bytes_per_sec: u64, // Network receive rate - pub network_tx_bytes_per_sec: u64, // Network transmit rate - pub load_average: [f32; 3], // 1min, 5min, 15min load - pub uptime: std::time::Duration, // System uptime -} - -// Added to Status structure -pub struct Status { - pub system_metrics: SystemMetrics, // System resource usage - pub global_context: ContextStatus, - pub active_locks: Vec, - pub lock_waiters: Vec, - pub warnings: Vec, - pub timestamp: std::time::SystemTime, -} -``` - -### Efficient Metric Collection - -```rust -// System metrics cached and updated appropriately: -// - CPU usage: Updated every 1 second (smooth average) -// - Memory/disk: Updated every 5 seconds (less volatile) -// - Network rates: Updated every 1 second (calculated from deltas) -// - Load average: Updated every 10 seconds (system provides this) - -// Metrics only recalculated when status is actually requested -// No background polling unless someone is watching -``` - -### Enhanced Status Display with System Info - -``` -$ fastn status -fastn Status Dashboard -System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 -Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m - -Global Context (2h 15m 32s uptime, active 0.1s ago) -├── Remote Access Listener (1h 45m active, last activity 2.3s ago) -│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) -│ │ ├── stdout-handler (23m 12s running, CPU active) -│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) -│ │ └── stderr-stream (18m 45s running, idle 8.1s) -│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK -│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) -│ └── command-executor (8m 33s running, exit pending) -├── HTTP Proxy (2h 15m active, last request 0.8s ago) -│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) -│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) -└── Chat Service (35m active, last message 1.2s ago) - ├── presence-monitor (35m running, heartbeat 30s ago) - └── message-relay (35m running, processing queue) - -🔒 Active Locks (3): - - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD - - "user-table-write-lock" held by user-service/db-writer (0.1s) - - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) - -⏳ Lock Waiters (1): - - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK - -⚠️ Alerts: - - Long-held lock "session-output-lock" (12.3s) may indicate deadlock - - stderr-stream stuck waiting (8.1s) suggests blocked I/O - - CPU usage normal (12.3%), memory usage low (13%) -``` - -### Watch Mode (`fastn status -w`) - -```rust -// Event-driven updates - only when something changes -// No CPU overhead when system is idle -// Immediately shows when new contexts/locks appear or disappear - -$ fastn status -w -# Screen updates only when: -# - New context created/destroyed -# - Lock acquired/released -# - Significant activity changes -# - System metrics cross thresholds -# - No updates for days if system is stable -``` - -This provides **complete operational visibility** with both application-specific context trees and system resource monitoring, all with efficient event-driven updates instead of wasteful polling. \ No newline at end of file diff --git a/fastn-context/NEXT-counters.md b/fastn-context/NEXT-counters.md deleted file mode 100644 index f240c3135..000000000 --- a/fastn-context/NEXT-counters.md +++ /dev/null @@ -1,37 +0,0 @@ -# NEXT: Global Counter Storage System - -Features for persistent counter tracking with dotted path keys. - -## Counter Types - -- **Total counters** - Historical cumulative counts (persist after context drops) -- **Live counters** - Current active operations (auto-decremented on drop) - -## Global Storage - -```rust -// Dotted path keys in global HashMap -"global.connections" -> 1,247 -"global.remote-access.alice@bv478gen.commands" -> 45 -"global.http-proxy.requests" -> 1,013 -``` - -## API - -```rust -impl Context { - pub fn increment_total(&self, counter: &str); - pub fn increment_live(&self, counter: &str); - pub fn decrement_live(&self, counter: &str); - pub fn get_total(&self, counter: &str) -> u64; - pub fn get_live(&self, counter: &str) -> u64; -} -``` - -## Integration - -- fastn-p2p automatically tracks connection/request counters -- Counters survive context drops via global storage -- Hierarchical aggregation possible via path prefix matching - -**Implementation**: After basic Context + monitoring foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-locks.md b/fastn-context/NEXT-locks.md deleted file mode 100644 index 9853624da..000000000 --- a/fastn-context/NEXT-locks.md +++ /dev/null @@ -1,32 +0,0 @@ -# NEXT: Named Locks and Deadlock Detection - -Features for named lock monitoring and deadlock detection. - -## Named Lock Types - -- `ContextMutex` - Named mutex with timing tracking -- `ContextRwLock` - Named read-write lock -- `ContextSemaphore` - Named semaphore with permit tracking - -## Lock Monitoring - -- Track who holds what locks and for how long -- Track who's waiting for locks and wait times -- Automatic deadlock risk detection -- Lock status in context tree display - -## Usage Pattern - -```rust -// Create named locks within context -let user_lock = ctx.mutex("user-data-lock", UserData::new()); - -// Lock operations automatically tracked -let guard = user_lock.lock().await; -// Shows in status: "HOLDS user-data-lock (2.3s)" - -// Waiting operations tracked -// Shows in status: "WAITING user-data-lock (5.1s) ⚠️ STUCK" -``` - -**Implementation**: After basic Context system + monitoring foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-metrics-and-data.md b/fastn-context/NEXT-metrics-and-data.md deleted file mode 100644 index e4ba056e3..000000000 --- a/fastn-context/NEXT-metrics-and-data.md +++ /dev/null @@ -1,121 +0,0 @@ -# NEXT: Metrics and Data Storage - -Features for storing metrics and arbitrary data on contexts for debugging and monitoring. - -## Metric Storage - -```rust -impl Context { - /// Add metric data for status reporting - pub fn add_metric(&self, key: &str, value: MetricValue); -} - -#[derive(Debug, Clone)] -pub enum MetricValue { - Counter(u64), - Gauge(f64), - Duration(std::time::Duration), - Text(String), - Bytes(u64), -} -``` - -## Data Storage - -```rust -impl Context { - /// Store arbitrary data on this context - pub fn set_data(&self, key: &str, value: serde_json::Value); - - /// Get stored data - pub fn get_data(&self, key: &str) -> Option; -} -``` - -## Builder Pattern Integration - -```rust -impl ContextBuilder { - /// Add initial data to context - pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; - - /// Add initial metric to context - pub fn with_metric(self, key: &str, value: MetricValue) -> Self; -} -``` - -## Usage Examples - -```rust -// Add metrics during task execution -task_ctx.add_metric("commands_executed", MetricValue::Counter(cmd_count)); -task_ctx.add_metric("response_time", MetricValue::Duration(elapsed)); - -// Store debugging data -task_ctx.set_data("last_command", serde_json::Value::String("ls -la".to_string())); -task_ctx.set_data("exit_code", serde_json::Value::Number(0.into())); - -// Pre-configure context with builder -ctx.child("remote-shell-handler") - .with_data("peer", serde_json::Value::String(alice_id52)) - .with_data("shell", serde_json::Value::String("bash".to_string())) - .with_metric("commands_executed", MetricValue::Counter(0)) - .spawn(|task_ctx| async move { - // Task starts with pre-configured data and metrics - }); -``` - -## Automatic Request Tracking - -For contexts that call `.persist()`, automatic time-windowed counters will be maintained: - -```rust -// Automatic counters for persisted contexts (no manual tracking needed) -// Uses full dotted context path as key - -// When ctx.persist() is called on "global.p2p.alice@bv478gen.stream-123": -// Auto-increments these counters: -"global.p2p.alice@bv478gen.requests_since_start" // Total ever -"global.p2p.alice@bv478gen.requests_last_day" // Last 24 hours -"global.p2p.alice@bv478gen.requests_last_hour" // Last 60 minutes -"global.p2p.alice@bv478gen.requests_last_minute" // Last 60 seconds -"global.p2p.alice@bv478gen.requests_last_second" // Last 1 second - -// Hierarchical aggregation automatically available: -"global.p2p.requests_last_hour" // All P2P requests -"global.requests_last_hour" // All application requests -``` - -### Time Window Implementation - -```rust -// Sliding window counters with efficient circular buffers -// Updated automatically when any context calls persist() - -// Status display shows rates: -✅ global.p2p.alice@bv478gen (23m, active) - Requests: 1,247 total | 234 last hour | 45 last minute | 2/sec current - -// Automatic rate calculation and trending -``` - -### Usage Pattern - -```rust -// P2P stream handler -async fn handle_stream(ctx: Arc) { - // Process stream... - ctx.persist(); // Automatically increments all time window counters - - // No manual counter management needed! - // All metrics tracked automatically by dotted context path -} - -// HTTP request handler -async fn handle_request(ctx: Arc) { - // Process request... - ctx.persist(); // Auto-tracks "global.http.endpoint-xyz.requests_*" -} -``` - -**Implementation**: After basic Context + counter storage foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-monitoring.md b/fastn-context/NEXT-monitoring.md deleted file mode 100644 index 644271d75..000000000 --- a/fastn-context/NEXT-monitoring.md +++ /dev/null @@ -1,34 +0,0 @@ -# NEXT: Comprehensive Monitoring System - -Features planned for future implementation after basic Context system is working. - -## Status Trees and Monitoring - -- Hierarchical status display with timing -- ANSI-formatted status output -- Event-driven status updates -- System metrics integration (CPU, RAM, disk, network) -- P2P status distribution (`fastn status `) - -## Counter Management - -- Global counter storage with dotted paths -- Total vs live counter tracking -- Automatic counter integration -- Hierarchical counter aggregation - -## Named Locks - -- ContextMutex, ContextRwLock, ContextSemaphore -- Deadlock detection and timing -- Lock status in monitoring tree -- Wait time tracking - -## Advanced Features - -- Builder pattern: `ctx.child("name").with_data().spawn()` -- Metric types and data storage -- HTTP status endpoints -- Status streaming API - -**Implementation**: After basic Context + fastn-p2p integration is complete. \ No newline at end of file diff --git a/fastn-context/NEXT-operation-tracking.md b/fastn-context/NEXT-operation-tracking.md deleted file mode 100644 index bfce50f23..000000000 --- a/fastn-context/NEXT-operation-tracking.md +++ /dev/null @@ -1,84 +0,0 @@ -# NEXT: Operation Tracking for Precise Debugging - -Features for tracking exactly where tasks are stuck using named await and select operations. - -## Named Await Operations - -```rust -/// Track what operation a context is waiting for -let result = fastn_context::await!("waiting-for-response", some_operation()); -// No .await suffix - the macro handles it - -// Examples -let data = fastn_context::await!("reading-file", std::fs::read("config.toml")); -let response = fastn_context::await!("http-request", client.get(url).send()); -let connection = fastn_context::await!("database-connect", db.connect()); -``` - -## Simple Select Tracking - -```rust -/// Track that context is stuck on select (no branch naming needed) -fastn_context::select! { - _ = task_ctx.wait() => println!("Cancelled"), - _ = stream.read() => println!("Data received"), - _ = database.query() => println!("Query complete"), -} -// Records: "stuck on select" -``` - -## Status Display with Operation Names - -``` -Global Context (2h 15m uptime) -├── Remote Access Listener -│ ├── alice@bv478gen (23m connected) -│ │ ├── stdout-handler (stuck on: "reading-stream" 12.3s) ⚠️ STUCK -│ │ └── stderr-stream (stuck on: "select" 8.1s) -│ └── bob@p2nd7avq (stuck on: "database-query" 0.2s) ✅ ACTIVE -└── HTTP Proxy (stuck on: "select" 0.1s) ✅ ACTIVE -``` - -## Design Principles - -### Single Operation per Context -- **Good design**: One await/select per context encourages proper task breakdown -- **Multiple selects**: Suggests need for child contexts instead - -```rust -// ❌ Complex - hard to debug where it's stuck -fastn_context::select! { /* 5 different operations */ } -fastn_context::select! { /* 3 more operations */ } - -// ✅ Clear - each operation has its own context -ctx.spawn_child("network-handler", |ctx| async move { - fastn_context::select! { /* network operations */ } -}); -ctx.spawn_child("database-handler", |ctx| async move { - let result = fastn_context::await!("user-query", db.get_user(id)); -}); -``` - -### Automatic Operation Tracking - -```rust -// Context automatically tracks current operation -pub struct Context { - current_operation: std::sync::Arc>>, - operation_started: std::sync::Arc>>, -} - -// Status can show: -// - What operation is running -// - How long it's been running -// - If it's stuck (running too long) -``` - -## Benefits - -1. **Precise debugging** - Know exactly where each task is stuck -2. **Performance insights** - See which operations take too long -3. **Design enforcement** - Encourages proper context decomposition -4. **Production monitoring** - Real-time operation visibility - -**Implementation**: After basic Context + monitoring system is complete. \ No newline at end of file diff --git a/fastn-context/NEXT-status-distribution.md b/fastn-context/NEXT-status-distribution.md deleted file mode 100644 index f44ecefb9..000000000 --- a/fastn-context/NEXT-status-distribution.md +++ /dev/null @@ -1,36 +0,0 @@ -# NEXT: P2P Status Distribution - -Features for distributed status monitoring over P2P network. - -## Remote Status Access - -```bash -# Remote machine status over P2P -fastn status alice # Status from machine with alias "alice" -fastn status alice -w # Watch remote machine in real-time -fastn status alice,bob,prod # Multiple machines -``` - -## P2P Integration - -- Uses secure fastn remote access (same as `fastn rshell`) -- Status transmitted over encrypted P2P connections -- Requires target machine in `remote-access/config.toml` -- Real-time streaming for watch mode - -## Protocol - -```rust -// Built-in status commands -StatusRequest -> Status // One-time snapshot -StatusStreamProtocol -> Stream // Real-time updates -``` - -## Benefits - -- **Distributed monitoring** - Monitor entire fastn network from any machine -- **Secure access** - Uses same permissions as remote shell -- **No HTTP servers** - Uses P2P infrastructure only -- **Real-time** - Event-driven updates across network - -**Implementation**: After P2P streaming API + basic status system. \ No newline at end of file diff --git a/fastn-context/README-FULL.md b/fastn-context/README-FULL.md deleted file mode 100644 index eb48306d9..000000000 --- a/fastn-context/README-FULL.md +++ /dev/null @@ -1,844 +0,0 @@ -# fastn-context: Hierarchical Application Context for Debugging and Operations - -This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. - -> **Note**: This README documents the complete design iteratively. Some sections may overlap as features build on each other. The design is internally consistent - later sections refine and extend earlier concepts. - -## Design Philosophy - -- **Hierarchical Structure**: Applications naturally form trees of operations -- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents -- **Zero Boilerplate**: Context trees build themselves as applications run -- **Production Ready**: Status trees enable debugging of stuck/slow operations -- **Bounded Complexity**: Simple spawn vs detailed child creation as needed - -## Core Concepts - -### Context Tree Structure - -Every fastn application forms a natural hierarchy: - -``` -Global Context (application level) -├── Service Context (e.g., "remote-access-listener") -│ ├── Session Context (e.g., "alice@bv478gen") -│ │ ├── Task Context (e.g., "stdout-handler") -│ │ └── Task Context (e.g., "stderr-stream") -│ └── Session Context (e.g., "bob@p2nd7avq") -├── Service Context (e.g., "http-proxy") -└── Service Context (e.g., "chat-service") -``` - -### Automatic Context Creation - -fastn-context integrates seamlessly with fastn ecosystem: - -```rust -// 1. Global context created by main macro -#[fastn_context::main] -async fn main() -> eyre::Result<()> { - // Global context automatically available -} - -// 2. Service contexts created by operations -let listener = fastn_p2p::server::listen(key, protocols).await?; -// Creates child context: "p2p-listener" under global - -// 3. Session contexts created per connection -// Each incoming connection gets child context: "session-{peer_id}" - -// 4. Task contexts created by spawn operations -session_ctx.child("shell-handler").spawn(handle_shell); -``` - -## API Reference - -### Core Context - -```rust -pub struct Context { - /// Context name for debugging/status - pub name: String, - - /// When this context was created - pub created_at: std::time::Instant, - - // Private: parent, children, cancellation, metrics, data -} - -impl Context { - /// Create new root context (typically only used by main macro) - pub fn new(name: &str) -> std::sync::Arc; - - /// Create child context with given name - pub fn child(&self, name: &str) -> ContextBuilder; - - /// Simple spawn (inherits current context, no child creation) - pub fn spawn(&self, task: F) -> tokio::task::JoinHandle - where F: std::future::Future + Send + 'static; - - /// Wait for cancellation signal - pub async fn wait(&self); - - /// Cancel this context and all children recursively - pub fn cancel(&self); - - /// Add metric data for status reporting - pub fn add_metric(&self, key: &str, value: MetricValue); - - /// Store arbitrary data on this context - pub fn set_data(&self, key: &str, value: serde_json::Value); - - /// Get stored data - pub fn get_data(&self, key: &str) -> Option; - - /// Increment total counter (historical count) - pub fn increment_total(&self, counter: &str); - - /// Increment live counter (current active count) - pub fn increment_live(&self, counter: &str); - - /// Decrement live counter (when operation completes) - pub fn decrement_live(&self, counter: &str); - - /// Get counter values - pub fn get_total(&self, counter: &str) -> u64; - pub fn get_live(&self, counter: &str) -> u64; -} -``` - -### Context Builder - -```rust -pub struct ContextBuilder { - // Pre-created child context ready for configuration -} - -impl ContextBuilder { - /// Add initial data to context - pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; - - /// Add initial metric to context - pub fn with_metric(self, key: &str, value: MetricValue) -> Self; - - /// Spawn task with this configured child context - pub fn spawn(self, task: F) -> tokio::task::JoinHandle - where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; -} -``` - -### Global Access - -```rust -/// Get the global application context -pub fn global() -> std::sync::Arc; - -/// Get current task's context (thread-local or task-local) -pub fn current() -> std::sync::Arc; - -/// Print status tree for debugging -pub fn status() -> StatusTree; -``` - -### Metric Types - -```rust -#[derive(Debug, Clone)] -pub enum MetricValue { - Counter(u64), - Gauge(f64), - Duration(std::time::Duration), - Text(String), - Bytes(u64), -} -``` - -## Usage Patterns - -### Simple Task Spawning - -```rust -// Inherit current context (no child creation) -let ctx = fastn_context::current(); -ctx.spawn(async { - // Simple background task -}); -``` - -### Detailed Task Spawning - -```rust -// Create child context with debugging info -ctx.child("remote-shell-handler") - .with_data("peer", alice_id52) - .with_data("shell", "bash") - .with_metric("commands_executed", 0) - .spawn(|task_ctx| async move { - // Task can update its own context - task_ctx.add_metric("commands_executed", cmd_count); - task_ctx.set_data("last_command", "ls -la"); - - // Task waits for its own cancellation - tokio::select! { - _ = task_ctx.wait() => { - println!("Shell handler cancelled"); - } - _ = handle_shell_session() => { - println!("Shell session completed"); - } - } - }); -``` - -### Status Tree Output - -``` -$ fastn status -Global Context (2h 15m 32s uptime) -├── Remote Access Listener (1h 45m active) -│ ├── alice@bv478gen (23m 12s, bash shell) -│ │ ├── stdout-handler (23m 12s, 15.2MB processed) -│ │ └── stderr-stream (18m 45s, 2.1KB processed) -│ └── bob@p2nd7avq (8m 33s, ls command) -│ └── command-executor (8m 33s, exit pending) -├── HTTP Proxy (2h 15m active) -│ ├── connection-pool (45 active, 1,234 requests) -│ └── request-handler-pool (12 workers active) -└── Chat Service (35m active) - ├── presence-monitor (35m, 15 users tracked) - └── message-relay (35m, 4,567 messages) -``` - -## Integration with fastn-p2p - -fastn-p2p depends on fastn-context and automatically creates context hierarchies: - -```rust -// fastn-p2p sessions provide access to their context -async fn handle_remote_shell(session: fastn_p2p::server::Session) { - let ctx = session.context(); // Auto-created by fastn-p2p - - // Simple spawn (inherits session context) - ctx.spawn(pipe_stdout(session.send)); - - // Detailed spawn (creates child for debugging) - ctx.child("command-executor") - .with_data("command", session.protocol.command) - .spawn(|task_ctx| async move { - let result = execute_command(&session.protocol.command).await; - task_ctx.set_data("exit_code", result.code); - }); -} -``` - -## Main Function Integration - -The main macro moves to fastn-context and sets up the global context: - -```rust -#[fastn_context::main] -async fn main() -> eyre::Result<()> { - // Global context automatically created and available - - let ctx = fastn_context::global(); - ctx.child("startup") - .with_data("version", env!("CARGO_PKG_VERSION")) - .spawn(|_| async { - // Application initialization - }); -} -``` - -## Design Benefits - -1. **Names Required for Debugging** - Every important operation has a name in status tree -2. **Selective Complexity** - Simple spawn vs detailed child creation as needed -3. **Automatic Tree Building** - Context hierarchy builds as application runs -4. **Production Debugging** - `fastn status` shows exactly where system is stuck -5. **Clean Separation** - Context concerns separate from networking concerns -6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure - -**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. - -## Comprehensive Timing and Lock Monitoring - -Every context and operation tracks detailed timing for real-time debugging, including named lock monitoring for deadlock detection. - -### Timing Integration - -```rust -pub struct Context { - pub name: String, - pub created_at: std::time::Instant, // When context started - pub last_activity: std::sync::Arc>, // Last activity - // ... other fields -} - -impl Context { - /// Update last activity timestamp (called automatically by operations) - pub fn touch(&self); - - /// Get how long this context has been alive - pub fn duration(&self) -> std::time::Duration; - - /// Get how long since last activity - pub fn idle_duration(&self) -> std::time::Duration; - - /// Create named mutex within this context - pub fn mutex(&self, name: &str, data: T) -> ContextMutex; - - /// Create named RwLock within this context - pub fn rwlock(&self, name: &str, data: T) -> ContextRwLock; - - /// Create named semaphore within this context - pub fn semaphore(&self, name: &str, permits: usize) -> ContextSemaphore; -} -``` - -### Named Lock Types - -```rust -pub struct ContextMutex { - name: String, - context: std::sync::Arc, - inner: tokio::sync::Mutex, -} - -impl ContextMutex { - /// Lock with automatic status tracking - pub async fn lock(&self) -> ContextMutexGuard; -} - -pub struct ContextMutexGuard { - acquired_at: std::time::Instant, // When lock was acquired - context_name: String, // Which context holds it - lock_name: String, // Lock identifier - // Auto-reports to context status system - // Auto-cleanup on drop -} -``` - -### Detailed Status Output with Comprehensive Timing - -``` -$ fastn status -Global Context (2h 15m 32s uptime, active 0.1s ago) -├── Remote Access Listener (1h 45m active, last activity 2.3s ago) -│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) -│ │ ├── stdout-handler (23m 12s running, CPU active) -│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) -│ │ └── stderr-stream (18m 45s running, idle 8.1s) -│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK -│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) -│ └── command-executor (8m 33s running, exit pending) -├── HTTP Proxy (2h 15m active, last request 0.8s ago) -│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) -│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) -└── Chat Service (35m active, last message 1.2s ago) - ├── presence-monitor (35m running, heartbeat 30s ago) - └── message-relay (35m running, processing queue) - -🔒 Active Locks (3): - - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD - - "user-table-write-lock" held by user-service/db-writer (0.1s) - - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) - -⏳ Lock Waiters (1): - - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK - -⚠️ Potential Issues: - - Long-held lock "session-output-lock" (12.3s) may indicate deadlock - - stderr-stream stuck waiting (8.1s) suggests blocked I/O -``` - -### Automatic Activity Tracking - -```rust -// All operations automatically maintain timing -ctx.spawn(async { - // ctx.touch() called when task starts - loop { - do_work().await; - ctx.touch(); // Update activity timestamp - } -}); - -// Lock operations update timing automatically -let guard = ctx.mutex("data-lock", data).lock().await; -// Updates: context last_activity, tracks lock hold time - -// Long operations should periodically touch -async fn long_running_task(ctx: std::sync::Arc) { - loop { - process_batch().await; - ctx.touch(); // Show we're still active, not stuck - - tokio::select! { - _ = ctx.wait() => break, // Cancelled - _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {} - } - } -} -``` - -This provides **real-time operational debugging** - administrators can instantly identify stuck operations, deadlocked tasks, and performance bottlenecks with precise timing information. - -## Counter Management System - -Every context can track both historical totals and live counts for detailed operational metrics. - -### Global Counter Storage with Dotted Paths - -```rust -pub struct Context { - pub name: String, - pub full_path: String, // "global.remote-access.alice@bv478gen.stdout-handler" - // ... other fields -} - -impl Context { - /// Get full dotted path for this context - pub fn path(&self) -> &str; - - /// Increment total counter (stored in global hashmap by full path) - pub fn increment_total(&self, counter: &str); - - /// Increment live counter (stored in global hashmap by full path) - pub fn increment_live(&self, counter: &str); - - /// Decrement live counter (stored in global hashmap by full path) - pub fn decrement_live(&self, counter: &str); - - /// Get counter values (retrieved from global storage) - pub fn get_total(&self, counter: &str) -> u64; - pub fn get_live(&self, counter: &str) -> u64; -} - -// Global counter storage (persists beyond context lifetimes) -static GLOBAL_COUNTERS: LazyLock>> = ...; - -// Counter keys format: "{context_path}.{counter_name}" -// Examples: -// "global.connections" -> 1,247 -// "global.remote-access.connections" -> 234 -// "global.remote-access.alice@bv478gen.commands" -> 45 -// "global.http-proxy.requests" -> 1,013 -``` - -### Automatic Counter Integration - -```rust -// fastn-p2p automatically maintains connection counters -async fn handle_incoming_connection(session: fastn_p2p::server::Session) { - let ctx = session.context(); - - // Automatically tracked by fastn-p2p: - ctx.increment_total("connections"); // Total connections ever - ctx.increment_live("connections"); // Current active connections - - // Your handler code... - - // When session ends: - ctx.decrement_live("connections"); // Automatically called -} - -// Custom counters for application logic -async fn handle_remote_command(session: server::Session) { - let ctx = session.context(); - - ctx.increment_total("commands"); // Total commands executed - ctx.increment_live("commands"); // Currently executing commands - - let result = execute_command(&session.protocol.command).await; - - ctx.decrement_live("commands"); // Command completed - - if result.success { - ctx.increment_total("successful_commands"); - } else { - ctx.increment_total("failed_commands"); - } -} -``` - -### Enhanced Status Display with Counters - -``` -$ fastn status -fastn Status Dashboard -System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 -Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m - -Global Context (2h 15m 32s uptime, active 0.1s ago) -├── Total: 1,247 connections, 15,432 requests | Live: 47 connections, 12 active requests -├── Remote Access Listener (1h 45m active, last activity 2.3s ago) -│ ├── Total: 234 connections, 2,156 commands | Live: 2 connections, 3 commands -│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) -│ │ ├── Total: 45 commands (42 success, 3 failed) | Live: 1 command -│ │ ├── stdout-handler (23m 12s running, CPU active) -│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) -│ │ └── stderr-stream (18m 45s running, idle 8.1s) -│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK -│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) -│ ├── Total: 12 commands (12 success) | Live: 1 command -│ └── command-executor (8m 33s running, exit pending) -├── HTTP Proxy (2h 15m active, last request 0.8s ago) -│ ├── Total: 1,013 requests (987 success, 26 failed) | Live: 45 connections, 8 requests -│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) -│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) -└── Chat Service (35m active, last message 1.2s ago) - ├── Total: 4,567 messages, 89 users joined | Live: 15 users, 3 conversations - ├── presence-monitor (35m running, heartbeat 30s ago) - └── message-relay (35m running, processing queue) - -🔒 Active Locks (3): ... -⏳ Lock Waiters (1): ... -``` - -### Counter Storage and Paths - -```rust -// Counter keys are automatically generated from context paths: - -// Global level counters -// "global.connections" -> 1,247 total -// "global.live_connections" -> 47 current - -// Service level counters -// "global.remote-access.connections" -> 234 total -// "global.remote-access.live_connections" -> 2 current - -// Session level counters -// "global.remote-access.alice@bv478gen.commands" -> 45 total -// "global.remote-access.alice@bv478gen.live_commands" -> 1 current - -// Task level counters -// "global.remote-access.alice@bv478gen.stdout-handler.bytes_processed" -> 1,234,567 - -// Examples in code: -async fn handle_connection(session: server::Session) { - let ctx = session.context(); // Path: "global.remote-access.alice@bv478gen" - - // These create global entries: - ctx.increment_total("commands"); // Key: "global.remote-access.alice@bv478gen.commands" - ctx.increment_live("commands"); // Key: "global.remote-access.alice@bv478gen.live_commands" - - // Nested task context - ctx.child("stdout-handler").spawn(|task_ctx| async move { - // task_ctx.path() -> "global.remote-access.alice@bv478gen.stdout-handler" - task_ctx.increment_total("bytes_processed"); - }); -} -``` - -### Persistent Counter Benefits - -- **✅ Survives context drops** - Counters stored globally, persist after contexts end -- **✅ Hierarchical aggregation** - Can sum all child counters for parent totals -- **✅ Path-based queries** - Easy to find counters by context path -- **✅ Historical tracking** - Total counters accumulate across all context instances -- **✅ Live tracking** - Live counters automatically decremented when contexts drop - -**Live counters** show current activity (auto-decremented on context drop). -**Total counters** show historical activity (persist forever for trending). -**Global storage** ensures metrics survive context lifecycles. - -## Status Monitoring and HTTP Dashboard - -fastn-context automatically provides multiple ways to access real-time status information for debugging and monitoring. - -### P2P Status Access - -```rust -#[fastn_context::main] -async fn main() -> eyre::Result<()> { - // Status automatically available over P2P for remote access - // No HTTP server needed - uses secure P2P connections - - // Your application code... -} -``` - -Status is accessible over the P2P network using the remote access system. - -### Status API Functions - -```rust -/// Get current status snapshot with ANSI formatting -pub fn status() -> Status; - -/// Stream of status updates (max once per second) -pub fn status_stream() -> impl futures_core::stream::Stream; - -/// Get raw status data as structured JSON -pub fn status_json() -> serde_json::Value; -``` - -### Status Type with ANSI Display - -```rust -#[derive(Debug, Clone, serde::Serialize)] -pub struct Status { - pub global_context: ContextStatus, - pub active_locks: Vec, - pub lock_waiters: Vec, - pub warnings: Vec, - pub timestamp: std::time::SystemTime, -} - -#[derive(Debug, Clone, serde::Serialize)] -pub struct ContextStatus { - pub name: String, - pub duration: std::time::Duration, - pub last_activity: std::time::Duration, // Time since last activity - pub children: Vec, - pub metrics: std::collections::HashMap, - pub data: std::collections::HashMap, - pub total_counters: std::collections::HashMap, // Historical counts - pub live_counters: std::collections::HashMap, // Current active counts -} - -#[derive(Debug, Clone, serde::Serialize)] -pub struct LockStatus { - pub name: String, - pub held_by_context: String, - pub held_duration: std::time::Duration, - pub lock_type: LockType, // Mutex, RwLock, Semaphore -} - -#[derive(Debug, Clone, serde::Serialize)] -pub struct StatusWarning { - pub message: String, - pub context_path: String, - pub severity: WarningSeverity, -} - -#[derive(Debug, Clone, serde::Serialize)] -pub enum WarningSeverity { - Info, // FYI information - Warning, // Potential issue - Critical, // Likely problem -} -``` - -### ANSI-Formatted Display - -```rust -impl std::fmt::Display for Status { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use colored::*; // For ANSI colors - - // Header with timestamp - writeln!(f, "{}", "fastn Status Dashboard".bold().blue())?; - writeln!(f, "{}", format!("Snapshot: {}", - humantime::format_rfc3339(self.timestamp)).dimmed())?; - writeln!(f)?; - - // Context tree with colors and timing - self.display_context_tree(f, &self.global_context, 0)?; - - // Active locks section - if !self.active_locks.is_empty() { - writeln!(f, "\n{} Active Locks ({}):", "🔒".yellow(), self.active_locks.len())?; - for lock in &self.active_locks { - let duration_str = humantime::format_duration(lock.held_duration); - let color = if lock.held_duration.as_secs() > 10 { - "red" - } else { - "white" - }; - writeln!(f, " - \"{}\" held by {} ({})", - lock.name.cyan(), - lock.held_by_context.white(), - duration_str.color(color))?; - } - } - - // Lock waiters section - if !self.lock_waiters.is_empty() { - writeln!(f, "\n{} Lock Waiters ({}):", "⏳".yellow(), self.lock_waiters.len())?; - for waiter in &self.lock_waiters { - let duration_str = humantime::format_duration(waiter.waiting_duration); - writeln!(f, " - {} waiting for \"{}\" ({})", - waiter.context_name.white(), - waiter.lock_name.cyan(), - duration_str.red())?; - } - } - - // Warnings section - if !self.warnings.is_empty() { - writeln!(f, "\n{} Warnings:", "⚠️".red())?; - for warning in &self.warnings { - let icon = match warning.severity { - WarningSeverity::Info => "ℹ️", - WarningSeverity::Warning => "⚠️", - WarningSeverity::Critical => "🚨", - }; - writeln!(f, " {} {}", icon, warning.message.yellow())?; - } - } - - Ok(()) - } -} -``` - -### Status Stream (Event-Driven Updates) - -```rust -/// Stream provides updates only when context tree actually changes -/// No polling - efficient for long-running monitoring -let mut status_stream = fastn_context::status_stream(); -while let Some(status) = status_stream.next().await { - // Only prints when something actually changes - print!("\x1B[2J\x1B[H"); // Clear screen - println!("{}", status); // Display with colors -} -``` - -### CLI Integration with P2P Status Access - -fastn-context integrates with the main fastn CLI to provide both local and remote status access: - -```bash -# Local machine status -fastn status # One-time snapshot with ANSI colors -fastn status -w # Watch mode (event-driven, no polling) -fastn status --json # JSON output for programmatic use - -# Remote machine status over P2P (requires remote access) -fastn status alice # Status from machine with alias "alice" -fastn status bv478gen... # Status from machine with ID52 -fastn status alice -w # Watch remote machine's status in real-time -fastn status alice --json # Remote machine status as JSON - -# Multiple machines -fastn status alice,bob,prod # Status from multiple machines -``` - -**P2P Status Protocol:** -- Uses secure fastn remote access (same as `fastn rshell`) -- Requires target machine in your `remote-access/config.toml` -- Status data transmitted over encrypted P2P connection -- Real-time streaming for remote watch mode - -### Status Protocol Integration - -Status access integrates seamlessly with fastn's remote access system: - -```rust -// Status is available as a built-in remote command -// When fastn-daemon receives status requests, fastn-context provides the data - -// Server side - automatic status command handling -// fastn-daemon automatically handles: -// - StatusRequest -> returns current Status -// - StatusStreamRequest -> returns real-time Status stream - -// Client side - transparent remote access -fastn status alice // Translates to fastn_p2p::client::call(alice, StatusRequest) -fastn status alice -w // Translates to fastn_p2p::client::connect(alice, StatusStreamProtocol) -``` - -This gives **comprehensive status access** - terminal, HTTP, streaming, and programmatic - all from the same underlying Status structure with rich ANSI formatting for human consumption. - -## System Metrics Monitoring - -fastn-context automatically monitors system resources and integrates them into the status display. - -### Automatic System Monitoring - -```rust -#[derive(Debug, Clone, serde::Serialize)] -pub struct SystemMetrics { - pub cpu_usage_percent: f32, // Current CPU usage - pub memory_used_bytes: u64, // RAM usage - pub memory_total_bytes: u64, // Total RAM - pub disk_used_bytes: u64, // Disk usage - pub disk_total_bytes: u64, // Total disk - pub network_rx_bytes_per_sec: u64, // Network receive rate - pub network_tx_bytes_per_sec: u64, // Network transmit rate - pub load_average: [f32; 3], // 1min, 5min, 15min load - pub uptime: std::time::Duration, // System uptime -} - -// Added to Status structure -pub struct Status { - pub system_metrics: SystemMetrics, // System resource usage - pub global_context: ContextStatus, - pub active_locks: Vec, - pub lock_waiters: Vec, - pub warnings: Vec, - pub timestamp: std::time::SystemTime, -} -``` - -### Efficient Metric Collection - -```rust -// System metrics cached and updated appropriately: -// - CPU usage: Updated every 1 second (smooth average) -// - Memory/disk: Updated every 5 seconds (less volatile) -// - Network rates: Updated every 1 second (calculated from deltas) -// - Load average: Updated every 10 seconds (system provides this) - -// Metrics only recalculated when status is actually requested -// No background polling unless someone is watching -``` - -### Enhanced Status Display with System Info - -``` -$ fastn status -fastn Status Dashboard -System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 -Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m - -Global Context (2h 15m 32s uptime, active 0.1s ago) -├── Remote Access Listener (1h 45m active, last activity 2.3s ago) -│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) -│ │ ├── stdout-handler (23m 12s running, CPU active) -│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) -│ │ └── stderr-stream (18m 45s running, idle 8.1s) -│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK -│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) -│ └── command-executor (8m 33s running, exit pending) -├── HTTP Proxy (2h 15m active, last request 0.8s ago) -│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) -│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) -└── Chat Service (35m active, last message 1.2s ago) - ├── presence-monitor (35m running, heartbeat 30s ago) - └── message-relay (35m running, processing queue) - -🔒 Active Locks (3): - - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD - - "user-table-write-lock" held by user-service/db-writer (0.1s) - - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) - -⏳ Lock Waiters (1): - - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK - -⚠️ Alerts: - - Long-held lock "session-output-lock" (12.3s) may indicate deadlock - - stderr-stream stuck waiting (8.1s) suggests blocked I/O - - CPU usage normal (12.3%), memory usage low (13%) -``` - -### Watch Mode (`fastn status -w`) - -```rust -// Event-driven updates - only when something changes -// No CPU overhead when system is idle -// Immediately shows when new contexts/locks appear or disappear - -$ fastn status -w -# Screen updates only when: -# - New context created/destroyed -# - Lock acquired/released -# - Significant activity changes -# - System metrics cross thresholds -# - No updates for days if system is stable -``` - -This provides **complete operational visibility** with both application-specific context trees and system resource monitoring, all with efficient event-driven updates instead of wasteful polling. \ No newline at end of file diff --git a/fastn-context/README.md b/fastn-context/README.md deleted file mode 100644 index 156f95c46..000000000 --- a/fastn-context/README.md +++ /dev/null @@ -1,337 +0,0 @@ -# fastn-context: Hierarchical Application Context for Debugging and Operations - -This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. - -## Design Philosophy - -- **Hierarchical Structure**: Applications naturally form trees of operations -- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents -- **Zero Boilerplate**: Context trees build themselves as applications run -- **Production Ready**: Status trees enable debugging of stuck/slow operations -- **Bounded Complexity**: Simple spawn vs detailed child creation as needed - -## Core Concepts - -### Context Tree Structure - -Every fastn application forms a natural hierarchy: - -``` -Global Context (application level) -├── Service Context (e.g., "remote-access-listener") -│ ├── Session Context (e.g., "alice@bv478gen") -│ │ ├── Task Context (e.g., "stdout-handler") -│ │ └── Task Context (e.g., "stderr-stream") -│ └── Session Context (e.g., "bob@p2nd7avq") -├── Service Context (e.g., "http-proxy") -└── Service Context (e.g., "chat-service") -``` - -### Automatic Context Creation - -fastn-context integrates seamlessly with fastn ecosystem: - -```rust -// 1. Global context created by main macro -#[fastn_context::main] -async fn main() -> eyre::Result<()> { - // Global context automatically available -} - -// 2. Service contexts created by operations -let listener = fastn_p2p::server::listen(key, protocols).await?; -// Creates child context: "p2p-listener" under global - -// 3. Session contexts created per connection -// Each incoming connection gets child context: "session-{peer_id}" - -// 4. Task contexts created by spawn operations -session_ctx.child("shell-handler").spawn(handle_shell); -``` - -## API Reference - -### Core Context - -```rust -pub struct Context { - /// Context name for debugging/status - pub name: String, - - // Private: parent, children, cancellation_token -} - -impl Context { - /// Create new root context (typically only used by main macro) - pub fn new(name: &str) -> std::sync::Arc; - - /// Create child context with given name - pub fn child(&self, name: &str) -> ContextBuilder; - - /// Simple spawn (inherits current context, no child creation) - pub fn spawn(&self, task: F) -> tokio::task::JoinHandle - where F: std::future::Future + Send + 'static; - - /// Spawn task with named child context (common case shortcut) - pub fn spawn_child(&self, name: &str, task: F) -> tokio::task::JoinHandle - where - F: FnOnce(std::sync::Arc) -> Fut + Send + 'static, - Fut: std::future::Future + Send + 'static; - - /// Wait for cancellation signal (returns Future for tokio::select!) - pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_>; - - /// Cancel this context and all children recursively - pub fn cancel(&self); - -} -``` - -### Context Builder - -```rust -pub struct ContextBuilder { - // Pre-created child context ready for spawning -} - -impl ContextBuilder { - /// Spawn task with this child context - pub fn spawn(self, task: F) -> tokio::task::JoinHandle - where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; -} -``` - -#### Global Access - -```rust -/// Get the global application context -pub fn global() -> std::sync::Arc; -``` - -### Status Display - -```rust -/// Get current status snapshot of entire context tree -pub fn status() -> Status; - -/// Get status including recent completed contexts (distributed tracing) -pub fn status_with_latest() -> Status; - -#[derive(Debug, Clone)] -pub struct Status { - pub global_context: ContextStatus, - pub persisted_contexts: Option>, // Recent completed contexts - pub timestamp: std::time::SystemTime, -} - -#[derive(Debug, Clone)] -pub struct ContextStatus { - pub name: String, - pub is_cancelled: bool, - pub duration: std::time::Duration, - pub children: Vec, -} - -#[derive(Debug, Clone)] -pub struct PersistedContext { - pub name: String, - pub context_path: String, - pub duration: std::time::Duration, - pub completion_time: std::time::SystemTime, - pub success: bool, - pub message: String, -} - -impl std::fmt::Display for Status { - // ANSI-formatted display with tree structure and status icons -} -``` - - -## Usage Patterns - -### Simple Task Spawning - -```rust -// Simple named task spawning (common case) -let ctx = fastn_context::global(); // or passed as parameter -ctx.spawn_child("background-task", |task_ctx| async move { - // Simple background task with explicit context - println!("Running in context: {}", task_ctx.name); -}); - -// Alternative: builder pattern for simple case -ctx.child("background-task") - .spawn(|task_ctx| async move { - // Same result, different syntax - println!("Running in context: {}", task_ctx.name); - }); -``` - -### Status Monitoring - -```rust -// Get current context tree status -let status = fastn_context::status(); -println!("{}", status); - -// Example output: -// fastn Context Status -// ✅ global (2h 15m, active) -// ✅ remote-access-listener (1h 45m, active) -// ✅ alice@bv478gen (23m, active) -// ✅ stdout-handler (23m, active) -// ✅ startup-task (2h 15m, active) - -// With recent completed contexts: -let status = fastn_context::status_with_latest(); -// Shows live contexts + recent completed ones -``` - -### Context Persistence (Distributed Tracing) - -```rust -// P2P stream handler example -async fn handle_p2p_stream(session: Session) { - let ctx = session.context(); // "global.p2p.alice@bv478gen.stream-456" - - let result = process_stream().await; - - // Persist completed context for tracing - ctx.complete_with_status(result.is_ok(), &format!("Processed {} bytes", result.bytes)); - // -> Logs trace, adds to circular buffer, sends to external systems -} - -// HTTP request handler example -async fn handle_http_request(request: HttpRequest) { - let ctx = request.context(); // "global.http.request-789" - - let result = process_request().await; - - // Persist with completion info - ctx.complete_with_status(result.status.is_success(), &result.summary); -} -``` - -### Enhanced Status Display - -``` -$ fastn status --include-latest -✅ global (2h 15m, active) - ✅ p2p-listener (1h 45m, active) - ✅ alice@bv478gen (23m, active, 3 live streams) - -Recent completed contexts (last 10): -- global.p2p.alice@bv478gen.stream-455 (2.3s, success: "Processed 1.2MB") -- global.p2p.bob@p2nd7avq.stream-454 (1.1s, success: "Processed 512KB") -- global.http.request-123 (0.8s, failed: "Database timeout") -- global.p2p.alice@bv478gen.stream-453 (4.1s, success: "Processed 2.1MB") -``` - -### Cancellation Handling - -```rust -// Task waits for context cancellation (proper select pattern) -ctx.spawn_child("shell-handler", |task_ctx| async move { - println!("Shell handler starting: {}", task_ctx.name); - - // Proper cancellation in select (non-blocking) - tokio::select! { - _ = task_ctx.cancelled() => { - println!("Shell handler cancelled"); - } - result = handle_shell_session() => { - println!("Shell session completed: {:?}", result); - } - data = connection.accept() => { - println!("Got connection: {:?}", data); - } - } -}); -``` - -## Integration with fastn-p2p - -fastn-p2p depends on fastn-context and automatically creates context hierarchies: - -```rust -// fastn-p2p sessions provide access to their context -async fn handle_remote_shell(session: fastn_p2p::server::Session) { - let ctx = session.context(); // Auto-created by fastn-p2p - - // Simple spawn (inherits session context) - ctx.spawn(pipe_stdout(session.send)); - - // Named child spawn for debugging - ctx.spawn_child("command-executor", |task_ctx| async move { - println!("Executing command in context: {}", task_ctx.name); - let result = execute_command(&session.protocol.command).await; - println!("Command completed with: {:?}", result); - }); -} -``` - -## Main Function Integration - -The main macro sets up the global context and provides comprehensive configuration: - -```rust -#[fastn_context::main] -async fn main() -> eyre::Result<()> { - // Global context automatically created and available - - let ctx = fastn_context::global(); - ctx.spawn_child("startup", |startup_ctx| async move { - println!("Application starting: {}", startup_ctx.name); - // Application initialization - }); -} -``` - -### Configuration Options - -```rust -#[fastn_context::main( - // Logging configuration - logging = true, // Default: true - simple logging setup - - // Shutdown behavior - shutdown_mode = "single_ctrl_c", // Default: "single_ctrl_c" - shutdown_timeout = "30s", // Default: "30s" - graceful shutdown timeout - - // Double Ctrl+C specific (only when shutdown_mode = "double_ctrl_c") - double_ctrl_c_window = "2s", // Default: "2s" - time window for second Ctrl+C - status_fn = my_status_printer, // Required for double_ctrl_c mode -)] -async fn main() -> eyre::Result<()> { - // Your application code -} - -// Status function (required for double_ctrl_c mode) -async fn my_status_printer() { - println!("=== Application Status ==="); - // Custom status logic - access global registries, counters, etc. - println!("Active services: {}", get_active_service_count()); -} -``` - -## Design Benefits - -1. **Names Required for Debugging** - Every important operation has a name in status tree -2. **Selective Complexity** - Simple spawn vs detailed child creation as needed -3. **Automatic Tree Building** - Context hierarchy builds as application runs -4. **Production Debugging** - Status trees show exactly where system is stuck -5. **Clean Separation** - Context concerns separate from networking concerns -6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure - -**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. - -## Future Features - -See NEXT-*.md files for planned enhancements: - -- **NEXT-metrics-and-data.md**: Metric storage and arbitrary data on contexts -- **NEXT-monitoring.md**: Status trees, timing, system metrics monitoring -- **NEXT-locks.md**: Named locks and deadlock detection -- **NEXT-counters.md**: Global counter storage with dotted paths -- **NEXT-status-distribution.md**: P2P distributed status access diff --git a/fastn-context/examples/minimal_test.rs b/fastn-context/examples/minimal_test.rs deleted file mode 100644 index 6a6bf7c77..000000000 --- a/fastn-context/examples/minimal_test.rs +++ /dev/null @@ -1,68 +0,0 @@ -/// Test the minimal fastn-context API needed for fastn-p2p integration -/// This validates our basic Context design before implementation - -#[fastn_context::main] -async fn main() -> Result<(), Box> { - println!("Testing minimal fastn-context API..."); - - // Global context should be automatically available - let global_ctx = fastn_context::global(); - println!("Global context created: {}", global_ctx.name); - - // Test basic child creation with builder - global_ctx - .child("test-service") - .spawn(|service_ctx| async move { - println!("Service context created: {}", service_ctx.name); - - // Test cancellation with proper select pattern - tokio::select! { - _ = service_ctx.cancelled() => { - println!("Service context cancelled"); - } - _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => { - println!("Service context completed"); - } - } - }); - - // Test global context functionality - println!("Global context is cancelled: {}", global_ctx.is_cancelled()); - - // Give tasks time to run and build tree - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - - // Test status display - println!("\n=== Context Tree Status ==="); - let status = fastn_context::status(); - println!("{}", status); - - // Test persistence functionality - global_ctx.spawn_child("persist-test", |task_ctx| async move { - tokio::time::sleep(tokio::time::Duration::from_millis(30)).await; - task_ctx.persist(); - }); - - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - // Test status with persisted contexts - println!("\n=== Status with Persisted Contexts ==="); - let status_with_latest = fastn_context::status_with_latest(); - println!("{}", status_with_latest); - - // Test persistence functionality - global_ctx.spawn_child("persist-test", |task_ctx| async move { - tokio::time::sleep(tokio::time::Duration::from_millis(30)).await; - task_ctx.persist(); - }); - - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - // Test status with persisted contexts - println!("\n=== Status with Persisted Contexts ==="); - let status_with_latest = fastn_context::status_with_latest(); - println!("{}", status_with_latest); - - println!("Basic API test completed!"); - Ok(()) -} diff --git a/fastn-context/src/context.rs b/fastn-context/src/context.rs deleted file mode 100644 index b03421e5e..000000000 --- a/fastn-context/src/context.rs +++ /dev/null @@ -1,158 +0,0 @@ -/// Hierarchical context for task management and cancellation -pub struct Context { - /// Context name for debugging - pub name: String, - - /// When this context was created - pub created_at: std::time::Instant, - - /// Parent context (None for root) - parent: Option>, - - /// Child contexts - children: std::sync::Arc>>>, - - /// Cancellation token (proper async cancellation) - cancellation_token: tokio_util::sync::CancellationToken, -} - -impl Context { - /// Create new root context (typically only used by main macro) - pub fn new(name: &str) -> std::sync::Arc { - std::sync::Arc::new(Context { - name: name.to_string(), - created_at: std::time::Instant::now(), - parent: None, - children: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), - cancellation_token: tokio_util::sync::CancellationToken::new(), - }) - } - - /// Create child context - pub fn child(&self, name: &str) -> ContextBuilder { - let child_context = std::sync::Arc::new(Context { - name: name.to_string(), - created_at: std::time::Instant::now(), - parent: Some(std::sync::Arc::new(self.clone())), - children: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), - cancellation_token: self.cancellation_token.child_token(), - }); - - // Add to parent's children list - if let Ok(mut children) = self.children.lock() { - children.push(child_context.clone()); - } - - ContextBuilder { - context: child_context, - } - } - - /// Simple spawn (inherits current context, no child creation) - pub fn spawn(&self, task: F) -> tokio::task::JoinHandle - where - F: std::future::Future + Send + 'static, - F::Output: Send + 'static, - { - tokio::spawn(task) - } - - /// Spawn task with named child context (common case shortcut) - pub fn spawn_child(&self, name: &str, task: F) -> tokio::task::JoinHandle - where - F: FnOnce(std::sync::Arc) -> Fut + Send + 'static, - Fut: std::future::Future + Send + 'static, - Fut::Output: Send + 'static, - { - let child_ctx = self.child(name); - child_ctx.spawn(task) - } - - /// Wait for cancellation signal (for use in tokio::select!) - pub async fn wait(&self) { - // Poll-based future that completes when cancelled - loop { - if self.is_cancelled() { - return; - } - // Yield to allow other tasks to run, then check again - tokio::task::yield_now().await; - } - } - - /// Wait for cancellation signal (returns proper Future for tokio::select!) - pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> { - self.cancellation_token.cancelled() - } - - /// Check if this context is cancelled - pub fn is_cancelled(&self) -> bool { - self.cancellation_token.is_cancelled() - } - - /// Cancel this context and all children recursively - pub fn cancel(&self) { - self.cancellation_token.cancel(); - } - - /// Mark this context for persistence (distributed tracing) - pub fn persist(&self) { - let context_status = self.status(); - crate::status::add_persisted_context(context_status); - } - - /// Get status information for this context and all children - pub fn status(&self) -> crate::status::ContextStatus { - let children = if let Ok(children_lock) = self.children.lock() { - children_lock.iter().map(|child| child.status()).collect() - } else { - Vec::new() - }; - - crate::status::ContextStatus { - name: self.name.clone(), - is_cancelled: self.is_cancelled(), - duration: self.created_at.elapsed(), - children, - } - } -} - -impl Clone for Context { - fn clone(&self) -> Self { - Context { - name: self.name.clone(), - created_at: self.created_at, - parent: self.parent.clone(), - children: self.children.clone(), - cancellation_token: self.cancellation_token.clone(), - } - } -} - -/// Builder for configuring child contexts before spawning -pub struct ContextBuilder { - pub(crate) context: std::sync::Arc, -} - -impl ContextBuilder { - /// Spawn task with this child context - pub fn spawn(self, task: F) -> tokio::task::JoinHandle - where - F: FnOnce(std::sync::Arc) -> Fut + Send + 'static, - Fut: std::future::Future + Send + 'static, - Fut::Output: Send + 'static, - { - let context = self.context; - tokio::spawn(async move { task(context).await }) - } -} - -/// Global context storage -static GLOBAL_CONTEXT: std::sync::LazyLock> = - std::sync::LazyLock::new(|| Context::new("global")); - -/// Get the global application context -pub fn global() -> std::sync::Arc { - GLOBAL_CONTEXT.clone() -} diff --git a/fastn-context/src/lib.rs b/fastn-context/src/lib.rs deleted file mode 100644 index e49a69954..000000000 --- a/fastn-context/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -#![warn(unused_extern_crates)] -#![deny(unused_crate_dependencies)] - -use tokio as _; // used by main macro -use tokio_util as _; // used for cancellation tokens - -mod context; -mod status; - -pub use context::{Context, ContextBuilder, global}; -pub use status::{ContextStatus, Status, status, status_with_latest}; - -// Re-export main macro -pub use fastn_context_macros::main; diff --git a/fastn-context/src/status.rs b/fastn-context/src/status.rs deleted file mode 100644 index 17fb73e74..000000000 --- a/fastn-context/src/status.rs +++ /dev/null @@ -1,145 +0,0 @@ -/// Status snapshot of the context tree -#[derive(Debug, Clone)] -pub struct Status { - pub global_context: ContextStatus, - pub persisted_contexts: Option>, - pub timestamp: std::time::SystemTime, -} - -/// Status information for a single context -#[derive(Debug, Clone)] -pub struct ContextStatus { - pub name: String, - pub is_cancelled: bool, - pub duration: std::time::Duration, - pub children: Vec, -} - -/// Global storage for persisted contexts (circular buffer) -static PERSISTED_CONTEXTS: std::sync::LazyLock< - std::sync::RwLock>, -> = std::sync::LazyLock::new(|| std::sync::RwLock::new(std::collections::VecDeque::new())); - -/// Maximum number of persisted contexts to keep (configurable via env) -const MAX_PERSISTED_CONTEXTS: usize = 10; // TODO: Make configurable via env var - -/// Add a context to the persisted contexts circular buffer -pub fn add_persisted_context(context_status: ContextStatus) { - if let Ok(mut contexts) = PERSISTED_CONTEXTS.write() { - // Add to front - contexts.push_front(context_status.clone()); - - // Keep only max number - if contexts.len() > MAX_PERSISTED_CONTEXTS { - contexts.pop_back(); - } - } - - // Log as trace event - println!( - "TRACE: {} completed in {:?}", - context_status.name, context_status.duration - ); -} - -/// Get current status snapshot of entire context tree -pub fn status() -> Status { - Status { - global_context: crate::context::global().status(), - persisted_contexts: None, - timestamp: std::time::SystemTime::now(), - } -} - -/// Get status including recent completed contexts (distributed tracing) -pub fn status_with_latest() -> Status { - let persisted = if let Ok(contexts) = PERSISTED_CONTEXTS.read() { - Some(contexts.iter().cloned().collect()) - } else { - None - }; - - Status { - global_context: crate::context::global().status(), - persisted_contexts: persisted, - timestamp: std::time::SystemTime::now(), - } -} - -impl std::fmt::Display for Status { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "fastn Context Status")?; - writeln!(f, "Snapshot: {:?}", self.timestamp)?; - writeln!(f)?; - - Self::display_context(&self.global_context, f, 0)?; - - // Show persisted contexts if included - if let Some(persisted) = &self.persisted_contexts - && !persisted.is_empty() - { - writeln!(f, "\nRecent completed contexts (last {}):", persisted.len())?; - for ctx in persisted { - let duration_str = if ctx.duration.as_secs() > 60 { - format!( - "{}m {}s", - ctx.duration.as_secs() / 60, - ctx.duration.as_secs() % 60 - ) - } else { - format!("{:.1}s", ctx.duration.as_secs_f64()) - }; - - let status_str = if ctx.is_cancelled { - "cancelled" - } else { - "completed" - }; - writeln!(f, "- {} ({}, {})", ctx.name, duration_str, status_str)?; - } - } - - Ok(()) - } -} - -impl Status { - fn display_context( - ctx: &ContextStatus, - f: &mut std::fmt::Formatter<'_>, - depth: usize, - ) -> std::fmt::Result { - let indent = " ".repeat(depth); - let status_icon = if ctx.is_cancelled { "❌" } else { "✅" }; - - let duration_str = if ctx.duration.as_secs() > 60 { - format!( - "{}m {}s", - ctx.duration.as_secs() / 60, - ctx.duration.as_secs() % 60 - ) - } else { - format!("{:.1}s", ctx.duration.as_secs_f64()) - }; - - writeln!( - f, - "{}{} {} ({}, {})", - indent, - status_icon, - ctx.name, - duration_str, - if ctx.is_cancelled { - "cancelled" - } else { - "active" - } - )?; - - for child in &ctx.children { - Self::display_context(child, f, depth + 1)?; - } - - Ok(()) - } -} diff --git a/fastn-core/src/library2022/processor/http.rs b/fastn-core/src/library2022/processor/http.rs index d6fe68200..501229220 100644 --- a/fastn-core/src/library2022/processor/http.rs +++ b/fastn-core/src/library2022/processor/http.rs @@ -112,20 +112,16 @@ pub async fn process( req_config.config.package.clone() }; - let (mut url, mountpoint, mut conf) = { - let (mut url, mountpoint, conf) = - fastn_core::config::utils::get_clean_url(&package, req_config, url.as_str()) - .await - .map_err(|e| ftd::interpreter::Error::ParseError { - message: format!("invalid url: {e:?}"), - doc_id: doc.name.to_string(), - line_number, - })?; - if !req_config.request.query_string().is_empty() { - url.set_query(Some(req_config.request.query_string())); - } - (url, mountpoint, conf) - }; + let (mut url, mountpoint, mut conf) = + fastn_core::config::utils::get_clean_url(&package, req_config, url.as_str()) + .await + .map_err(|e| ftd::interpreter::Error::ParseError { + message: format!("invalid url: {e:?}"), + doc_id: doc.name.to_string(), + line_number, + })?; + + tracing::info!(?url); let mut body = serde_json::Map::new(); for header in headers.0 { diff --git a/fastn/Cargo.toml b/fastn/Cargo.toml index 81c19886e..8b3eed62b 100644 --- a/fastn/Cargo.toml +++ b/fastn/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fastn" -version = "0.4.112" +version = "0.4.113" authors.workspace = true edition.workspace = true license.workspace = true diff --git a/flake.lock b/flake.lock index f9557d4a1..055c14348 100644 --- a/flake.lock +++ b/flake.lock @@ -20,10 +20,12 @@ }, "nixpkgs": { "locked": { - "lastModified": 0, - "narHash": "sha256-ZHSasdLwEEjSOD/WTW1o7dr3/EjuYsdwYB4NSgICZ2I=", - "path": "/nix/store/zi50l9z7jjfv92nr6m12czq5qcrrmqdr-source", - "type": "path" + "lastModified": 1757746433, + "narHash": "sha256-fEvTiU4s9lWgW7mYEU/1QUPirgkn+odUBTaindgiziY=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "6d7ec06d6868ac6d94c371458fc2391ded9ff13d", + "type": "github" }, "original": { "id": "nixpkgs", @@ -44,11 +46,11 @@ ] }, "locked": { - "lastModified": 1750991972, - "narHash": "sha256-jzadGZL1MtqmHb5AZcjZhHpNulOdMZPxf8Wifg8e5VA=", + "lastModified": 1757903816, + "narHash": "sha256-bVi6V/HZtUedmLPM5OP/tYhwi6G7FIyFH6+/EFb7qGo=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "b6509555d8ffaa0727f998af6ace901c5b78dc26", + "rev": "11a559c0baf1019bde7bbf0363a22db978be4363", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 666da4e91..4ef85843c 100644 --- a/flake.nix +++ b/flake.nix @@ -28,7 +28,8 @@ openssl.dev diesel-cli rust-analyzer-unwrapped - ] ++ lib.optionals stdenv.isDarwin [ darwin.apple_sdk.frameworks.Foundation ]; + git + ] ++ lib.optionals stdenv.isDarwin [ ]; shellHook = '' export PATH="$PATH:$HOME/.cargo/bin" diff --git a/v0.5/Cargo.lock b/v0.5/Cargo.lock index d117d3e11..ca74d79e3 100644 --- a/v0.5/Cargo.lock +++ b/v0.5/Cargo.lock @@ -436,6 +436,17 @@ dependencies = [ "url", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi 0.1.19", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -1974,6 +1985,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "fastn-context" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb4d708d15b187e30d12f75dc5aedc4691074f056f12b3536a69effcea9dee4" +dependencies = [ + "fastn-context-macros", + "tokio", + "tokio-util", +] + +[[package]] +name = "fastn-context-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee801bf528c8d05900d55f1e3f3d6e3d04cf9286c876515c2fbaadda63315c88" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "fastn-continuation" version = "0.1.0" @@ -2117,6 +2150,7 @@ dependencies = [ "async-stream", "enum-display-derive", "eyre", + "fastn-context", "fastn-id52", "fastn-net", "fastn-p2p-macros", @@ -2149,8 +2183,12 @@ dependencies = [ name = "fastn-p2p-test" version = "0.1.0" dependencies = [ + "atty", "chrono", + "clap", + "colored", "eyre", + "fastn-context", "fastn-id52", "fastn-net", "fastn-p2p", @@ -2782,6 +2820,15 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + [[package]] name = "hermit-abi" version = "0.5.2" @@ -4322,7 +4369,7 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" dependencies = [ - "hermit-abi", + "hermit-abi 0.5.2", "libc", ] @@ -4788,7 +4835,7 @@ checksum = "b5bd19146350fe804f7cb2669c851c03d69da628803dab0d98018142aaa5d829" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi", + "hermit-abi 0.5.2", "pin-project-lite", "rustix 1.0.8", "windows-sys 0.60.2", diff --git a/v0.5/Cargo.toml b/v0.5/Cargo.toml index 293b6db78..c1421cb88 100644 --- a/v0.5/Cargo.toml +++ b/v0.5/Cargo.toml @@ -97,6 +97,7 @@ fastn-utils = { path = "fastn-utils" } ed25519-dalek = { version = "2.1.1", features = ["rand_core"] } data-encoding = "2" eyre = "0.6" +fastn-context = "0.1" ft-sys-shared = { version = "0.2.1", features = ["rusqlite", "host-only"] } futures-util = { version = "0.3", default-features = false, features = ["std"] } futures-core = "0.3.31" diff --git a/v0.5/fastn-p2p-test/Cargo.toml b/v0.5/fastn-p2p-test/Cargo.toml index 8b0f11ab8..35b946043 100644 --- a/v0.5/fastn-p2p-test/Cargo.toml +++ b/v0.5/fastn-p2p-test/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" fastn-p2p = { path = "../fastn-p2p" } fastn-net = { path = "../fastn-net" } fastn-id52 = { path = "../fastn-id52" } +fastn-context.workspace = true tokio = { version = "1", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } @@ -15,15 +16,27 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" futures-util = "0.3" chrono = { version = "0.4", features = ["serde"] } +clap = { version = "4", features = ["derive"] } +colored = "3" +atty = "0.2" [dev-dependencies] tempfile = "3" [[bin]] name = "p2p_sender" -path = "src/sender.rs" +path = "src/bin/sender.rs" [[bin]] name = "p2p_receiver" -path = "src/receiver.rs" +path = "src/bin/receiver.rs" + +# Remote Shell Demo - High-quality reference implementations +[[bin]] +name = "remote-shell-daemon" +path = "src/bin/remote_shell/daemon.rs" + +[[bin]] +name = "remote-shell-client" +path = "src/bin/remote_shell/client.rs" diff --git a/v0.5/fastn-p2p-test/README.md b/v0.5/fastn-p2p-test/README.md new file mode 100644 index 000000000..0222f796a --- /dev/null +++ b/v0.5/fastn-p2p-test/README.md @@ -0,0 +1,251 @@ +# fastn-p2p-test + +High-quality reference implementations and testing utilities for the fastn-p2p library. + +## Overview + +This crate provides comprehensive examples of how to build P2P applications using fastn-p2p. All code is written to demonstrate best practices in: + +- **API Design**: Type-safe protocols with strong error handling +- **Documentation**: Comprehensive docs with examples and rationale +- **Code Organization**: Clear module structure and separation of concerns +- **Security**: Security considerations and safe defaults +- **Testing**: Unit tests and integration examples +- **User Experience**: Helpful error messages and CLI design + +## Reference Implementations + +### Remote Shell System + +A complete P2P remote shell implementation demonstrating request/response patterns: + +- **`remote-shell-daemon`**: Server that accepts P2P connections and executes commands +- **`remote-shell-client`**: Client that connects to remote daemons to execute commands + +#### Quick Start + +1. **Start the daemon** on the target machine: + ```bash + cargo run --bin remote-shell-daemon + ``` + + This will generate a new private key and display the ID52 for clients to connect to: + ``` + 🔑 Generated new private key + Your ID52: 12D3KooWABC123...xyz + ``` + +2. **Execute commands** from the client machine: + ```bash + # Replace 12D3K...xyz with the daemon's ID52 + cargo run --bin remote-shell-client 12D3KooWABC123...xyz whoami + cargo run --bin remote-shell-client 12D3KooWABC123...xyz ls -la + cargo run --bin remote-shell-client 12D3KooWABC123...xyz "echo 'Hello World'" + ``` + +#### Security Warning + +⚠️ **SECURITY WARNING** ⚠️ + +The remote shell implementation allows **FULL COMMAND EXECUTION** from remote machines. Only use this with completely trusted systems. In production, implement additional security measures: + +- Command whitelisting/blacklisting +- User privilege separation +- Resource limits and sandboxing +- Authentication and authorization +- Audit logging and monitoring + +#### Architecture + +The remote shell system demonstrates several fastn-p2p patterns: + +- **Protocol Definition**: Type-safe enum protocols with Display implementation +- **Request/Response Types**: Structured data with comprehensive error types +- **Server Implementation**: Using `fastn_p2p::listen()` and `Request::handle()` +- **Client Implementation**: Using `fastn_p2p::client::call()` for RPC-style calls +- **Error Handling**: Comprehensive error types and user-friendly messages +- **Concurrent Processing**: Using `fastn_p2p::spawn()` for handling multiple requests + +## API Design Patterns + +### Protocol Definition + +```rust +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum RemoteShellProtocol { + Execute, +} + +impl std::fmt::Display for RemoteShellProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RemoteShellProtocol::Execute => write!(f, "remote-shell-execute"), + } + } +} +``` + +### Request/Response Types + +```rust +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteRequest { + pub command: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteResponse { + pub exit_code: i32, + pub stdout: String, + pub stderr: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum RemoteShellError { + CommandFailed { message: String, exit_code: Option }, + ExecutionError { message: String }, +} +``` + +### Server Implementation Pattern + +```rust +// Start listener +let protocols = [RemoteShellProtocol::Execute]; +let stream = fastn_p2p::listen(private_key, &protocols)?; +let mut stream = std::pin::pin!(stream); + +// Handle requests +while let Some(request_result) = stream.next().await { + let request = request_result?; + + // Spawn concurrent handler + fastn_p2p::spawn(async move { + request.handle(|execute_request: ExecuteRequest| async move { + // Your business logic here + execute_command(execute_request).await + }).await + }); +} +``` + +### Client Implementation Pattern + +```rust +let request = ExecuteRequest { command }; + +let result: Result, CallError> = + fastn_p2p::client::call( + private_key, + target, + RemoteShellProtocol::Execute, + request + ).await; + +match result { + Ok(Ok(response)) => { + // Handle successful response + println!("{}", response.stdout); + std::process::exit(response.exit_code); + } + Ok(Err(shell_error)) => { + // Handle application-level errors + eprintln!("Command failed: {}", shell_error); + } + Err(p2p_error) => { + // Handle network/P2P errors + eprintln!("Connection failed: {}", p2p_error); + } +} +``` + +## Testing + +The crate includes comprehensive tests demonstrating various scenarios: + +```bash +# Run all tests +cargo test + +# Run specific test module +cargo test remote_shell + +# Run integration tests +cargo test --test '*' +``` + +## Building + +```bash +# Build all binaries +cargo build --release + +# Build specific binary +cargo build --release --bin remote-shell-daemon +cargo build --release --bin remote-shell-client + +# Run with arguments +cargo run --bin remote-shell-daemon -- --help +cargo run --bin remote-shell-client -- --help +``` + +## Development Guidelines + +When extending or modifying this crate, follow these guidelines: + +### Documentation +- Every public item should have comprehensive documentation +- Include examples in doc comments where appropriate +- Explain design decisions and trade-offs +- Document security considerations + +### Error Handling +- Use specific error types rather than generic errors +- Provide helpful error messages with context +- Include troubleshooting information where appropriate +- Log errors appropriately for debugging + +### Code Organization +- Separate protocol definitions from implementation logic +- Use modules to organize related functionality +- Keep binary entry points simple and focused +- Extract reusable logic into library functions + +### Testing +- Write unit tests for individual functions +- Include integration tests for end-to-end scenarios +- Test error conditions and edge cases +- Use property-based testing where appropriate + +## Future Enhancements + +This crate provides a foundation for more advanced P2P applications: + +### Streaming Applications +- **Interactive Shell**: Real-time terminal interaction using streaming APIs +- **File Transfer**: Large file transfers with progress tracking +- **Audio/Video**: Real-time media streaming over P2P + +### Advanced Protocols +- **Multi-Protocol Support**: Negotiating different protocol versions +- **Authentication**: Secure authentication and authorization +- **Discovery**: Automatic peer discovery and service advertisement + +### Performance Applications +- **Load Testing**: Tools for testing P2P network performance +- **Benchmarking**: Measuring throughput and latency +- **Monitoring**: Real-time network and application metrics + +## Contributing + +When contributing to fastn-p2p-test: + +1. Follow the established code organization patterns +2. Add comprehensive documentation and examples +3. Include appropriate tests +4. Consider security implications +5. Update this README for significant changes + +## License + +This crate follows the same license as the main fastn project. \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/bin/receiver.rs b/v0.5/fastn-p2p-test/src/bin/receiver.rs new file mode 100644 index 000000000..69eb16f72 --- /dev/null +++ b/v0.5/fastn-p2p-test/src/bin/receiver.rs @@ -0,0 +1,43 @@ +//! Minimal P2P Receiver Example +//! +//! Shows how to listen for P2P messages using fastn-p2p in just a few lines. +//! +//! Usage: receiver [private_key] + +use fastn_p2p_test::protocols::{EchoProtocol, EchoRequest, EchoResponse, EchoError, get_or_generate_key, init_logging}; +use futures_util::StreamExt; + +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + init_logging(); + + // Get or generate key + let args: Vec = std::env::args().collect(); + let key_str = args.get(1).map(|s| s.as_str()); + let receiver_key = get_or_generate_key(key_str)?; + + println!("Listening on ID52: {}", receiver_key.id52()); + println!("Connect with: sender {} ", receiver_key.id52()); + + // Start listening - this is the core fastn-p2p usage! + let protocols = [EchoProtocol::Echo]; + let mut stream = fastn_p2p::listen!(receiver_key, &protocols); + + // Handle incoming messages + while let Some(request) = stream.next().await { + let request = request?; + println!("📨 Message from {}", request.peer().id52()); + + // Process request using the convenient handle method + request.handle(|req: EchoRequest| async move { + println!(" Content: {}", req.message); + Ok::(EchoResponse { + echo: format!("Echo: {}", req.message), + }) + }).await?; + + println!("✅ Response sent"); + } + + Ok(()) +} \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/bin/remote_shell/client.rs b/v0.5/fastn-p2p-test/src/bin/remote_shell/client.rs new file mode 100644 index 000000000..a66cc476b --- /dev/null +++ b/v0.5/fastn-p2p-test/src/bin/remote_shell/client.rs @@ -0,0 +1,62 @@ +//! Minimal Remote Shell Client +//! +//! Execute commands on remote machines over P2P. +//! +//! Usage: remote-shell-client + +use clap::Parser; +use fastn_p2p_test::{ + protocols::{parse_id52, get_or_generate_key}, + remote_shell::{RemoteShellProtocol, ExecuteRequest, ExecuteResponse, RemoteShellError}, +}; + +#[derive(Parser)] +#[command(about = "Execute commands on remote machines over P2P")] +struct Args { + /// Target machine ID52 + target: String, + + /// Private key (optional) + #[arg(long)] + private_key: Option, + + /// Command to execute + #[arg(trailing_var_arg = true)] + command: Vec, +} + +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + let args = Args::parse(); + + // Parse inputs + let target = parse_id52(&args.target)?; + let private_key = get_or_generate_key(args.private_key.as_deref())?; + let command = if args.command.is_empty() { + vec!["whoami".to_string()] + } else { + args.command + }; + + // Execute remote command - this is the core fastn-p2p usage! + let request = ExecuteRequest { command }; + let result: Result = fastn_p2p::client::call( + private_key, + target, + RemoteShellProtocol::Execute, + request, + ).await?; + + // Handle response + match result { + Ok(response) => { + print!("{}", response.stdout); + eprint!("{}", response.stderr); + std::process::exit(response.exit_code); + } + Err(error) => { + eprintln!("Remote error: {}", error); + std::process::exit(1); + } + } +} \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/bin/remote_shell/daemon.rs b/v0.5/fastn-p2p-test/src/bin/remote_shell/daemon.rs new file mode 100644 index 000000000..6aa844022 --- /dev/null +++ b/v0.5/fastn-p2p-test/src/bin/remote_shell/daemon.rs @@ -0,0 +1,56 @@ +//! Minimal Remote Shell Daemon +//! +//! Accept P2P connections and execute commands. +//! +//! ⚠️ SECURITY WARNING: This allows full command execution from remote machines! +//! Only share your ID52 with completely trusted systems. +//! +//! Usage: remote-shell-daemon [--private-key ] + +use clap::Parser; +use fastn_p2p_test::{ + protocols::get_or_generate_key, + remote_shell::{RemoteShellProtocol, execute_command}, +}; +use futures_util::StreamExt; + +#[derive(Parser)] +#[command(about = "P2P Remote Shell Daemon")] +struct Args { + /// Private key (optional, generates if not provided) + #[arg(long)] + private_key: Option, +} + +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + let args = Args::parse(); + + // Get or generate key + let private_key = get_or_generate_key(args.private_key.as_deref())?; + + println!("⚠️ SECURITY WARNING: This daemon allows FULL COMMAND EXECUTION!"); + println!("🔑 Your ID52: {}", private_key.id52()); + println!("🚀 Connect with: remote-shell-client {} ", private_key.id52()); + println!("📡 Listening for P2P connections..."); + + // Start listening - this is the core fastn-p2p usage! + let protocols = [RemoteShellProtocol::Execute]; + let mut stream = fastn_p2p::listen!(private_key, &protocols); + + // Handle incoming requests + while let Some(request) = stream.next().await { + let request = request?; + println!("📨 Command from {}", request.peer().id52()); + + // Execute command using the convenient handle method + let result = request.handle(execute_command).await; + + match result { + Ok(()) => println!("✅ Command completed"), + Err(e) => eprintln!("❌ Request failed: {}", e), + } + } + + Ok(()) +} \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/bin/sender.rs b/v0.5/fastn-p2p-test/src/bin/sender.rs new file mode 100644 index 000000000..0bdf048a9 --- /dev/null +++ b/v0.5/fastn-p2p-test/src/bin/sender.rs @@ -0,0 +1,42 @@ +//! Minimal P2P Sender Example +//! +//! Shows how to send a simple message using fastn-p2p in just a few lines. +//! +//! Usage: sender [message] + +use fastn_p2p_test::protocols::{EchoProtocol, EchoRequest, EchoResponse, EchoError, parse_id52, get_or_generate_key, init_logging}; + +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + init_logging(); + + let args: Vec = std::env::args().collect(); + if args.len() < 2 { + eprintln!("Usage: {} [message]", args[0]); + std::process::exit(1); + } + + // Parse arguments + let target = parse_id52(&args[1])?; + let message = args.get(2).map(|s| s.clone()).unwrap_or_else(|| "Hello P2P!".to_string()); + let sender_key = get_or_generate_key(None)?; + + println!("Sending '{}' to {}", message, target); + + // Send message - this is the core fastn-p2p usage! + let request = EchoRequest { message }; + let result: Result = fastn_p2p::client::call( + sender_key, + target, + EchoProtocol::Echo, + request, + ).await?; + + // Handle response + match result { + Ok(response) => println!("✅ Response: {}", response.echo), + Err(error) => eprintln!("❌ Error: {}", error.error), + } + + Ok(()) +} \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/lib.rs b/v0.5/fastn-p2p-test/src/lib.rs index 7730e218e..9306b839e 100644 --- a/v0.5/fastn-p2p-test/src/lib.rs +++ b/v0.5/fastn-p2p-test/src/lib.rs @@ -1,7 +1,37 @@ -//! Shared test protocol definitions for fastn-p2p testing +//! fastn-p2p Test Suite and Reference Implementations +//! +//! This crate provides comprehensive testing utilities and reference implementations +//! for the fastn-p2p library. It includes: +//! +//! - **Test Protocols**: Simple protocols for basic P2P testing +//! - **Reference Applications**: Complete, well-documented example applications +//! - **Performance Tests**: Benchmarks and stress tests +//! - **Integration Tests**: End-to-end testing scenarios +//! +//! # Modules +//! +//! - [`remote_shell`]: Complete remote shell implementation demonstrating +//! request/response patterns, command execution, and error handling +//! +//! # Design Philosophy +//! +//! All code in this crate is written to demonstrate best practices: +//! +//! - **Documentation**: Comprehensive docs with examples and rationale +//! - **Error Handling**: Proper error types and user-friendly messages +//! - **Type Safety**: Strong typing and compile-time verification +//! - **Security**: Security considerations and safe defaults +//! - **Testing**: Unit tests and integration test examples +//! - **Code Organization**: Clear module structure and separation of concerns use serde::{Deserialize, Serialize}; +// Reference implementation modules +pub mod remote_shell; + +// Shared protocols and utilities for all examples +pub mod protocols; + /// Test protocol with meaningful names #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] pub enum TestProtocol { diff --git a/v0.5/fastn-p2p-test/src/protocols.rs b/v0.5/fastn-p2p-test/src/protocols.rs new file mode 100644 index 000000000..8bfc8e77d --- /dev/null +++ b/v0.5/fastn-p2p-test/src/protocols.rs @@ -0,0 +1,67 @@ +//! Shared Protocol Definitions +//! +//! This module contains all the P2P protocols used by the example binaries. +//! By centralizing protocol definitions, we eliminate duplication and ensure +//! consistency across all examples. + +use serde::{Deserialize, Serialize}; + +// ================================================================================================ +// Echo Protocol - For basic P2P testing +// ================================================================================================ + +/// Simple echo protocol for basic P2P communication testing +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum EchoProtocol { + Echo, +} + +impl std::fmt::Display for EchoProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "echo") + } +} + +/// Request message for echo protocol +#[derive(Debug, Serialize, Deserialize)] +pub struct EchoRequest { + pub message: String, +} + +/// Response message for echo protocol +#[derive(Debug, Serialize, Deserialize)] +pub struct EchoResponse { + pub echo: String, +} + +/// Error type for echo protocol +#[derive(Debug, Serialize, Deserialize)] +pub struct EchoError { + pub error: String, +} + +// ================================================================================================ +// Helper Functions - Common operations across binaries +// ================================================================================================ + +/// Parse a secret key from string or generate a new one +pub fn get_or_generate_key(key_str: Option<&str>) -> eyre::Result { + match key_str { + Some(s) => s.parse().map_err(|e| eyre::eyre!("Invalid key: {}", e)), + None => Ok(fastn_id52::SecretKey::generate()), + } +} + +/// Parse a public key (ID52) from string +pub fn parse_id52(id52_str: &str) -> eyre::Result { + id52_str.parse().map_err(|e| eyre::eyre!("Invalid ID52: {}", e)) +} + +/// Initialize minimal logging for examples +pub fn init_logging() { + tracing_subscriber::fmt() + .with_env_filter("fastn_p2p=info") + .with_target(false) + .without_time() + .init(); +} \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/receiver.rs b/v0.5/fastn-p2p-test/src/receiver.rs deleted file mode 100644 index ad47dff9f..000000000 --- a/v0.5/fastn-p2p-test/src/receiver.rs +++ /dev/null @@ -1,132 +0,0 @@ -//! Minimal fastn-p2p receiver test -//! -//! Tests the generic protocol system with meaningful protocol names - -use futures_util::stream::StreamExt; -use serde::{Deserialize, Serialize}; - -/// Test protocol - meaningful names instead of Ping/Http! -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] -pub enum TestProtocol { - Echo, -} - -impl std::fmt::Display for TestProtocol { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{self:?}") - } -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct EchoRequest { - pub from: String, - pub to: String, - pub message: String, - pub timestamp: i64, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct EchoResponse { - pub response: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct EchoError { - pub error: String, -} - -#[tokio::main] -async fn main() -> eyre::Result<()> { - // Initialize tracing - tracing_subscriber::fmt() - .with_env_filter("fastn_p2p=trace,fastn_p2p_test=info") - .init(); - - // Get secret key from command line args - let args: Vec = std::env::args().collect(); - let receiver_key = if args.len() > 1 { - let secret_key_str = &args[1]; - match secret_key_str.parse::() { - Ok(key) => { - println!("🔑 Using provided secret key"); - key - } - Err(e) => { - eprintln!("❌ Invalid secret key provided: {e}"); - return Err(eyre::eyre!("Invalid secret key: {}", e)); - } - } - } else { - println!("🔑 Generating new receiver key"); - fastn_id52::SecretKey::generate() - }; - - let receiver_id52 = receiver_key.public_key().id52(); - println!("🔑 Receiver ID52: {receiver_id52}"); - - // Output JSON for easy parsing in tests - let startup_info = serde_json::json!({ - "status": "started", - "receiver_id52": receiver_id52, - "secret_key": receiver_key.to_string(), - "timestamp": chrono::Utc::now().to_rfc3339() - }); - println!( - "📋 STARTUP: {}", - serde_json::to_string(&startup_info).unwrap_or_default() - ); - - // Start listening using fastn-p2p - println!("🔧 DEBUG: About to create protocols vec"); - let protocols = vec![TestProtocol::Echo]; - println!("🔧 DEBUG: About to call fastn_p2p::listen!"); - let mut stream = fastn_p2p::listen!(receiver_key, &protocols); - println!("🔧 DEBUG: listen! returned successfully"); - - println!("📡 fastn-p2p receiver listening on Echo protocol"); - println!("🎯 Waiting for connections..."); - - println!("🔧 DEBUG: About to call stream.next().await"); - - // Handle multiple connections - let mut message_count = 0; - - while let Some(request_result) = stream.next().await { - let request = request_result?; - message_count += 1; - - println!( - "🔗 Accepted connection #{} from {}", - message_count, - request.peer().id52() - ); - println!("📨 Received {} protocol request", request.protocol); - - // Handle the echo request - let result = request - .handle(|req: EchoRequest| async move { - println!("📨 Received message: {}", req.message); - - let response = EchoResponse { - response: format!("Echo: {}", req.message), - }; - - Result::::Ok(response) - }) - .await; - - match result { - Ok(_) => println!("✅ Request #{message_count} handled successfully"), - Err(e) => eprintln!("❌ Request #{message_count} handling failed: {e}"), - } - - // Stop after handling 10 messages for this test - if message_count >= 10 { - println!("🎯 Handled {message_count} messages, shutting down receiver"); - break; - } - } - - println!("🎯 fastn-p2p receiver test completed"); - Ok(()) -} diff --git a/v0.5/fastn-p2p-test/src/remote_shell/executor.rs b/v0.5/fastn-p2p-test/src/remote_shell/executor.rs new file mode 100644 index 000000000..5f1f29484 --- /dev/null +++ b/v0.5/fastn-p2p-test/src/remote_shell/executor.rs @@ -0,0 +1,232 @@ +//! Command Execution Engine +//! +//! This module provides secure and robust command execution capabilities +//! for the remote shell system. It handles process management, output +//! capture, and error handling with appropriate security considerations. + +use crate::remote_shell::protocol::{ExecuteRequest, ExecuteResponse, RemoteShellError}; +use colored::*; +use tokio::process::Command; + +/// Execute a command request and return the response +/// +/// This function is the core command execution engine. It takes a validated +/// command request and executes it on the local system, capturing all output +/// and handling errors appropriately. +/// +/// # Arguments +/// +/// * `request` - The command execution request containing the command and arguments +/// +/// # Returns +/// +/// * `Ok(ExecuteResponse)` - Successful execution with captured output +/// * `Err(RemoteShellError)` - Various execution failures with detailed error information +/// +/// # Security Considerations +/// +/// This function executes arbitrary commands on the local system, which poses +/// significant security risks. In production deployments, consider: +/// +/// - Command whitelisting/blacklisting +/// - Sandboxing (containers, chroot, etc.) +/// - Resource limits (CPU, memory, time) +/// - User privilege dropping +/// - Audit logging +/// +/// # Example +/// +/// ```rust,ignore +/// use crate::remote_shell::protocol::ExecuteRequest; +/// use crate::remote_shell::executor::execute_command; +/// +/// let request = ExecuteRequest { +/// command: vec!["echo".to_string(), "Hello World".to_string()], +/// }; +/// +/// match execute_command(request).await { +/// Ok(response) => { +/// println!("Exit code: {}", response.exit_code); +/// println!("Output: {}", response.stdout); +/// } +/// Err(error) => { +/// eprintln!("Error: {}", error); +/// } +/// } +/// ``` +pub async fn execute_command(request: ExecuteRequest) -> Result { + // Validate command is not empty + if request.command.is_empty() { + return Err(RemoteShellError::ExecutionError { + message: "Empty command provided".to_string(), + }); + } + + let program = &request.command[0]; + let args = &request.command[1..]; + + tracing::info!( + program = %program, + args = ?args, + "Executing remote command" + ); + + // Log the command execution for the daemon operator + println!( + "{} {} {}", + "🚀 Executing:".green(), + program.cyan(), + args.join(" ").yellow() + ); + + // Execute the command with proper error handling + let output = Command::new(program) + .args(args) + .output() + .await + .map_err(|e| { + let error_msg = format!("Failed to execute '{}': {}", program, e); + tracing::error!(error = %e, program = %program, "Command execution failed"); + + RemoteShellError::ExecutionError { + message: error_msg, + } + })?; + + // Convert output to strings, handling invalid UTF-8 gracefully + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let exit_code = output.status.code().unwrap_or(-1); + + // Log output for daemon operator (with size limits for readability) + log_output("stdout", &stdout); + log_output("stderr", &stderr); + + println!( + "{} {}", + "✅ Command completed with exit code:".green(), + exit_code.to_string().cyan() + ); + + tracing::info!( + exit_code = exit_code, + stdout_len = stdout.len(), + stderr_len = stderr.len(), + "Command execution completed" + ); + + Ok(ExecuteResponse { + exit_code, + stdout, + stderr, + }) +} + +/// Log command output with appropriate formatting and size limits +/// +/// This helper function logs command output in a readable format, +/// truncating very large outputs to prevent log spam while still +/// providing useful debugging information. +/// +/// # Arguments +/// +/// * `stream_name` - Name of the output stream ("stdout" or "stderr") +/// * `content` - The content to log +fn log_output(stream_name: &str, content: &str) { + if content.is_empty() { + return; + } + + const MAX_LOG_LENGTH: usize = 1000; + let trimmed = content.trim(); + + if trimmed.len() <= MAX_LOG_LENGTH { + println!("📤 {}: {}", stream_name.blue(), trimmed); + } else { + println!( + "📤 {} ({}): {}... (truncated, {} total bytes)", + stream_name.blue(), + "large output".yellow(), + &trimmed[..MAX_LOG_LENGTH], + content.len() + ); + } +} + +/// Validate command for security (placeholder for future implementation) +/// +/// This function is a placeholder for implementing command validation +/// and security policies. In production systems, this should include: +/// +/// - Whitelist/blacklist checking +/// - Path traversal prevention +/// - Resource limit validation +/// - Privilege checking +/// +/// # Arguments +/// +/// * `command` - The command vector to validate +/// +/// # Returns +/// +/// * `Ok(())` - Command is allowed +/// * `Err(RemoteShellError)` - Command is blocked with reason +#[allow(dead_code)] +pub fn validate_command(command: &[String]) -> Result<(), RemoteShellError> { + // Placeholder for future security implementations + + // Example: Block dangerous commands (this is just a demo) + if !command.is_empty() { + let program = &command[0]; + + // This is just an example - real implementations should be more sophisticated + let blocked_commands = ["rm", "dd", "mkfs", "format"]; + + if blocked_commands.contains(&program.as_str()) { + return Err(RemoteShellError::ExecutionError { + message: format!("Command '{}' is blocked by security policy", program), + }); + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_simple_command() { + let request = ExecuteRequest { + command: vec!["echo".to_string(), "test".to_string()], + }; + + let response = execute_command(request).await.unwrap(); + assert_eq!(response.exit_code, 0); + assert_eq!(response.stdout.trim(), "test"); + assert!(response.stderr.is_empty()); + } + + #[tokio::test] + async fn test_empty_command() { + let request = ExecuteRequest { + command: vec![], + }; + + let result = execute_command(request).await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), RemoteShellError::ExecutionError { .. })); + } + + #[tokio::test] + async fn test_nonexistent_command() { + let request = ExecuteRequest { + command: vec!["this_command_does_not_exist_12345".to_string()], + }; + + let result = execute_command(request).await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), RemoteShellError::ExecutionError { .. })); + } +} \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/remote_shell/mod.rs b/v0.5/fastn-p2p-test/src/remote_shell/mod.rs new file mode 100644 index 000000000..e66f85633 --- /dev/null +++ b/v0.5/fastn-p2p-test/src/remote_shell/mod.rs @@ -0,0 +1,46 @@ +//! Remote Shell Module +//! +//! This module provides a complete reference implementation of a remote shell +//! system using fastn-p2p. It demonstrates best practices for: +//! +//! - Protocol design and type safety +//! - Command execution and security +//! - Error handling and user experience +//! - P2P communication patterns +//! - Code organization and documentation +//! +//! # Architecture +//! +//! The remote shell system consists of three main components: +//! +//! - **Protocol**: Type-safe definitions for P2P communication +//! - **Executor**: Secure command execution engine +//! - **Binaries**: Client and daemon applications +//! +//! # Security Considerations +//! +//! This implementation is intended for demonstration and testing purposes. +//! Production deployments should include additional security measures: +//! +//! - Authentication and authorization +//! - Command sandboxing and restrictions +//! - Resource limits and monitoring +//! - Audit logging and compliance +//! - Network security and encryption +//! +//! # Usage Patterns +//! +//! This module serves as a reference for implementing other P2P applications: +//! +//! - Request/response protocols +//! - Error handling strategies +//! - User interface design +//! - Testing approaches +//! - Documentation standards + +pub mod executor; +pub mod protocol; + +// Re-export commonly used types for convenience +pub use executor::execute_command; +pub use protocol::{ExecuteRequest, ExecuteResponse, RemoteShellError, RemoteShellProtocol}; \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/remote_shell/protocol.rs b/v0.5/fastn-p2p-test/src/remote_shell/protocol.rs new file mode 100644 index 000000000..bd249a16d --- /dev/null +++ b/v0.5/fastn-p2p-test/src/remote_shell/protocol.rs @@ -0,0 +1,146 @@ +//! Remote Shell Protocol +//! +//! This module defines the P2P communication protocol for remote shell execution. +//! It demonstrates how to create type-safe, versioned protocols for fastn-p2p applications. + +use serde::{Deserialize, Serialize}; + +/// Protocol identifier for remote shell communication +/// +/// This enum defines the different types of operations supported by the remote shell system. +/// Each variant represents a specific protocol that clients and servers can negotiate. +/// +/// # Design Notes +/// +/// - Uses enum to allow for future protocol extensions (e.g., Interactive, FileTransfer) +/// - Implements all required traits for fastn-p2p protocol negotiation +/// - Display implementation provides human-readable protocol names for debugging +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum RemoteShellProtocol { + /// Execute a single command and return the result + Execute, +} + +impl std::fmt::Display for RemoteShellProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RemoteShellProtocol::Execute => write!(f, "remote-shell-execute"), + } + } +} + +/// Request to execute a command on a remote machine +/// +/// This structure encapsulates all the information needed to execute a command +/// on a remote system through the P2P network. +/// +/// # Fields +/// +/// - `command`: Vector of strings representing the command and its arguments +/// - First element is the program name +/// - Subsequent elements are command-line arguments +/// - Follows standard Unix convention (e.g., ["ls", "-la", "/tmp"]) +/// +/// # Examples +/// +/// ```rust,ignore +/// // Simple command +/// let request = ExecuteRequest { +/// command: vec!["echo".to_string(), "hello world".to_string()], +/// }; +/// +/// // Complex command with multiple arguments +/// let request = ExecuteRequest { +/// command: vec![ +/// "find".to_string(), +/// "/var/log".to_string(), +/// "-name".to_string(), +/// "*.log".to_string(), +/// "-mtime".to_string(), +/// "-7".to_string(), +/// ], +/// }; +/// ``` +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteRequest { + pub command: Vec, +} + +/// Response from remote command execution +/// +/// This structure contains the complete result of a command execution, +/// including exit status and all output streams. +/// +/// # Fields +/// +/// - `exit_code`: The exit status code returned by the command +/// - 0 typically indicates success +/// - Non-zero values indicate various error conditions +/// - -1 is used when the exit code cannot be determined +/// - `stdout`: All output written to standard output by the command +/// - `stderr`: All output written to standard error by the command +/// +/// # Design Notes +/// +/// - Captures both stdout and stderr separately for proper handling +/// - Preserves exact command output including whitespace and formatting +/// - Exit code allows clients to determine success/failure programmatically +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteResponse { + pub exit_code: i32, + pub stdout: String, + pub stderr: String, +} + +/// Error conditions for remote shell operations +/// +/// This enum defines the various error states that can occur during +/// remote command execution, providing detailed error information +/// for proper client-side handling. +/// +/// # Variants +/// +/// - `CommandFailed`: The command was executed but returned an error +/// - `ExecutionError`: The command could not be executed at all +/// +/// # Design Philosophy +/// +/// Different error types allow clients to distinguish between: +/// 1. Commands that ran but failed (CommandFailed) +/// 2. System-level execution problems (ExecutionError) +/// +/// This enables appropriate error handling and user feedback. +#[derive(Debug, Serialize, Deserialize)] +pub enum RemoteShellError { + /// Command was executed but returned a non-zero exit code + CommandFailed { + /// Human-readable error description + message: String, + /// The actual exit code returned by the command + exit_code: Option, + }, + /// Command could not be executed due to system-level errors + ExecutionError { + /// Detailed error message explaining what went wrong + message: String, + }, +} + +impl std::fmt::Display for RemoteShellError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RemoteShellError::CommandFailed { message, exit_code } => { + if let Some(code) = exit_code { + write!(f, "Command failed (exit code {}): {}", code, message) + } else { + write!(f, "Command failed: {}", message) + } + } + RemoteShellError::ExecutionError { message } => { + write!(f, "Execution error: {}", message) + } + } + } +} + +impl std::error::Error for RemoteShellError {} \ No newline at end of file diff --git a/v0.5/fastn-p2p-test/src/sender.rs b/v0.5/fastn-p2p-test/src/sender.rs deleted file mode 100644 index 5cd0fa920..000000000 --- a/v0.5/fastn-p2p-test/src/sender.rs +++ /dev/null @@ -1,132 +0,0 @@ -//! Minimal fastn-p2p sender test -//! -//! Tests the generic protocol system by sending meaningful protocol requests - -use serde::{Deserialize, Serialize}; - -/// Test protocol - meaningful names! -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] -pub enum TestProtocol { - Echo, -} - -impl std::fmt::Display for TestProtocol { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{self:?}") - } -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct EchoRequest { - pub from: String, - pub to: String, - pub message: String, - pub timestamp: i64, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct EchoResponse { - pub response: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct EchoError { - pub error: String, -} - -#[tokio::main] -async fn main() -> eyre::Result<()> { - println!("🔧 DEBUG SENDER: Starting main function"); - - // Initialize tracing - tracing_subscriber::fmt() - .with_env_filter("fastn_p2p=trace,fastn_p2p_test=info") - .init(); - - println!("🔧 DEBUG SENDER: Tracing initialized"); - - // Parse command line arguments: sender - let args: Vec = std::env::args().collect(); - - let (sender_key, receiver_id52) = if args.len() >= 3 { - let sender_secret_str = &args[1]; - let receiver_id52 = args[2].clone(); - - let sender_key = match sender_secret_str.parse::() { - Ok(key) => key, - Err(e) => { - eprintln!("❌ Invalid sender secret key: {e}"); - std::process::exit(1); - } - }; - - (sender_key, receiver_id52) - } else { - eprintln!("❌ Usage: sender "); - std::process::exit(1); - }; - - let sender_id52 = sender_key.public_key().id52(); - println!("🔑 Sender ID52: {sender_id52}"); - println!("🎯 Target ID52: {receiver_id52}"); - - // Convert receiver ID52 to public key - let receiver_public_key = receiver_id52 - .parse::() - .map_err(|e| eyre::eyre!("Invalid receiver_id52: {}", e))?; - - println!("📤 Sending test message via fastn-p2p"); - - // Create test request - let request = EchoRequest { - from: sender_id52, - to: receiver_id52, - message: "Hello from fastn-p2p test!".to_string(), - timestamp: chrono::Utc::now().timestamp(), - }; - - // Send using fastn-p2p call with meaningful protocol name - println!("🔧 DEBUG: About to call fastn_p2p::call"); - let result: Result = fastn_p2p::call( - sender_key, - &receiver_public_key, - TestProtocol::Echo, - request, - ) - .await - .map_err(|e| { - eprintln!("❌ fastn_p2p::call failed: {e}"); - e - })?; - println!("🔧 DEBUG: fastn_p2p::call completed successfully"); - - println!("🔧 DEBUG: About to match result"); - match result { - Ok(response) => { - println!("🔧 DEBUG: Got Ok response"); - println!("✅ Received response: {}", response.response); - - // Output JSON result for test parsing - let result_json = serde_json::json!({ - "status": "success", - "response": response.response, - "timestamp": chrono::Utc::now().to_rfc3339() - }); - println!("📋 RESULT: {}", serde_json::to_string(&result_json)?); - } - Err(error) => { - println!("🔧 DEBUG: Got Err response"); - eprintln!("❌ Received error: {}", error.error); - - let error_json = serde_json::json!({ - "status": "error", - "error": error.error, - "timestamp": chrono::Utc::now().to_rfc3339() - }); - println!("📋 RESULT: {}", serde_json::to_string(&error_json)?); - } - } - - println!("🎯 fastn-p2p sender test completed"); - Ok(()) -} diff --git a/v0.5/fastn-p2p/Cargo.toml b/v0.5/fastn-p2p/Cargo.toml index 5fdadb24f..3c65fbea5 100644 --- a/v0.5/fastn-p2p/Cargo.toml +++ b/v0.5/fastn-p2p/Cargo.toml @@ -6,6 +6,7 @@ description = "High-level, type-safe P2P communication for fastn" homepage.workspace = true license.workspace = true + [dependencies] fastn-net = { path = "../fastn-net", version = "0.1.2" } fastn-id52 = { path = "../fastn-id52", version = "0.1.1" } @@ -21,9 +22,13 @@ tokio.workspace = true tokio-util.workspace = true tracing.workspace = true +# Context integration +fastn-context.workspace = true + # Re-export proc macros from separate crate fastn-p2p-macros = { path = "../fastn-p2p-macros", version = "0.1.0" } + [dev-dependencies] tokio-test = "0.4" enum-display-derive = "0.1" \ No newline at end of file diff --git a/v0.5/fastn-p2p/src/client.rs b/v0.5/fastn-p2p/src/client.rs index 1d15c8652..98c62821c 100644 --- a/v0.5/fastn-p2p/src/client.rs +++ b/v0.5/fastn-p2p/src/client.rs @@ -1,46 +1,18 @@ -/// Error type for call function -#[derive(Debug, thiserror::Error)] -pub enum CallError { - #[error("Failed to establish P2P stream: {source}")] - Endpoint { source: eyre::Error }, - - #[error("Failed to establish P2P stream: {source}")] - Stream { source: eyre::Error }, - - #[error("Failed to serialize request: {source}")] - Serialization { source: serde_json::Error }, - - #[error("Failed to send request: {source}")] - Send { source: eyre::Error }, - - #[error("Failed to receive response: {source}")] - Receive { source: eyre::Error }, - - #[error("Failed to deserialize response: {source}")] - Deserialization { source: serde_json::Error }, -} - -/// Make a P2P call using global singletons +/// Client-side P2P communication /// -/// This is the main function end users should use. It automatically uses -/// the global connection pool and graceful shutdown coordinator. -/// -/// # Example -/// -/// ```rust,ignore -/// let result: Result = fastn_p2p::call( -/// secret_key, &target, protocol, request -/// ).await?; -/// ``` -pub async fn call( - sender: fastn_id52::SecretKey, - target: &fastn_id52::PublicKey, - protocol: P, - input: INPUT, -) -> Result, CallError> +/// This module provides client APIs for establishing both simple request/response +/// connections and complex streaming sessions with remote P2P endpoints. + +/// Simple request/response communication (existing functionality) +pub async fn call( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL, + request: REQUEST +) -> Result, CallError> where - P: serde::Serialize - + for<'de> serde::Deserialize<'de> + PROTOCOL: serde::Serialize + + for<'de> serde::Deserialize<'de> + Clone + PartialEq + std::fmt::Display @@ -48,10 +20,84 @@ where + Send + Sync + 'static, - INPUT: serde::Serialize, - OUTPUT: for<'de> serde::Deserialize<'de>, - ERROR: for<'de> serde::Deserialize<'de>, + REQUEST: serde::Serialize + for<'de> serde::Deserialize<'de>, + RESPONSE: serde::Serialize + for<'de> serde::Deserialize<'de>, + ERROR: serde::Serialize + for<'de> serde::Deserialize<'de>, { - // Delegate to coordination module which has strict singleton access control - crate::coordination::internal_call(sender, target, protocol, input).await + // Delegate to existing coordination infrastructure (will be restored in Phase 5) + crate::coordination::internal_call(our_key, &target, protocol, request).await } + +/// Establish streaming P2P session (new functionality) +pub async fn connect( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL +) -> Result +where + PROTOCOL: serde::Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug, +{ + // TODO: Implement streaming connection establishment + todo!("Connect to {target} with protocol {protocol:?} using {}", our_key.id52()) +} + +/// Client-side streaming session +pub struct Session { + /// Input stream to server + pub stdin: iroh::endpoint::SendStream, + /// Output stream from server + pub stdout: iroh::endpoint::RecvStream, + // TODO: Add context integration + // context: std::sync::Arc, +} + +impl Session { + /// Accept unidirectional stream back from server (e.g., stderr) + pub async fn accept_uni(&mut self) -> Result { + // TODO: Accept incoming unidirectional stream from server + todo!("Accept unidirectional stream from server") + } + + /// Accept bidirectional stream back from server + pub async fn accept_bi(&mut self) -> Result<(iroh::endpoint::RecvStream, iroh::endpoint::SendStream), ConnectionError> { + // TODO: Accept incoming bidirectional stream from server + todo!("Accept bidirectional stream from server") + } +} + +/// Errors for client operations +#[derive(Debug, thiserror::Error)] +pub enum CallError { + #[error("Connection failed: {source}")] + Connection { source: eyre::Error }, + + #[error("Request/response error: {source}")] + RequestResponse { source: eyre::Error }, + + #[error("Serialization error: {source}")] + Serialization { source: serde_json::Error }, + + #[error("Endpoint error: {source}")] + Endpoint { source: eyre::Error }, + + #[error("Stream error: {source}")] + Stream { source: eyre::Error }, + + #[error("Send error: {source}")] + Send { source: eyre::Error }, + + #[error("Receive error: {source}")] + Receive { source: eyre::Error }, + + #[error("Deserialization error: {source}")] + Deserialization { source: serde_json::Error }, +} + +#[derive(Debug, thiserror::Error)] +pub enum ConnectionError { + #[error("Failed to establish streaming connection: {source}")] + Connection { source: eyre::Error }, + + #[error("Stream error: {source}")] + Stream { source: eyre::Error }, +} \ No newline at end of file diff --git a/v0.5/fastn-p2p/src/lib.rs b/v0.5/fastn-p2p/src/lib.rs index a0940be15..0f8dca5bc 100644 --- a/v0.5/fastn-p2p/src/lib.rs +++ b/v0.5/fastn-p2p/src/lib.rs @@ -31,11 +31,13 @@ extern crate self as fastn_p2p; -mod client; mod coordination; mod globals; mod macros; -mod server; + +// Export client and server modules (new modular API) +pub mod client; +pub mod server; // Re-export essential types from fastn-net that users need pub use fastn_net::{Graceful, Protocol}; @@ -48,12 +50,9 @@ pub use fastn_p2p_macros::main; pub use coordination::{cancelled, shutdown, spawn}; pub use globals::{graceful, pool}; -// Client API - clean, simple naming (only expose simple version) -pub use client::{CallError, call}; - -// Server API - clean, simple naming +// Note: Legacy call() export removed - will be restored in Phase 5 migration pub use server::{ GetInputError, HandleRequestError, ListenerAlreadyActiveError, ListenerNotFoundError, Request, ResponseHandle, SendError, active_listener_count, active_listeners, is_listening, listen, - stop_listening, + stop_listening, Session, }; diff --git a/v0.5/fastn-p2p/src/server/mod.rs b/v0.5/fastn-p2p/src/server/mod.rs index c882759b7..2d481dc32 100644 --- a/v0.5/fastn-p2p/src/server/mod.rs +++ b/v0.5/fastn-p2p/src/server/mod.rs @@ -6,6 +6,7 @@ pub mod handle; pub mod listener; pub mod management; pub mod request; +pub mod session; // Public API exports - no use statements, direct qualification pub use handle::{ResponseHandle, SendError}; @@ -15,3 +16,4 @@ pub use management::{ is_listening, stop_listening, }; pub use request::{GetInputError, HandleRequestError, Request}; +pub use session::Session; diff --git a/v0.5/fastn-p2p/src/server/session.rs b/v0.5/fastn-p2p/src/server/session.rs new file mode 100644 index 000000000..13e12a552 --- /dev/null +++ b/v0.5/fastn-p2p/src/server/session.rs @@ -0,0 +1,61 @@ +/// Server-side streaming session (handles both RPC and streaming) +pub struct Session { + /// Protocol negotiated with client + pub protocol: PROTOCOL, + /// Stream to client (stdout) + pub send: iroh::endpoint::SendStream, + /// Stream from client (stdin) + pub recv: iroh::endpoint::RecvStream, + /// Peer's public key + peer: fastn_id52::PublicKey, + /// Context for this session (integration with fastn-context) + context: std::sync::Arc, +} + +impl Session { + /// Get the peer's public key + pub fn peer(&self) -> &fastn_id52::PublicKey { + &self.peer + } + + /// Get the context for this session + pub fn context(&self) -> &std::sync::Arc { + &self.context + } + + /// Convert to Request for RPC handling (consumes Session) + pub fn into_request(self) -> super::request::Request { + // TODO: Convert Session to Request for RPC pattern + todo!("Convert Session to Request for RPC handling") + } + + /// Open unidirectional stream back to client (e.g., stderr) + pub async fn open_uni(&mut self) -> Result { + // TODO: Open unidirectional stream to client + todo!("Open unidirectional stream back to client") + } + + /// Open bidirectional stream back to client + pub async fn open_bi(&mut self) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream), crate::client::ConnectionError> { + // TODO: Open bidirectional stream to client + todo!("Open bidirectional stream back to client") + } +} + +/// Create a new Session (used internally by listener) +pub(crate) fn create_session( + protocol: PROTOCOL, + send: iroh::endpoint::SendStream, + recv: iroh::endpoint::RecvStream, + peer: fastn_id52::PublicKey, + parent_context: &std::sync::Arc, +) -> Session { + // Use parent context for now (can create child context later) + Session { + protocol, + send, + recv, + peer, + context: parent_context.clone(), + } +} \ No newline at end of file