From 04e64abd865fef1e9885619e0db88bd2d3b9c87a Mon Sep 17 00:00:00 2001
From: Sarat Chandra
Date: Mon, 3 Nov 2025 23:42:25 +0530
Subject: [PATCH 1/2] feat: Add SQLite performance tuning configuration options

- Add `[sqlite]` configuration section with cache_size_mb, busy_timeout_ms,
  synchronous mode, and mmap_size options
- Implement automatic per-shard calculation for cache and mmap settings
  with floor division
- Update default synchronous mode to NORMAL for better safety in WAL mode
- Increase migration cache to 1GB for better performance during shard
  migrations
- Add comprehensive documentation in config.example.toml with tuning
  recommendations
- Print SQLite configuration on server startup for visibility
- Enable foreign_keys pragma for data integrity
---
 config.example.toml     |  23 +++++++
 src/app_state.rs        |  25 ++++---
 src/cluster.rs          |  12 ++--
 src/config.rs           | 146 ++++++++++++++++++++++++++++++++++++++++
 src/migration.rs        |   9 ++-
 src/redis/protocol.rs   | 120 ++++++++++++++++++++++++---------
 src/shard_manager.rs    |  98 ++++++++++++++++++---------
 tests/migration_test.rs |   9 ++-
 8 files changed, 360 insertions(+), 82 deletions(-)

diff --git a/config.example.toml b/config.example.toml
index c04fbb8..81226b0 100644
--- a/config.example.toml
+++ b/config.example.toml
@@ -51,6 +51,29 @@ enabled = true
 algorithm = "zstd" # Options: gzip, zstd, lz4, brotli
 level = 3
 
+# SQLite performance tuning (optional)
+# Based on: https://kerkour.com/sqlite-for-servers
+[sqlite]
+# Cache size across all shards in MB (default ≈ 100 MB per shard)
+# Tuning tip: cache_size_mb ≈ (Available RAM in GB * 0.7 * 1024)
+# Example: 16GB RAM dedicated at 70% → cache_size_mb ≈ 11468 (≈2867 MB per shard for 4 shards)
+cache_size_mb = 400
+
+# Busy timeout in milliseconds (default: 5000 = 5 seconds)
+# Increase for high-contention workloads
+busy_timeout_ms = 5000
+
+# Synchronous mode (default: NORMAL)
+# OFF: Fastest, but risk of corruption on power loss (not recommended)
+# NORMAL: Recommended - good performance with corruption safety in WAL mode
+# FULL: Safest but slower, use for critical data
+synchronous = "NORMAL"
+
+# Total memory-mapped I/O size in MB (default: 0 = disabled)
+# Only useful for databases larger than cache_size
+# Example: 3000 (≈3 GB total budget)
+# mmap_size = 0
+
 # Metrics configuration (optional)
 # Enable Prometheus-compatible metrics endpoint
 [metrics]
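Note: the sizing rule of thumb in the comments above is easy to check. A minimal
sketch of the same arithmetic (illustrative only, not part of the patch; the
helper name is hypothetical):

    // cache_size_mb ≈ available RAM in GB * 0.7 * 1024, as suggested above.
    fn suggested_cache_size_mb(ram_gb: f64) -> i64 {
        (ram_gb * 0.7 * 1024.0) as i64
    }

    // A 16 GB machine dedicated at 70%: ≈11468 MB total,
    // which is ≈2867 MB per shard when split across 4 shards.
    assert_eq!(suggested_cache_size_mb(16.0), 11468);
    assert_eq!(suggested_cache_size_mb(16.0) / 4, 2867);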
diff --git a/src/app_state.rs b/src/app_state.rs
index f372411..299684b 100644
--- a/src/app_state.rs
+++ b/src/app_state.rs
@@ -67,6 +67,12 @@ impl AppState {
             let data_dir = cfg.data_dir.clone();
             let db_path = format!("{}/shard_{}.db", data_dir, i);
 
+            // Get SQLite configuration with defaults
+            let cache_size_mb = cfg.sqlite_cache_size_per_shard_mb();
+            let busy_timeout_ms = cfg.sqlite_busy_timeout_ms();
+            let synchronous = cfg.sqlite_synchronous();
+            let mmap_size = cfg.sqlite_mmap_per_shard_bytes();
+
             let mut connect_options =
                 SqliteConnectOptions::from_str(&format!("sqlite:{}", db_path))
                     .expect(&format!(
                        "Failed to create connection options for shard {}",
                        i
                    ))
                    .create_if_missing(true)
                    .journal_mode(SqliteJournalMode::Wal)
-                   .busy_timeout(std::time::Duration::from_millis(5000));
+                   .busy_timeout(std::time::Duration::from_millis(busy_timeout_ms));
 
-            // These PRAGMAs are often set for performance with WAL mode.
-            // `synchronous = OFF` is safe except for power loss.
-            // `cache_size` is negative to indicate KiB, so -4000 is 4MB.
-            // `temp_store = MEMORY` avoids disk I/O for temporary tables.
+            // Configure SQLite PRAGMAs for optimal server performance
             connect_options = connect_options
-                .pragma("synchronous", "OFF")
-                .pragma("cache_size", "-100000") // 4MB cache per shard
-                .pragma("temp_store", "MEMORY");
+                .pragma("synchronous", synchronous.as_str())
+                .pragma("cache_size", format!("-{}", cache_size_mb * 1024)) // Negative means KiB
+                .pragma("temp_store", "MEMORY")
+                .pragma("foreign_keys", "true");
+
+            // Enable memory-mapped I/O if configured
+            if mmap_size > 0 {
+                connect_options = connect_options.pragma("mmap_size", mmap_size.to_string());
+            }
 
             db_pools_futures.push(sqlx::SqlitePool::connect_with(connect_options))
         }
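Note: SQLite interprets a negative cache_size as a size in KiB rather than a
page count, which is why the code above formats the value with a leading minus
sign. A standalone check of that conversion (hypothetical numbers, not part of
the patch):

    // 100 MB per shard → PRAGMA cache_size = -102400 (KiB).
    let cache_size_mb: i32 = 100;
    let pragma_value = format!("-{}", cache_size_mb * 1024);
    assert_eq!(pragma_value, "-102400");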
diff --git a/src/cluster.rs b/src/cluster.rs
index b300267..2e01307 100644
--- a/src/cluster.rs
+++ b/src/cluster.rs
@@ -434,7 +434,10 @@ impl ClusterManager {
         let (local_ip, local_port) = if let Some(ref advertise_addr) = self.config.advertise_addr {
             // Parse advertise_addr which is in "ip:port" format
             if let Some((ip, port)) = advertise_addr.split_once(':') {
-                (ip.to_string(), port.parse().unwrap_or(self.local_addr.port()))
+                (
+                    ip.to_string(),
+                    port.parse().unwrap_or(self.local_addr.port()),
+                )
             } else {
                 // If no port in advertise_addr, use the IP with local port
                 (advertise_addr.clone(), self.local_addr.port())
@@ -444,13 +447,10 @@ impl ClusterManager {
         } else {
             (self.local_addr.ip().to_string(), self.local_addr.port())
         };
-        
+
         let local_info = format!(
             "{} {}:{} myself,master - 0 0 0 connected {}",
-            self.node_id,
-            local_ip,
-            local_port,
-            local_slots_ranges
+            self.node_id, local_ip, local_port, local_slots_ranges
         );
 
         result.push(local_info);
diff --git a/src/config.rs b/src/config.rs
index 4102dd0..dedecba 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -14,6 +14,33 @@ pub struct Cfg {
     pub addr: Option,
     pub cluster: Option,
     pub metrics: Option,
+    pub sqlite: Option<SqliteConfig>,
 }
 
+#[derive(Debug, Clone, serde::Deserialize)]
+pub struct SqliteConfig {
+    /// SQLite cache size in MB for the whole process
+    /// Default: 100 MB per shard (derived from shard count)
+    /// Higher values improve read performance but consume more RAM
+    pub cache_size_mb: Option<i32>,
+
+    /// SQLite busy timeout in milliseconds
+    /// Default: 5000 ms (5 seconds)
+    /// Increase for high-contention workloads
+    pub busy_timeout_ms: Option<u64>,
+
+    /// SQLite synchronous mode: OFF, NORMAL, FULL
+    /// Default: NORMAL (recommended for WAL mode)
+    /// OFF = fastest but risk of corruption on power loss
+    /// NORMAL = good performance with corruption safety in WAL mode
+    /// FULL = safest but slower
+    pub synchronous: Option<SqliteSynchronous>,
+
+    /// Memory-mapped I/O size in MB for the whole process
+    /// Default: 0 (disabled)
+    /// Only useful for large databases that don't fit in cache
+    /// Example: 3000 (≈3 GB total)
+    pub mmap_size: Option<u64>,
+}
+
 #[derive(Debug, Clone, serde::Deserialize)]
 pub struct
@@ -46,6 +73,24 @@ pub enum CompressionType {
     Gzip,
     Zstd,
     Lz4,
     Brotli,
 }
 
+#[derive(Debug, Clone, Copy, serde::Deserialize, PartialEq)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum SqliteSynchronous {
+    OFF,
+    NORMAL,
+    FULL,
+}
+
+impl SqliteSynchronous {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            SqliteSynchronous::OFF => "OFF",
+            SqliteSynchronous::NORMAL => "NORMAL",
+            SqliteSynchronous::FULL => "FULL",
+        }
+    }
+}
+
 #[derive(Debug, Clone, serde::Deserialize)]
 pub struct CompressionConfig {
     pub enabled: bool,
@@ -110,6 +155,30 @@ impl Cfg {
             }
         }
 
+        // Print SQLite configuration
+        let cache_total_mb = cfg.sqlite_cache_total_mb();
+        let cache_per_shard_mb = cfg.sqlite_cache_size_per_shard_mb();
+        let busy_timeout = cfg.sqlite_busy_timeout_ms();
+        let synchronous = cfg.sqlite_synchronous();
+        let mmap_total_mb = cfg.sqlite_mmap_total_mb();
+        let mmap_per_shard_mb = cfg.sqlite_mmap_per_shard_mb();
+
+        println!("SQLite configuration:");
+        println!(
+            "  cache_size: {} MB total ({} MB per shard, floor)",
+            cache_total_mb, cache_per_shard_mb
+        );
+        println!("  busy_timeout: {} ms", busy_timeout);
+        println!("  synchronous: {}", synchronous.as_str());
+        if mmap_total_mb > 0 {
+            println!(
+                "  mmap_size: {} MB total ({} MB per shard, floor)",
+                mmap_total_mb, mmap_per_shard_mb
+            );
+        } else {
+            println!("  mmap_size: disabled");
+        }
+
         Ok(cfg)
     }
@@ -119,4 +188,81 @@ impl Cfg {
             _ => false,
         }
     }
+
+    /// Get configured SQLite cache size in MB for the whole process.
+    /// Default scales with shard count to preserve the 100 MB per-shard baseline.
+    pub fn sqlite_cache_total_mb(&self) -> i32 {
+        let default_per_shard = 100usize;
+        let default_total = self
+            .num_shards
+            .saturating_mul(default_per_shard)
+            .min(i32::MAX as usize) as i32;
+
+        self.sqlite
+            .as_ref()
+            .and_then(|s| s.cache_size_mb)
+            .unwrap_or(default_total)
+    }
+
+    /// Derived SQLite cache size in MB per shard (floor division).
+    pub fn sqlite_cache_size_per_shard_mb(&self) -> i32 {
+        let total = self.sqlite_cache_total_mb();
+        if total <= 0 {
+            return 0;
+        }
+
+        let shards = self.num_shards as i32;
+        if shards <= 0 {
+            return 0;
+        }
+
+        let per_shard = total / shards;
+        per_shard.max(0)
+    }
+
+    /// Get SQLite busy timeout in milliseconds (default: 5000 ms = 5 seconds)
+    pub fn sqlite_busy_timeout_ms(&self) -> u64 {
+        self.sqlite
+            .as_ref()
+            .and_then(|s| s.busy_timeout_ms)
+            .unwrap_or(5000)
+    }
+
+    /// Get SQLite synchronous mode (default: NORMAL)
+    pub fn sqlite_synchronous(&self) -> SqliteSynchronous {
+        self.sqlite
+            .as_ref()
+            .and_then(|s| s.synchronous)
+            .unwrap_or(SqliteSynchronous::NORMAL)
+    }
+
+    /// Get SQLite mmap size in MB (default: 0 = disabled)
+    pub fn sqlite_mmap_total_mb(&self) -> u64 {
+        self.sqlite.as_ref().and_then(|s| s.mmap_size).unwrap_or(0)
+    }
+
+    /// Derived SQLite mmap size in MB per shard (floor division).
+    pub fn sqlite_mmap_per_shard_mb(&self) -> u64 {
+        let total = self.sqlite_mmap_total_mb();
+        if total == 0 {
+            return 0;
+        }
+
+        let shards = self.num_shards as u64;
+        if shards == 0 {
+            return 0;
+        }
+
+        total / shards
+    }
+
+    /// Derived SQLite mmap size in bytes per shard (floor division).
+    pub fn sqlite_mmap_per_shard_bytes(&self) -> u64 {
+        let per_shard_mb = self.sqlite_mmap_per_shard_mb();
+        if per_shard_mb == 0 {
+            return 0;
+        }
+
+        per_shard_mb.saturating_mul(1024).saturating_mul(1024)
+    }
 }
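Note: the per-shard getters above use floor division, so a budget that does not
divide evenly simply drops the remainder instead of over-allocating. A small
illustration with hypothetical numbers (mirroring sqlite_cache_size_per_shard_mb):

    // 400 MB split across 3 shards → 133 MB per shard; 1 MB of budget is unused.
    let total_mb: i32 = 400;
    let shards: i32 = 3;
    let per_shard = total_mb / shards;
    assert_eq!(per_shard, 133);
    assert_eq!(total_mb - per_shard * shards, 1);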
diff --git a/src/migration.rs b/src/migration.rs
index 6674a94..7c78e74 100644
--- a/src/migration.rs
+++ b/src/migration.rs
@@ -55,14 +55,17 @@ impl MigrationManager {
     async fn create_connection_pool(&self, shard_id: usize) -> Result<SqlitePool> {
         let db_path = format!("{}/shard_{}.db", self.data_dir, shard_id);
 
+        // Use recommended SQLite settings for migration operations
+        // See: https://kerkour.com/sqlite-for-servers
         let connect_options = SqliteConnectOptions::from_str(&format!("sqlite:{}", db_path))
             .map_err(|e| sqlx_to_miette(e, "Failed to parse connection string"))?
             .create_if_missing(true)
             .journal_mode(SqliteJournalMode::Wal)
             .busy_timeout(std::time::Duration::from_millis(5000))
-            .pragma("synchronous", "OFF")
-            .pragma("cache_size", "-100000")
-            .pragma("temp_store", "MEMORY");
+            .pragma("synchronous", "NORMAL") // NORMAL is safer than OFF
+            .pragma("cache_size", "-1024000") // 1GB cache for better performance during migration
+            .pragma("temp_store", "MEMORY")
+            .pragma("foreign_keys", "true");
 
         SqlitePool::connect_with(connect_options)
             .await
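Note: SQLite silently ignores unrecognized PRAGMA values, so when changing
defaults like this it can be worth reading the effective settings back. A
hedged sketch of such a check with sqlx (not part of the patch;
print_effective_pragmas is a hypothetical helper):

    // PRAGMA queries return a single row; cache_size is reported in pages
    // when positive and in KiB when negative. synchronous maps to
    // 0 = OFF, 1 = NORMAL, 2 = FULL, 3 = EXTRA.
    async fn print_effective_pragmas(pool: &sqlx::SqlitePool) -> Result<(), sqlx::Error> {
        let cache_size: i64 = sqlx::query_scalar("PRAGMA cache_size")
            .fetch_one(pool)
            .await?;
        let synchronous: i64 = sqlx::query_scalar("PRAGMA synchronous")
            .fetch_one(pool)
            .await?;
        println!("cache_size = {}, synchronous = {}", cache_size, synchronous);
        Ok(())
    }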
diff --git a/src/redis/protocol.rs b/src/redis/protocol.rs
index f837aa7..018fce4 100644
--- a/src/redis/protocol.rs
+++ b/src/redis/protocol.rs
@@ -7,10 +7,10 @@ pub type RespValue = BytesFrame;
 
 /// Expiration options for commands
 #[derive(Debug, Clone, PartialEq)]
 pub enum ExpireOption {
-    Ex(u64),    // seconds
-    Px(u64),    // milliseconds
-    ExAt(i64),  // unix timestamp in seconds
-    PxAt(i64),  // unix timestamp in milliseconds
+    Ex(u64),   // seconds
+    Px(u64),   // milliseconds
+    ExAt(i64), // unix timestamp in seconds
+    PxAt(i64), // unix timestamp in milliseconds
     KeepTtl,
 }
@@ -178,9 +178,9 @@ fn parse_command_array(elements: Vec<RespValue>) -> Result<RedisCommand, ParseError> {
-                        return Err(ParseError::Invalid(format!("Unknown SET option: {}", option)));
+                        return Err(ParseError::Invalid(format!(
+                            "Unknown SET option: {}",
+                            option
+                        )));
                     }
                 }
             }
-
-            Ok(RedisCommand::Set { key, value, ttl_seconds })
+
+            Ok(RedisCommand::Set {
+                key,
+                value,
+                ttl_seconds,
+            })
         }
         "DEL" => {
             if elements.len() < 2 {
@@ -282,16 +289,17 @@ fn parse_command_array(elements: Vec<RespValue>) -> Result<RedisCommand, ParseError> {
         "HSETEX" => {
             if elements.len() < 5 {
                 return Err(ParseError::Invalid(
-                    "HSETEX requires at least key, FIELDS, numfields, and one field-value pair".to_string(),
+                    "HSETEX requires at least key, FIELDS, numfields, and one field-value pair"
+                        .to_string(),
                 ));
             }
-
+
             let key = extract_string(&elements[1])?;
             let mut idx = 2;
             let mut fnx = false;
             let mut fxx = false;
             let mut expire_option = None;
-
+
             // Parse options
             while idx < elements.len() {
                 let opt = extract_string(&elements[idx])?.to_uppercase();
@@ -356,38 +364,46 @@ fn parse_command_array(elements: Vec<RespValue>) -> Result<RedisCommand, ParseError> {
             if idx >= elements.len() || extract_string(&elements[idx])?.to_uppercase() != "FIELDS" {
-                return Err(ParseError::Invalid("HSETEX requires FIELDS keyword".to_string()));
+                return Err(ParseError::Invalid(
+                    "HSETEX requires FIELDS keyword".to_string(),
+                ));
             }
             idx += 1;
-
+
             // Get number of fields
             if idx >= elements.len() {
-                return Err(ParseError::Invalid("HSETEX requires field count after FIELDS".to_string()));
+                return Err(ParseError::Invalid(
+                    "HSETEX requires field count after FIELDS".to_string(),
+                ));
             }
             let num_fields = extract_string(&elements[idx])?
                 .parse::<usize>()
                 .map_err(|_| ParseError::Invalid("Invalid field count".to_string()))?;
             idx += 1;
-
+
             // Parse field-value pairs
             let mut fields = Vec::new();
             for _ in 0..num_fields {
                 if idx + 1 >= elements.len() {
-                    return Err(ParseError::Invalid("Not enough field-value pairs".to_string()));
+                    return Err(ParseError::Invalid(
+                        "Not enough field-value pairs".to_string(),
+                    ));
                 }
                 let field = extract_string(&elements[idx])?;
                 let value = extract_bytes(&elements[idx + 1])?;
                 fields.push((field, value));
                 idx += 2;
             }
-
+
             if fields.is_empty() {
-                return Err(ParseError::Invalid("HSETEX requires at least one field-value pair".to_string()));
+                return Err(ParseError::Invalid(
+                    "HSETEX requires at least one field-value pair".to_string(),
+                ));
             }
-
+
             Ok(RedisCommand::HSetEx {
                 key,
                 fnx,
                 fxx,
@@ -565,7 +581,8 @@ mod tests {
 
     #[test]
     fn test_parse_set_command_with_ex() {
-        let input = b"*5\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$11\r\nhello world\r\n$2\r\nEX\r\n$2\r\n60\r\n";
+        let input =
+            b"*5\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$11\r\nhello world\r\n$2\r\nEX\r\n$2\r\n60\r\n";
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let command = parse_command(resp).unwrap();
         assert_eq!(
@@ -580,7 +597,8 @@ mod tests {
 
     #[test]
     fn test_parse_set_command_with_px() {
-        let input = b"*5\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$11\r\nhello world\r\n$2\r\nPX\r\n$5\r\n60000\r\n";
+        let input =
+            b"*5\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$11\r\nhello world\r\n$2\r\nPX\r\n$5\r\n60000\r\n";
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let command = parse_command(resp).unwrap();
         assert_eq!(
@@ -644,14 +662,24 @@ mod tests {
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let result = parse_command(resp);
         assert!(result.is_err());
-        assert!(result.unwrap_err().to_string().contains("EX requires a value"));
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("EX requires a value")
+        );
 
         // Test SET with PX but no value
         let input = b"*4\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$5\r\nvalue\r\n$2\r\nPX\r\n";
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let result = parse_command(resp);
         assert!(result.is_err());
-        assert!(result.unwrap_err().to_string().contains("PX requires a value"));
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("PX requires a value")
+        );
     }
 
     #[test]
         let input = b"*4\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$5\r\nvalue\r\n$2\r\nXX\r\n";
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let result = parse_command(resp);
         assert!(result.is_err());
-        assert!(result.unwrap_err().to_string().contains("Unknown SET option: XX"));
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Unknown SET option: XX")
+        );
     }
 
     #[test]
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let result = parse_command(resp);
         assert!(result.is_err());
-        assert!(result.unwrap_err().to_string().contains("Invalid seconds value"));
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Invalid seconds value")
+        );
     }
 
     #[test]
         // Too many arguments
         let input = b"*3\r\n$3\r\nTTL\r\n$5\r\nmykey\r\n$5\r\nextra\r\n";
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let result = parse_command(resp);
         assert!(result.is_err());
-        assert!(result.unwrap_err().to_string().contains("TTL requires exactly 1 argument"));
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("TTL requires exactly 1 argument")
+        );
 
         // Too few arguments
         let input = b"*1\r\n$3\r\nTTL\r\n";
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let result = parse_command(resp);
         assert!(result.is_err());
-        assert!(result.unwrap_err().to_string().contains("TTL requires exactly 1 argument"));
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("TTL requires exactly 1 argument")
+        );
     }
 
     #[test]
         // Too many arguments
         let input = b"*4\r\n$6\r\nEXPIRE\r\n$5\r\nmykey\r\n$2\r\n60\r\n$5\r\nextra\r\n";
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let result = parse_command(resp);
         assert!(result.is_err());
-        assert!(result.unwrap_err().to_string().contains("EXPIRE requires exactly 2 arguments"));
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("EXPIRE requires exactly 2 arguments")
+        );
 
         // Too few arguments
         let input = b"*2\r\n$6\r\nEXPIRE\r\n$5\r\nmykey\r\n";
         let (resp, _) = parse_resp_with_remaining(input).unwrap();
         let result = parse_command(resp);
         assert!(result.is_err());
-        assert!(result.unwrap_err().to_string().contains("EXPIRE requires exactly 2 arguments"));
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("EXPIRE requires exactly 2 arguments")
+        );
     }
 
     #[test]
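Note: for readers tracing the new HSETEX parsing, here is one more worked RESP
frame (illustrative only, not part of the patch; it reuses the
parse_resp_with_remaining and parse_command helpers from the tests above and
assumes the HSETEX option loop accepts EX):

    #[test]
    fn test_parse_hsetex_single_field() {
        // HSETEX mykey EX 60 FIELDS 1 f1 v1 → an array of 8 bulk strings.
        let input = b"*8\r\n$6\r\nHSETEX\r\n$5\r\nmykey\r\n$2\r\nEX\r\n$2\r\n60\r\n$6\r\nFIELDS\r\n$1\r\n1\r\n$2\r\nf1\r\n$2\r\nv1\r\n";
        let (resp, _) = parse_resp_with_remaining(input).unwrap();
        // One field-value pair follows FIELDS 1, so parsing should succeed.
        assert!(parse_command(resp).is_ok());
    }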
diff --git a/src/shard_manager.rs b/src/shard_manager.rs
index cd6cf5c..510bb8b 100644
--- a/src/shard_manager.rs
+++ b/src/shard_manager.rs
@@ -5,7 +5,7 @@ use moka::future::Cache;
 use sqlx::SqlitePool;
 use std::collections::{HashSet, VecDeque};
 use tokio::sync::{mpsc, oneshot};
-use tokio::time::{Duration, timeout, interval};
+use tokio::time::{Duration, interval, timeout};
 
 // Message type for writer consumers
 pub enum ShardWriteOperation {
@@ -199,8 +199,17 @@ async fn process_batch(
     // Execute all operations in the transaction
     for (idx, operation) in batch.iter().enumerate() {
         let result = match operation {
-            ShardWriteOperation::Set { key, data, expires_at, .. }
-            | ShardWriteOperation::SetAsync { key, data, expires_at } => {
+            ShardWriteOperation::Set {
+                key,
+                data,
+                expires_at,
+                ..
+            }
+            | ShardWriteOperation::SetAsync {
+                key,
+                data,
+                expires_at,
+            } => {
                 match operation {
                     ShardWriteOperation::Set { .. } => sync_operations.push(idx),
                     _ => {}
@@ -471,9 +480,11 @@ async fn process_batch(
                     Ok(())
                 }
             }
-            ShardWriteOperation::Expire { key, expires_at, .. } => {
+            ShardWriteOperation::Expire {
+                key, expires_at, ..
+            } => {
                 sync_operations.push(idx);
-
+
                 // Update the expires_at field for the key if it exists
                 match sqlx::query("UPDATE blobs SET expires_at = ? WHERE key = ?")
                     .bind(expires_at)
@@ -556,14 +567,18 @@ async fn process_batch(
             ShardWriteOperation::Expire { responder, .. } => {
                 // For expire operations, we need to send the actual result (bool)
                 // Find the corresponding expire result using current operation index
-                if let Some((_, success)) = expire_results.iter().find(|(idx, _)| *idx == operation_idx) {
+                if let Some((_, success)) =
+                    expire_results.iter().find(|(idx, _)| *idx == operation_idx)
+                {
                     let final_result = match &commit_result {
                         Ok(_) => Ok(*success),
                         Err(e) => Err(e.clone()),
                     };
                     let _ = responder.send(final_result);
                 } else {
-                    let _ = responder.send(Err("Internal error: could not find expire result".to_string()));
+                    let _ = responder.send(Err(
+                        "Internal error: could not find expire result".to_string()
+                    ));
                 }
             }
             ShardWriteOperation::SetAsync { .. }
@@ -674,20 +689,20 @@ async fn ensure_namespaced_table_exists(
 }
 
 /// Background task to clean up expired keys from a shard
-pub async fn shard_cleanup_task(
-    shard_id: usize,
-    pool: SqlitePool,
-    cleanup_interval_secs: u64,
-) {
+pub async fn shard_cleanup_task(shard_id: usize, pool: SqlitePool, cleanup_interval_secs: u64) {
     let mut interval = interval(Duration::from_secs(cleanup_interval_secs));
-
-    tracing::info!("[Shard {}] Starting cleanup task with interval {} seconds", shard_id, cleanup_interval_secs);
-
+
+    tracing::info!(
+        "[Shard {}] Starting cleanup task with interval {} seconds",
+        shard_id,
+        cleanup_interval_secs
+    );
+
     loop {
         interval.tick().await;
-
+
         let now = chrono::Utc::now().timestamp();
-
+
         // Clean up expired keys from the main blobs table
         match sqlx::query("DELETE FROM blobs WHERE expires_at IS NOT NULL AND expires_at <= ?")
             .bind(now)
@@ -697,45 +712,66 @@ pub async fn shard_cleanup_task(
             Ok(result) => {
                 let deleted_count = result.rows_affected();
                 if deleted_count > 0 {
-                    tracing::info!("[Shard {}] Cleaned up {} expired keys from blobs table", shard_id, deleted_count);
+                    tracing::info!(
+                        "[Shard {}] Cleaned up {} expired keys from blobs table",
+                        shard_id,
+                        deleted_count
+                    );
                 }
             }
             Err(e) => {
-                tracing::error!("[Shard {}] Error during cleanup of blobs table: {}", shard_id, e);
+                tracing::error!(
+                    "[Shard {}] Error during cleanup of blobs table: {}",
+                    shard_id,
+                    e
+                );
             }
         }
-
+
         // Clean up expired keys from namespaced tables
        // First, get all table names that start with "blobs_"
         let tables_result = sqlx::query_as::<_, (String,)>(
-            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'blobs_%'"
+            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'blobs_%'",
         )
         .fetch_all(&pool)
         .await;
-
+
         match tables_result {
             Ok(tables) => {
                 for (table_name,) in tables {
-                    let query = format!("DELETE FROM {} WHERE expires_at IS NOT NULL AND expires_at <= ?", table_name);
-                    match sqlx::query(&query)
-                        .bind(now)
-                        .execute(&pool)
-                        .await
-                    {
+                    let query = format!(
+                        "DELETE FROM {} WHERE expires_at IS NOT NULL AND expires_at <= ?",
+                        table_name
+                    );
+                    match sqlx::query(&query).bind(now).execute(&pool).await {
                         Ok(result) => {
                             let deleted_count = result.rows_affected();
                             if deleted_count > 0 {
-                                tracing::info!("[Shard {}] Cleaned up {} expired keys from table {}", shard_id, deleted_count, table_name);
+                                tracing::info!(
+                                    "[Shard {}] Cleaned up {} expired keys from table {}",
+                                    shard_id,
+                                    deleted_count,
+                                    table_name
+                                );
                             }
                         }
                         Err(e) => {
-                            tracing::error!("[Shard {}] Error during cleanup of table {}: {}", shard_id, table_name, e);
+                            tracing::error!(
+                                "[Shard {}] Error during cleanup of table {}: {}",
+                                shard_id,
+                                table_name,
+                                e
+                            );
                        }
                    }
                }
            }
            Err(e) => {
-                tracing::error!("[Shard {}] Error querying table names for cleanup: {}", shard_id, e);
+                tracing::error!(
+                    "[Shard {}] Error querying table names for cleanup: {}",
+                    shard_id,
+                    e
+                );
            }
        }
    }
diff --git a/tests/migration_test.rs b/tests/migration_test.rs
index c8356d3..09287ca 100644
--- a/tests/migration_test.rs
+++ b/tests/migration_test.rs
@@ -19,13 +19,16 @@ struct ShardNode(u64);
 
 // Helper function to create a test database pool
 async fn create_test_pool(path: &str) -> Result<SqlitePool, sqlx::Error> {
+    // Use recommended SQLite settings even in tests
+    // See: https://kerkour.com/sqlite-for-servers
     let connect_options = SqliteConnectOptions::from_str(&format!("sqlite:{}", path))?
         .create_if_missing(true)
         .journal_mode(SqliteJournalMode::Wal)
         .busy_timeout(std::time::Duration::from_millis(5000))
-        .pragma("synchronous", "OFF")
-        .pragma("cache_size", "-100000")
-        .pragma("temp_store", "MEMORY");
+        .pragma("synchronous", "NORMAL") // NORMAL is safer than OFF
+        .pragma("cache_size", "-1024000") // 1GB cache
+        .pragma("temp_store", "MEMORY")
+        .pragma("foreign_keys", "true");
 
     SqlitePool::connect_with(connect_options).await
 }
From 7e25245387c7171a6d0fe6c7762afc58c19e069e Mon Sep 17 00:00:00 2001
From: Sarat Chandra
Date: Mon, 3 Nov 2025 23:56:54 +0530
Subject: [PATCH 2/2] feat: Divide SQLite cache/mmap budgets by connections per
 shard

- Add max_connections config option for SQLite connection pool size
  (default: 10)
- Update cache and mmap calculation to divide by (shards * connections)
  instead of just shards
- Add helper methods for per-connection cache and mmap sizes
- Update startup output to show per-connection resource allocation
- Improve integer overflow safety with saturating arithmetic and clamping
---
 config.example.toml |  4 +++
 src/app_state.rs    | 13 ++++---
 src/config.rs       | 83 ++++++++++++++++++++++++++++++++++-----------
 3 files changed, 80 insertions(+), 20 deletions(-)

diff --git a/config.example.toml b/config.example.toml
index 81226b0..9507353 100644
--- a/config.example.toml
+++ b/config.example.toml
@@ -59,6 +59,10 @@ level = 3
 # Example: 16GB RAM dedicated at 70% → cache_size_mb ≈ 11468 (≈2867 MB per shard for 4 shards)
 cache_size_mb = 400
 
+# Maximum SQLite connections per shard (default: 10)
+# Increase for higher concurrency; remember total cache/mmap budgets are divided across shards * connections
+# max_connections = 10
+
 # Busy timeout in milliseconds (default: 5000 = 5 seconds)
 # Increase for high-contention workloads
 busy_timeout_ms = 5000
diff --git a/src/app_state.rs b/src/app_state.rs
index 299684b..4584553 100644
--- a/src/app_state.rs
+++ b/src/app_state.rs
@@ -3,7 +3,7 @@ use miette::Result;
 use moka::future::Cache;
 use mpchash::HashRing;
 use sqlx::SqlitePool;
-use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
+use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
 use std::fs;
 use std::str::FromStr;
 use tokio::sync::mpsc;
@@ -63,15 +63,16 @@ impl AppState {
         validate_shard_count(&cfg.data_dir, cfg.num_shards)?;
 
         let mut db_pools_futures = vec![];
+        let pool_max_connections = cfg.sqlite_pool_max_connections();
         for i in 0..cfg.num_shards {
             let data_dir = cfg.data_dir.clone();
             let db_path = format!("{}/shard_{}.db", data_dir, i);
 
             // Get SQLite configuration with defaults
-            let cache_size_mb = cfg.sqlite_cache_size_per_shard_mb();
+            let cache_size_mb = cfg.sqlite_cache_size_per_connection_mb();
             let busy_timeout_ms = cfg.sqlite_busy_timeout_ms();
             let synchronous = cfg.sqlite_synchronous();
-            let mmap_size = cfg.sqlite_mmap_per_shard_bytes();
+            let mmap_size = cfg.sqlite_mmap_per_connection_bytes();
 
             let mut connect_options =
                 SqliteConnectOptions::from_str(&format!("sqlite:{}", db_path))
@@ -95,7 +96,11 @@ impl AppState {
                 connect_options = connect_options.pragma("mmap_size", mmap_size.to_string());
             }
 
-            db_pools_futures.push(sqlx::SqlitePool::connect_with(connect_options))
+            db_pools_futures.push(
+                SqlitePoolOptions::new()
+                    .max_connections(pool_max_connections)
+                    .connect_with(connect_options),
+            )
         }
 
         let db_pool_results: Vec<Result<SqlitePool, sqlx::Error>> =
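Note: the reason this patch divides by shards * connections is that every
SQLite connection keeps its own page cache, so a pool multiplies the
per-connection budget. A quick check with hypothetical numbers:

    // 400 MB total across 4 shards with max_connections = 10:
    let total_mb = 400;
    let shards = 4;
    let max_connections = 10;
    let per_connection_mb = total_mb / (shards * max_connections);
    assert_eq!(per_connection_mb, 10);
    // Fully saturated pools land exactly back on the configured budget.
    assert_eq!(per_connection_mb * shards * max_connections, 400);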
diff --git a/src/config.rs b/src/config.rs
index dedecba..cd3a7cd 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -41,6 +41,11 @@ pub struct SqliteConfig {
     /// Only useful for large databases that don't fit in cache
     /// Example: 3000 (≈3 GB total)
     pub mmap_size: Option<u64>,
+
+    /// Maximum number of SQLite connections per shard (pool size)
+    /// Default: 10 (sqlx default)
+    /// Increase for higher concurrency at the cost of RAM
+    pub max_connections: Option<u32>,
 }
 
 #[derive(Debug, Clone, serde::Deserialize)]
 pub struct
@@ -158,26 +163,30 @@ impl Cfg {
         // Print SQLite configuration
         let cache_total_mb = cfg.sqlite_cache_total_mb();
         let cache_per_shard_mb = cfg.sqlite_cache_size_per_shard_mb();
+        let cache_per_connection_mb = cfg.sqlite_cache_size_per_connection_mb();
         let busy_timeout = cfg.sqlite_busy_timeout_ms();
         let synchronous = cfg.sqlite_synchronous();
         let mmap_total_mb = cfg.sqlite_mmap_total_mb();
         let mmap_per_shard_mb = cfg.sqlite_mmap_per_shard_mb();
+        let mmap_per_connection_mb = cfg.sqlite_mmap_per_connection_mb();
+        let pool_max_connections = cfg.sqlite_pool_max_connections();
 
         println!("SQLite configuration:");
         println!(
-            "  cache_size: {} MB total ({} MB per shard, floor)",
-            cache_total_mb, cache_per_shard_mb
+            "  cache_size: {} MB total ({} MB per shard, {} MB per connection; floor)",
+            cache_total_mb, cache_per_shard_mb, cache_per_connection_mb
         );
         println!("  busy_timeout: {} ms", busy_timeout);
         println!("  synchronous: {}", synchronous.as_str());
         if mmap_total_mb > 0 {
             println!(
-                "  mmap_size: {} MB total ({} MB per shard, floor)",
-                mmap_total_mb, mmap_per_shard_mb
+                "  mmap_size: {} MB total ({} MB per shard, {} MB per connection; floor)",
+                mmap_total_mb, mmap_per_shard_mb, mmap_per_connection_mb
             );
         } else {
             println!("  mmap_size: disabled");
         }
+        println!("  pool_max_connections: {}", pool_max_connections);
 
         Ok(cfg)
     }
@@ -211,13 +220,26 @@ impl Cfg {
             return 0;
         }
 
-        let shards = self.num_shards as i32;
-        if shards <= 0 {
+        let shards = self.num_shards.max(1) as i64;
+        let per_shard = (total as i64) / shards;
+        per_shard.clamp(i32::MIN as i64, i32::MAX as i64).max(0) as i32
+    }
+
+    /// Derived SQLite cache size in MB per connection (floor division).
+    pub fn sqlite_cache_size_per_connection_mb(&self) -> i32 {
+        let total = self.sqlite_cache_total_mb();
+        if total <= 0 {
             return 0;
         }
 
-        let per_shard = total / shards;
-        per_shard.max(0)
+        let shards = self.num_shards.max(1) as i64;
+        let connections = self.sqlite_pool_max_connections().max(1) as i64;
+        let divisor = shards.saturating_mul(connections);
+
+        let per_connection = (total as i64) / divisor;
+        per_connection
+            .clamp(i32::MIN as i64, i32::MAX as i64)
+            .max(0) as i32
     }
 
     /// Get SQLite busy timeout in milliseconds (default: 5000 ms = 5 seconds)
@@ -248,21 +270,50 @@ impl Cfg {
             return 0;
         }
 
-        let shards = self.num_shards as u64;
-        if shards == 0 {
+        let shards = self.num_shards.max(1) as u64;
+        total / shards
+    }
+
+    /// Derived SQLite mmap size in MB per connection (floor division).
+    pub fn sqlite_mmap_per_connection_mb(&self) -> u64 {
+        let total = self.sqlite_mmap_total_mb();
+        if total == 0 {
             return 0;
         }
 
-        total / shards
+        let shards = self.num_shards.max(1) as u64;
+        let connections = self.sqlite_pool_max_connections().max(1) as u64;
+        let divisor = shards.saturating_mul(connections);
+
+        if divisor == 0 {
+            return 0;
+        }
+
+        total / divisor
     }
 
-    /// Derived SQLite mmap size in bytes per shard (floor division).
-    pub fn sqlite_mmap_per_shard_bytes(&self) -> u64 {
-        let per_shard_mb = self.sqlite_mmap_per_shard_mb();
-        if per_shard_mb == 0 {
+    /// Derived SQLite mmap size in bytes per connection (floor division).
+    pub fn sqlite_mmap_per_connection_bytes(&self) -> u64 {
+        let per_connection_mb = self.sqlite_mmap_per_connection_mb();
+        if per_connection_mb == 0 {
             return 0;
         }
 
-        per_shard_mb.saturating_mul(1024).saturating_mul(1024)
+        per_connection_mb.saturating_mul(1024).saturating_mul(1024)
+    }
+
+    /// Maximum number of SQLite connections per shard (pool size).
+    pub fn sqlite_pool_max_connections(&self) -> u32 {
+        let default = 10;
+        let configured = self
+            .sqlite
+            .as_ref()
+            .and_then(|s| s.max_connections)
+            .unwrap_or(default);
+
+        match configured {
+            0 => 1,
+            value => value,
+        }
     }
 }
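Note: taken together, the getters in this patch guarantee a non-zero divisor:
num_shards is clamped with max(1) and sqlite_pool_max_connections() maps a
configured 0 to 1. A final end-to-end sketch of the budget math (hypothetical
values):

    // Mirrors sqlite_cache_size_per_connection_mb() for 400 MB, 4 shards,
    // and the default pool size of 10 connections.
    let total: i64 = 400;
    let shards: i64 = 4;        // num_shards.max(1)
    let connections: i64 = 10;  // sqlite_pool_max_connections().max(1)
    assert_eq!(total / shards, 100);                // per shard
    assert_eq!(total / (shards * connections), 10); // per connection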