27 changes: 27 additions & 0 deletions config.example.toml
@@ -51,6 +51,33 @@ enabled = true
algorithm = "zstd" # Options: gzip, zstd, lz4, brotli
level = 3

# SQLite performance tuning (optional)
# Based on: https://kerkour.com/sqlite-for-servers
[sqlite]
# Cache size across all shards in MB (default ≈ 100 MB per shard)
# Tuning tip: cache_size_mb ≈ (Available RAM in GB * 0.7 * 1024)
# Example: 16GB RAM dedicated at 70% → cache_size_mb ≈ 11468 (≈2867 MB per shard for 4 shards)
cache_size_mb = 400

# Maximum SQLite connections per shard (default: 10)
# Increase for higher concurrency; remember total cache/mmap budgets are divided across shards * connections
# max_connections = 10

# Busy timeout in milliseconds (default: 5000 = 5 seconds)
# Increase for high-contention workloads
busy_timeout_ms = 5000

# Synchronous mode (default: NORMAL)
# OFF: Fastest, but risk of corruption on power loss (not recommended)
# NORMAL: Recommended - good performance with corruption safety in WAL mode
# FULL: Safest but slower, use for critical data
synchronous = "NORMAL"

# Total memory-mapped I/O size in MB (default: 0 = disabled)
# Only useful for databases larger than cache_size
# Example: 3000 (≈3 GB total budget)
# mmap_size = 0

# Metrics configuration (optional)
# Enable Prometheus-compatible metrics endpoint
[metrics]
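
The per-shard and per-connection budgets implied by these settings follow from plain floor division. A minimal standalone sketch, not part of this PR, assuming 4 shards, the default pool size of 10, and the example values above:

// Hypothetical sketch (not part of this PR): how the process-wide budgets in
// [sqlite] divide down to per-shard and per-connection values.
fn main() {
    let num_shards: u64 = 4; // assumed shard count
    let max_connections: u64 = 10; // default pool size per shard
    let cache_total_mb: u64 = 400; // cache_size_mb from the example above
    let mmap_total_mb: u64 = 3000; // example mmap_size value

    // Floor division, mirroring the derivation in src/config.rs.
    let cache_per_shard_mb = cache_total_mb / num_shards; // 100 MB
    let cache_per_conn_mb = cache_total_mb / (num_shards * max_connections); // 10 MB
    let mmap_per_conn_mb = mmap_total_mb / (num_shards * max_connections); // 75 MB

    // SQLite's cache_size PRAGMA interprets a negative value as KiB.
    let cache_pragma = format!("-{}", cache_per_conn_mb * 1024); // "-10240"

    println!("cache: {cache_per_shard_mb} MB/shard, {cache_per_conn_mb} MB/connection (PRAGMA cache_size = {cache_pragma})");
    println!("mmap:  {mmap_per_conn_mb} MB/connection");
}
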
34 changes: 24 additions & 10 deletions src/app_state.rs
@@ -3,7 +3,7 @@ use miette::Result;
use moka::future::Cache;
use mpchash::HashRing;
use sqlx::SqlitePool;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use std::fs;
use std::str::FromStr;
use tokio::sync::mpsc;
@@ -63,10 +63,17 @@ impl AppState {
validate_shard_count(&cfg.data_dir, cfg.num_shards)?;

let mut db_pools_futures = vec![];
let pool_max_connections = cfg.sqlite_pool_max_connections();
for i in 0..cfg.num_shards {
let data_dir = cfg.data_dir.clone();
let db_path = format!("{}/shard_{}.db", data_dir, i);

// Get SQLite configuration with defaults
let cache_size_mb = cfg.sqlite_cache_size_per_connection_mb();
let busy_timeout_ms = cfg.sqlite_busy_timeout_ms();
let synchronous = cfg.sqlite_synchronous();
let mmap_size = cfg.sqlite_mmap_per_connection_bytes();

let mut connect_options =
SqliteConnectOptions::from_str(&format!("sqlite:{}", db_path))
.expect(&format!(
@@ -75,18 +82,25 @@
))
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.busy_timeout(std::time::Duration::from_millis(5000));
.busy_timeout(std::time::Duration::from_millis(busy_timeout_ms));

// These PRAGMAs are often set for performance with WAL mode.
// `synchronous = OFF` is safe except for power loss.
// `cache_size` is negative to indicate KiB, so -4000 is 4MB.
// `temp_store = MEMORY` avoids disk I/O for temporary tables.
// Configure SQLite PRAGMAs for optimal server performance
connect_options = connect_options
.pragma("synchronous", "OFF")
.pragma("cache_size", "-100000") // 4MB cache per shard
.pragma("temp_store", "MEMORY");
.pragma("synchronous", synchronous.as_str())
.pragma("cache_size", format!("-{}", cache_size_mb * 1024)) // Negative means KiB
.pragma("temp_store", "MEMORY")
.pragma("foreign_keys", "true");

// Enable memory-mapped I/O if configured
if mmap_size > 0 {
connect_options = connect_options.pragma("mmap_size", mmap_size.to_string());
}

db_pools_futures.push(sqlx::SqlitePool::connect_with(connect_options))
db_pools_futures.push(
SqlitePoolOptions::new()
.max_connections(pool_max_connections)
.connect_with(connect_options),
)
}

let db_pool_results: Vec<Result<SqlitePool, sqlx::Error>> =
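
A quick way to confirm that these connection options actually take effect on pooled connections is to read the PRAGMAs back. A standalone sketch, not part of this PR, assuming sqlx with the "sqlite" feature and a tokio runtime:

use std::str::FromStr;

use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // In-memory database for illustration; the shard files would be used in practice.
    let options = SqliteConnectOptions::from_str("sqlite::memory:")?
        .pragma("synchronous", "NORMAL")
        .pragma("cache_size", "-10240") // negative value = KiB, so ~10 MB
        .pragma("temp_store", "MEMORY");

    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .connect_with(options)
        .await?;

    // Read the settings back from a pooled connection.
    let synchronous: i64 = sqlx::query_scalar("PRAGMA synchronous;")
        .fetch_one(&pool)
        .await?;
    let cache_size: i64 = sqlx::query_scalar("PRAGMA cache_size;")
        .fetch_one(&pool)
        .await?;

    // synchronous: 0 = OFF, 1 = NORMAL, 2 = FULL; cache_size echoes the negative KiB value.
    println!("synchronous = {synchronous}, cache_size = {cache_size}");
    Ok(())
}
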
12 changes: 6 additions & 6 deletions src/cluster.rs
@@ -434,7 +434,10 @@ impl ClusterManager {
let (local_ip, local_port) = if let Some(ref advertise_addr) = self.config.advertise_addr {
// Parse advertise_addr which is in "ip:port" format
if let Some((ip, port)) = advertise_addr.split_once(':') {
(ip.to_string(), port.parse().unwrap_or(self.local_addr.port()))
(
ip.to_string(),
port.parse().unwrap_or(self.local_addr.port()),
)
} else {
// If no port in advertise_addr, use the IP with local port
(advertise_addr.clone(), self.local_addr.port())
@@ -444,13 +447,10 @@
} else {
(self.local_addr.ip().to_string(), self.local_addr.port())
};

let local_info = format!(
"{} {}:{} myself,master - 0 0 0 connected {}",
self.node_id,
local_ip,
local_port,
local_slots_ranges
self.node_id, local_ip, local_port, local_slots_ranges
);
result.push(local_info);

197 changes: 197 additions & 0 deletions src/config.rs
@@ -14,6 +14,38 @@ pub struct Cfg {
pub addr: Option<String>,
pub cluster: Option<ClusterConfig>,
pub metrics: Option<MetricsConfig>,
pub sqlite: Option<SqliteConfig>,
}

#[derive(Debug, Clone, serde::Deserialize)]
pub struct SqliteConfig {
/// SQLite cache size in MB for the whole process
/// Default: 100 MB per shard (derived from shard count)
/// Higher values improve read performance but consume more RAM
pub cache_size_mb: Option<i32>,

/// SQLite busy timeout in milliseconds
/// Default: 5000 ms (5 seconds)
/// Increase for high-contention workloads
pub busy_timeout_ms: Option<u64>,

/// SQLite synchronous mode: OFF, NORMAL, FULL
/// Default: NORMAL (recommended for WAL mode)
/// OFF = fastest but risk of corruption on power loss
/// NORMAL = good performance with corruption safety in WAL mode
/// FULL = safest but slower
pub synchronous: Option<SqliteSynchronous>,

/// Memory-mapped I/O size in MB for the whole process
/// Default: 0 (disabled)
/// Only useful for large databases that don't fit in cache
/// Example: 3000 (≈3 GB total)
pub mmap_size: Option<u64>,

/// Maximum number of SQLite connections per shard (pool size)
/// Default: 10 (sqlx default)
/// Increase for higher concurrency at the cost of RAM
pub max_connections: Option<u32>,
}

#[derive(Debug, Clone, serde::Deserialize)]
@@ -46,6 +78,24 @@ pub enum CompressionType {
Brotli,
}

#[derive(Debug, Clone, Copy, serde::Deserialize, PartialEq)]
#[serde(rename_all = "UPPERCASE")]
pub enum SqliteSynchronous {
OFF,
NORMAL,
FULL,
}

impl SqliteSynchronous {
pub fn as_str(&self) -> &'static str {
match self {
SqliteSynchronous::OFF => "OFF",
SqliteSynchronous::NORMAL => "NORMAL",
SqliteSynchronous::FULL => "FULL",
}
}
}

#[derive(Debug, Clone, serde::Deserialize)]
pub struct CompressionConfig {
pub enabled: bool,
@@ -110,6 +160,34 @@ impl Cfg {
}
}

// Print SQLite configuration
let cache_total_mb = cfg.sqlite_cache_total_mb();
let cache_per_shard_mb = cfg.sqlite_cache_size_per_shard_mb();
let cache_per_connection_mb = cfg.sqlite_cache_size_per_connection_mb();
let busy_timeout = cfg.sqlite_busy_timeout_ms();
let synchronous = cfg.sqlite_synchronous();
let mmap_total_mb = cfg.sqlite_mmap_total_mb();
let mmap_per_shard_mb = cfg.sqlite_mmap_per_shard_mb();
let mmap_per_connection_mb = cfg.sqlite_mmap_per_connection_mb();
let pool_max_connections = cfg.sqlite_pool_max_connections();

println!("SQLite configuration:");
println!(
" cache_size: {} MB total ({} MB per shard, {} MB per connection; floor)",
cache_total_mb, cache_per_shard_mb, cache_per_connection_mb
);
println!(" busy_timeout: {} ms", busy_timeout);
println!(" synchronous: {}", synchronous.as_str());
if mmap_total_mb > 0 {
println!(
" mmap_size: {} MB total ({} MB per shard, {} MB per connection; floor)",
mmap_total_mb, mmap_per_shard_mb, mmap_per_connection_mb
);
} else {
println!(" mmap_size: disabled");
}
println!(" pool_max_connections: {}", pool_max_connections);

Ok(cfg)
}

@@ -119,4 +197,123 @@ impl Cfg {
_ => false,
}
}

/// Get configured SQLite cache size in MB for the whole process.
/// Default scales with shard count to preserve the 100 MB per-shard baseline.
pub fn sqlite_cache_total_mb(&self) -> i32 {
let default_per_shard = 100usize;
let default_total = self
.num_shards
.saturating_mul(default_per_shard)
.min(i32::MAX as usize) as i32;

self.sqlite
.as_ref()
.and_then(|s| s.cache_size_mb)
.unwrap_or(default_total)
}

/// Derived SQLite cache size in MB per shard (floor division).
pub fn sqlite_cache_size_per_shard_mb(&self) -> i32 {
let total = self.sqlite_cache_total_mb();
if total <= 0 {
return 0;
}

let shards = self.num_shards.max(1) as i64;
let per_shard = (total as i64) / shards;
per_shard.clamp(i32::MIN as i64, i32::MAX as i64).max(0) as i32
}

/// Derived SQLite cache size in MB per connection (floor division).
pub fn sqlite_cache_size_per_connection_mb(&self) -> i32 {
let total = self.sqlite_cache_total_mb();
if total <= 0 {
return 0;
}

let shards = self.num_shards.max(1) as i64;
let connections = self.sqlite_pool_max_connections().max(1) as i64;
let divisor = shards.saturating_mul(connections);

let per_connection = (total as i64) / divisor;
per_connection
.clamp(i32::MIN as i64, i32::MAX as i64)
.max(0) as i32
}

/// Get SQLite busy timeout in milliseconds (default: 5000 ms = 5 seconds)
pub fn sqlite_busy_timeout_ms(&self) -> u64 {
self.sqlite
.as_ref()
.and_then(|s| s.busy_timeout_ms)
.unwrap_or(5000)
}

/// Get SQLite synchronous mode (default: NORMAL)
pub fn sqlite_synchronous(&self) -> SqliteSynchronous {
self.sqlite
.as_ref()
.and_then(|s| s.synchronous)
.unwrap_or(SqliteSynchronous::NORMAL)
}

/// Get SQLite mmap size in MB (default: 0 = disabled)
pub fn sqlite_mmap_total_mb(&self) -> u64 {
self.sqlite.as_ref().and_then(|s| s.mmap_size).unwrap_or(0)
}

/// Derived SQLite mmap size in MB per shard (floor division).
pub fn sqlite_mmap_per_shard_mb(&self) -> u64 {
let total = self.sqlite_mmap_total_mb();
if total == 0 {
return 0;
}

let shards = self.num_shards.max(1) as u64;
total / shards
}

/// Derived SQLite mmap size in MB per connection (floor division).
pub fn sqlite_mmap_per_connection_mb(&self) -> u64 {
let total = self.sqlite_mmap_total_mb();
if total == 0 {
return 0;
}

let shards = self.num_shards.max(1) as u64;
let connections = self.sqlite_pool_max_connections().max(1) as u64;
let divisor = shards.saturating_mul(connections);

if divisor == 0 {
return 0;
}

total / divisor
}

/// Derived SQLite mmap size in bytes per connection (floor division).
pub fn sqlite_mmap_per_connection_bytes(&self) -> u64 {
let per_connection_mb = self.sqlite_mmap_per_connection_mb();
if per_connection_mb == 0 {
return 0;
}

per_connection_mb.saturating_mul(1024).saturating_mul(1024)
}

/// Maximum number of SQLite connections per shard (pool size).
pub fn sqlite_pool_max_connections(&self) -> u32 {
let default = 10;
let configured = self
.sqlite
.as_ref()
.and_then(|s| s.max_connections)
.unwrap_or(default);

match configured {
0 => 1,
value => value,
}
}
}
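
For completeness, a minimal sketch, not part of this PR, of how the new [sqlite] table maps onto SqliteConfig, assuming the serde and toml crates. Field names mirror SqliteConfig in src/config.rs; variant casing is made idiomatic here, whereas the PR names the variants OFF/NORMAL/FULL directly. Absent keys deserialize to None and fall back to the defaults above.

// Hypothetical sketch (not part of this PR): deserializing the [sqlite] table.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct SqliteConfig {
    cache_size_mb: Option<i32>,
    busy_timeout_ms: Option<u64>,
    synchronous: Option<SqliteSynchronous>,
    mmap_size: Option<u64>,
    max_connections: Option<u32>,
}

#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "UPPERCASE")]
enum SqliteSynchronous {
    Off,
    Normal,
    Full,
}

fn main() {
    // Only two keys set; the rest deserialize to None and fall back to defaults.
    let cfg: SqliteConfig =
        toml::from_str("cache_size_mb = 400\nsynchronous = \"NORMAL\"\n")
            .expect("valid [sqlite] table");

    assert_eq!(cfg.cache_size_mb, Some(400));
    assert_eq!(cfg.synchronous, Some(SqliteSynchronous::Normal));
    assert_eq!(cfg.busy_timeout_ms, None); // defaults to 5000 ms
    assert_eq!(cfg.mmap_size, None); // defaults to 0 (disabled)
    assert_eq!(cfg.max_connections, None); // defaults to 10
    println!("{cfg:?}");
}
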
9 changes: 6 additions & 3 deletions src/migration.rs
@@ -55,14 +55,17 @@ impl MigrationManager {
async fn create_connection_pool(&self, shard_id: usize) -> Result<SqlitePool> {
let db_path = format!("{}/shard_{}.db", self.data_dir, shard_id);

// Use recommended SQLite settings for migration operations
// See: https://kerkour.com/sqlite-for-servers
let connect_options = SqliteConnectOptions::from_str(&format!("sqlite:{}", db_path))
.map_err(|e| sqlx_to_miette(e, "Failed to parse connection string"))?
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.busy_timeout(std::time::Duration::from_millis(5000))
.pragma("synchronous", "OFF")
.pragma("cache_size", "-100000")
.pragma("temp_store", "MEMORY");
.pragma("synchronous", "NORMAL") // NORMAL is safer than OFF
.pragma("cache_size", "-1024000") // 1GB cache for better performance during migration
.pragma("temp_store", "MEMORY")
.pragma("foreign_keys", "true");

SqlitePool::connect_with(connect_options)
.await