diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs
index c2c84f3707..98dbe10b99 100644
--- a/crates/networking/p2p/peer_handler.rs
+++ b/crates/networking/p2p/peer_handler.rs
@@ -65,6 +65,10 @@ pub const SNAP_LIMIT: usize = 128;
 // increasing them may be the cause of peers disconnection
 pub const MAX_BLOCK_BODIES_TO_REQUEST: usize = 128;
 
+const STORAGE_ROOTS_PER_CHUNK: usize = 10_000;
+// How many storage roots we include in a single task sent to a peer.
+const STORAGE_ROOTS_PER_TASK: usize = 300;
+
 /// An abstraction over the [Kademlia] containing logic to make requests to peers
 #[derive(Debug, Clone)]
 pub struct PeerHandler {
@@ -1277,43 +1281,87 @@ impl PeerHandler {
             .current_step
             .set(CurrentStepValue::RequestingStorageRanges);
         debug!("Starting request_storage_ranges function");
 
-        // 1) split the range in chunks of same length
-        let mut accounts_by_root_hash: BTreeMap<_, Vec<_>> = BTreeMap::new();
-        for (account, (maybe_root_hash, _)) in &account_storage_roots.accounts_with_storage_root {
-            match maybe_root_hash {
-                Some(root) => {
-                    accounts_by_root_hash
-                        .entry(*root)
-                        .or_default()
-                        .push(*account);
-                }
+        // 1) collect pairs of (account_hash, storage_root)
+        let account_root_pairs: Vec<(H256, Option<H256>)> = account_storage_roots
+            .accounts_with_storage_root
+            .iter()
+            .map(|(account, (maybe_root_hash, _))| (*account, *maybe_root_hash))
+            .collect();
+        let mut chunk_groups: BTreeMap<H256, Vec<H256>> = BTreeMap::new();
+
+        // 2) group accounts by storage root and process them in chunks of STORAGE_ROOTS_PER_CHUNK
+        for (account, maybe_root_hash) in account_root_pairs {
+            // 2.1) Make sure we have the storage root for the account
+            let root = match maybe_root_hash {
+                Some(root) => root,
                 None => {
-                    let root = store
-                        .get_account_state_by_acc_hash(pivot_header.hash(), *account)
+                    store
+                        .get_account_state_by_acc_hash(pivot_header.hash(), account)
                         .expect("Failed to get account in state trie")
                         .expect("Could not find account that should have been downloaded or healed")
-                        .storage_root;
-                    accounts_by_root_hash
-                        .entry(root)
-                        .or_default()
-                        .push(*account);
+                        .storage_root
                 }
+            };
+
+            chunk_groups.entry(root).or_default().push(account);
+
+            // 2.2) If we have enough roots, process them
+            if chunk_groups.len() >= STORAGE_ROOTS_PER_CHUNK {
+                let chunk_accounts = Vec::from_iter(chunk_groups.into_iter());
+                self.process_storage_chunk(
+                    chunk_accounts,
+                    account_storage_roots,
+                    account_storages_snapshots_dir,
+                    &mut chunk_index,
+                    pivot_header,
+                )
+                .await?;
+                chunk_groups = BTreeMap::new();
            }
        }
-        let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash);
-        // TODO: Turn this into a stable sort for binary search.
+
+        // 2.3) Process remaining roots if any
+        if !chunk_groups.is_empty() {
+            let chunk_accounts = Vec::from_iter(chunk_groups.into_iter());
+            self.process_storage_chunk(
+                chunk_accounts,
+                account_storage_roots,
+                account_storages_snapshots_dir,
+                &mut chunk_index,
+                pivot_header,
+            )
+            .await?;
+        }
+
+        Ok(chunk_index)
+    }
+
+    async fn process_storage_chunk(
+        &mut self,
+        mut accounts_by_root_hash: Vec<(H256, Vec<H256>)>,
+        account_storage_roots: &mut AccountStorageRoots,
+        account_storages_snapshots_dir: &Path,
+        chunk_index: &mut u64,
+        pivot_header: &mut BlockHeader,
+    ) -> Result<(), PeerHandlerError> {
+        if accounts_by_root_hash.is_empty() {
+            return Ok(());
+        }
+
+        // Maintain previous prioritization of busy roots
         accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len());
-        let chunk_size = 300;
-        let chunk_count = (accounts_by_root_hash.len() / chunk_size) + 1;
+        let total_roots = accounts_by_root_hash.len();
+        let task_span = STORAGE_ROOTS_PER_TASK.min(STORAGE_ROOTS_PER_CHUNK);
+        // How many task_span-sized slices are needed to cover all roots (the last may be partial).
+        let task_partition_count = total_roots.div_ceil(task_span);
 
         // list of tasks to be executed
         // Types are (start_index, end_index, starting_hash)
         // NOTE: end_index is NOT inclusive
         let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
-        for i in 0..chunk_count {
-            let chunk_start = chunk_size * i;
-            let chunk_end = (chunk_start + chunk_size).min(accounts_by_root_hash.len());
+        for i in 0..task_partition_count {
+            let chunk_start = task_span * i;
+            let chunk_end = ((i + 1) * task_span).min(total_roots);
             tasks_queue_not_started.push_back(StorageTask {
                 start_index: chunk_start,
                 end_index: chunk_end,
@@ -1339,7 +1387,7 @@ impl PeerHandler {
         // vector of hashed storage keys and storage values.
         let mut current_account_storages: BTreeMap<H256, Vec<(H256, U256)>> = BTreeMap::new();
 
-        debug!("Starting request_storage_ranges loop");
+        debug!(chunk = chunk_index, "Starting request_storage_ranges loop");
         loop {
             if current_account_storages
                 .values()
@@ -1370,15 +1418,16 @@
                     })
                     .map_err(PeerHandlerError::DumpError)?;
             }
+            let file_index = *chunk_index;
             disk_joinset.spawn(async move {
                 let path = get_account_storages_snapshot_file(
                     &account_storages_snapshots_dir_cloned,
-                    chunk_index,
+                    file_index,
                 );
                 dump_storages_to_file(&path, snapshot)
             });
 
-            chunk_index += 1;
+            *chunk_index += 1;
         }
 
         if let Ok(result) = task_receiver.try_recv() {
@@ -1396,9 +1445,7 @@
                 for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter()
                 {
                     for account in accounts {
-                        if !accounts_done.contains_key(account) {
-                            accounts_done.insert(*account, vec![]);
-                        }
+                        accounts_done.entry(*account).or_default();
                    }
                }
@@ -1429,7 +1476,11 @@
                     let acc_hash = accounts_by_root_hash[remaining_start].1[0];
                     let (_, old_intervals) = account_storage_roots
                         .accounts_with_storage_root
-                        .get_mut(&acc_hash).ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+                        .get_mut(&acc_hash)
+                        .ok_or(PeerHandlerError::UnrecoverableError(
+                            "Tried to get the old download intervals for an account but did not find them"
+                                .to_owned(),
+                        ))?;
                     for (old_start, end) in old_intervals {
                         if end == &hash_end {
                             *old_start = hash_start;
@@ -1461,7 +1512,10 @@
                     let (_, old_intervals) = account_storage_roots
                         .accounts_with_storage_root
                         .get_mut(&acc_hash)
-                        .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+                        .ok_or(PeerHandlerError::UnrecoverableError(
+                            "Tried to get the old download intervals for an account but did not find them"
+                                .to_owned(),
+                        ))?;
                     old_intervals.remove(
                         old_intervals
                             .iter()
@@ -1533,7 +1587,10 @@
                     let (_, intervals) = account_storage_roots
                         .accounts_with_storage_root
                         .get_mut(&accounts_by_root_hash[remaining_start].1[0])
-                        .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+                        .ok_or(PeerHandlerError::UnrecoverableError(
+                            "Tried to get the old download intervals for an account but did not find them"
+                                .to_owned(),
+                        ))?;
 
                     for i in 0..chunk_count {
                         let start_hash_u256 = start_hash_u256 + chunk_size * i;
@@ -1569,7 +1626,10 @@
                     let (_, intervals) = account_storage_roots
                         .accounts_with_storage_root
                         .get_mut(&accounts_by_root_hash[remaining_start].1[0])
-                        .ok_or(PeerHandlerError::UnrecoverableError("Trie to get the old download intervals for an account but did not find them".to_owned()))?;
+                        .ok_or(PeerHandlerError::UnrecoverableError(
+                            "Tried to get the old download intervals for an account but did not find them"
+                                .to_owned(),
+                        ))?;
 
                     for i in 0..chunk_count {
                         let start_hash_u256 = start_hash_u256 + chunk_size * i;
@@ -1713,10 +1773,12 @@
             std::fs::create_dir_all(account_storages_snapshots_dir)
                 .map_err(|_| PeerHandlerError::CreateStorageSnapshotsDir)?;
         }
+
         let path =
-            get_account_storages_snapshot_file(account_storages_snapshots_dir, chunk_index);
+            get_account_storages_snapshot_file(account_storages_snapshots_dir, *chunk_index);
         dump_storages_to_file(&path, snapshot)
-            .map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(chunk_index))?;
+            .map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(*chunk_index))?;
+        *chunk_index += 1;
     }
     disk_joinset
         .join_all()
@@ -1744,7 +1806,7 @@
             self.peer_table.free_peer(&result.peer_id).await?;
         }
 
-        Ok(chunk_index + 1)
+        Ok(())
     }
 
     async fn request_storage_ranges_worker(
diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs
index aec627d3a9..51f59676d8 100644
--- a/crates/storage/store_db/rocksdb.rs
+++ b/crates/storage/store_db/rocksdb.rs
@@ -9,8 +9,8 @@ use ethrex_common::{
 };
 use ethrex_trie::{Nibbles, NodeHash, Trie};
 use rocksdb::{
-    BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor, MultiThreaded,
-    OptimisticTransactionDB, Options, WriteBatchWithTransaction,
+    BlockBasedOptions, BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded,
+    Options, WriteBatch,
 };
 use std::{collections::HashSet, path::Path, sync::Arc};
 use tracing::info;
@@ -103,7 +103,7 @@ const CF_INVALID_ANCESTORS: &str = "invalid_ancestors";
 
 #[derive(Debug)]
 pub struct Store {
-    db: Arc<OptimisticTransactionDB<MultiThreaded>>,
+    db: Arc<DBWithThreadMode<MultiThreaded>>,
 }
 
 impl Store {
@@ -112,8 +112,6 @@ impl Store {
         db_options.create_if_missing(true);
         db_options.create_missing_column_families(true);
 
-        let cache = Cache::new_lru_cache(4 * 1024 * 1024 * 1024); // 4GB cache
-
         db_options.set_max_open_files(-1);
         db_options.set_max_file_opening_threads(16);
 
@@ -166,18 +164,17 @@ impl Store {
         ];
 
         // Get existing column families to know which ones to drop later
-        let existing_cfs =
-            match OptimisticTransactionDB::<MultiThreaded>::list_cf(&db_options, path) {
-                Ok(cfs) => {
-                    info!("Found existing column families: {:?}", cfs);
-                    cfs
-                }
-                Err(_) => {
-                    // Database doesn't exist yet
-                    info!("Database doesn't exist, will create with expected column families");
-                    vec!["default".to_string()]
-                }
-            };
+        let existing_cfs = match DBWithThreadMode::<MultiThreaded>::list_cf(&db_options, path) {
+            Ok(cfs) => {
+                info!("Found existing column families: {:?}", cfs);
+                cfs
+            }
+            Err(_) => {
+                // Database doesn't exist yet
+                info!("Database doesn't exist, will create with expected column families");
+                vec!["default".to_string()]
+            }
+        };
 
         // Create descriptors for ALL existing CFs + expected ones (RocksDB requires opening all existing CFs)
         let mut all_cfs_to_open = HashSet::new();
@@ -211,9 +208,7 @@
                 cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB
 
                 let mut block_opts = BlockBasedOptions::default();
-                block_opts.set_block_cache(&cache);
                 block_opts.set_block_size(32 * 1024); // 32KB blocks
-                block_opts.set_cache_index_and_filter_blocks(true);
                 cf_opts.set_block_based_table_factory(&block_opts);
             }
             CF_CANONICAL_BLOCK_HASHES | CF_BLOCK_NUMBERS => {
@@ -223,10 +218,8 @@
                 cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB
 
                 let mut block_opts = BlockBasedOptions::default();
-                block_opts.set_block_cache(&cache);
                 block_opts.set_block_size(16 * 1024); // 16KB
                 block_opts.set_bloom_filter(10.0, false);
-                block_opts.set_cache_index_and_filter_blocks(true);
                 cf_opts.set_block_based_table_factory(&block_opts);
             }
             CF_STATE_TRIE_NODES | CF_STORAGE_TRIES_NODES => {
@@ -239,10 +232,7 @@
 
                 let mut block_opts = BlockBasedOptions::default();
                 block_opts.set_block_size(16 * 1024); // 16KB
-                block_opts.set_block_cache(&cache);
                 block_opts.set_bloom_filter(10.0, false); // 10 bits per key
-                block_opts.set_cache_index_and_filter_blocks(true);
-                block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
                 cf_opts.set_block_based_table_factory(&block_opts);
             }
             CF_RECEIPTS | CF_ACCOUNT_CODES => {
@@ -252,9 +242,7 @@
                 cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB
 
                 let mut block_opts = BlockBasedOptions::default();
-                block_opts.set_block_cache(&cache);
                 block_opts.set_block_size(32 * 1024); // 32KB
-                block_opts.set_block_cache(&cache);
                 cf_opts.set_block_based_table_factory(&block_opts);
             }
             _ => {
@@ -266,7 +254,6 @@
 
                 let mut block_opts = BlockBasedOptions::default();
                 block_opts.set_block_size(16 * 1024);
-                block_opts.set_block_cache(&cache);
                 cf_opts.set_block_based_table_factory(&block_opts);
             }
         }
@@ -274,7 +261,7 @@
             cf_descriptors.push(ColumnFamilyDescriptor::new(cf_name, cf_opts));
         }
 
-        let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
+        let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
             &db_options,
             path,
             cf_descriptors,
@@ -370,7 +357,7 @@ impl Store {
         let db = self.db.clone();
 
         tokio::task::spawn_blocking(move || {
-            let mut batch = WriteBatchWithTransaction::default();
+            let mut batch = WriteBatch::default();
 
             for (cf_name, key, value) in batch_ops {
                 let cf = db.cf_handle(&cf_name).ok_or_else(|| {
@@ -467,7 +454,7 @@ impl StoreEngine for Store {
         )?;
 
         let _span = tracing::trace_span!("Block DB update").entered();
-        let mut batch = WriteBatchWithTransaction::default();
+        let mut batch = WriteBatch::default();
 
         for (node_hash, node_data) in update_batch.account_updates {
             batch.put_cf(&cf_state, node_hash.as_ref(), node_data);
@@ -537,7 +524,7 @@ impl StoreEngine for Store {
         let db = self.db.clone();
 
         tokio::task::spawn_blocking(move || {
-            let mut batch = WriteBatchWithTransaction::default();
+            let mut batch = WriteBatch::default();
 
             let [cf_headers, cf_bodies, cf_block_numbers, cf_tx_locations] = open_cfs(
                 &db,
@@ -646,7 +633,7 @@ impl StoreEngine for Store {
     }
 
     async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
-        let mut batch = WriteBatchWithTransaction::default();
+        let mut batch = WriteBatch::default();
 
         let Some(hash) = self.get_canonical_block_hash_sync(block_number)? else {
             return Ok(());
@@ -896,7 +883,7 @@ impl StoreEngine for Store {
             .ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;
 
         let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start);
-        let mut batch = WriteBatchWithTransaction::default();
+        let mut batch = WriteBatch::default();
 
         while let Some(Ok((key, _))) = iter.next() {
             batch.delete_cf(&cf, key);
@@ -1120,7 +1107,7 @@ impl StoreEngine for Store {
         let db = self.db.clone();
 
         tokio::task::spawn_blocking(move || {
-            let mut batch = WriteBatchWithTransaction::default();
+            let mut batch = WriteBatch::default();
 
             let [cf_canonical, cf_chain_data] =
                 open_cfs(&db, [CF_CANONICAL_BLOCK_HASHES, CF_CHAIN_DATA])?;
@@ -1396,7 +1383,7 @@ impl StoreEngine for Store {
 
 /// Open column families
 fn open_cfs<'a, const N: usize>(
-    db: &'a Arc<OptimisticTransactionDB<MultiThreaded>>,
+    db: &'a Arc<DBWithThreadMode<MultiThreaded>>,
     names: [&str; N],
 ) -> Result<[Arc<BoundColumnFamily<'a>>; N], StoreError> {
     let mut handles = Vec::with_capacity(N);
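Note: the store now opens a plain `DBWithThreadMode<MultiThreaded>` and batches through `WriteBatch`, dropping the optimistic-transaction layer (and its conflict-detection bookkeeping) that the engine never exercised. A minimal sketch of the replacement write path, assuming only the `rocksdb` crate (the path is illustrative):

```rust
use rocksdb::{DBWithThreadMode, MultiThreaded, Options, WriteBatch};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);

    // Plain multi-threaded DB handle: no transaction bookkeeping.
    let db: DBWithThreadMode<MultiThreaded> =
        DBWithThreadMode::open(&opts, "/tmp/writebatch-demo")?;

    // All staged operations land atomically in a single write call.
    let mut batch = WriteBatch::default();
    batch.put(b"key-a", b"value-a");
    batch.put(b"key-b", b"value-b");
    batch.delete(b"key-a");
    db.write(batch)?;

    assert!(db.get(b"key-a")?.is_none());
    assert_eq!(db.get(b"key-b")?.as_deref(), Some(&b"value-b"[..]));
    Ok(())
}
```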
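Note: `put_batch` keeps the same shape against the new type: resolve the column-family handle, stage every node write in one batch, commit atomically. A standalone sketch under the same assumptions (the CF name and path are illustrative):

```rust
use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);

    let descriptor = ColumnFamilyDescriptor::new("trie_cf", Options::default());
    let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
        &opts,
        "/tmp/put-batch-demo",
        vec![descriptor],
    )?;

    // Mirrors put_batch: one atomic batch against the trie's column family.
    let cf = db.cf_handle("trie_cf").expect("CF was opened above");
    let mut batch = WriteBatch::default();
    for (key, value) in [(b"node-1", b"rlp-1"), (b"node-2", b"rlp-2")] {
        batch.put_cf(&cf, key, value);
    }
    db.write(batch)?;

    assert_eq!(db.get_cf(&cf, b"node-1")?.as_deref(), Some(&b"rlp-1"[..]));
    Ok(())
}
```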
diff --git a/crates/storage/trie_db/rocksdb_locked.rs b/crates/storage/trie_db/rocksdb_locked.rs
index a843ba4605..ed32338899 100644
--- a/crates/storage/trie_db/rocksdb_locked.rs
+++ b/crates/storage/trie_db/rocksdb_locked.rs
@@ -1,23 +1,23 @@
 use ethrex_common::H256;
 use ethrex_trie::{NodeHash, TrieDB, error::TrieError};
-use rocksdb::{MultiThreaded, OptimisticTransactionDB, SnapshotWithThreadMode};
+use rocksdb::{DBWithThreadMode, MultiThreaded, SnapshotWithThreadMode};
 use std::sync::Arc;
 
 /// RocksDB locked implementation for the TrieDB trait, read-only with consistent snapshot.
 pub struct RocksDBLockedTrieDB {
     /// RocksDB database
-    db: &'static Arc<OptimisticTransactionDB<MultiThreaded>>,
+    db: &'static Arc<DBWithThreadMode<MultiThreaded>>,
     /// Column family handle
     cf: std::sync::Arc<rocksdb::BoundColumnFamily<'static>>,
     /// Read-only snapshot for consistent reads
-    snapshot: SnapshotWithThreadMode<'static, OptimisticTransactionDB<MultiThreaded>>,
+    snapshot: SnapshotWithThreadMode<'static, DBWithThreadMode<MultiThreaded>>,
     /// Storage trie address prefix
     address_prefix: Option<H256>,
 }
 
 impl RocksDBLockedTrieDB {
     pub fn new(
-        db: Arc<OptimisticTransactionDB<MultiThreaded>>,
+        db: Arc<DBWithThreadMode<MultiThreaded>>,
         cf_name: &str,
         address_prefix: Option<H256>,
     ) -> Result<Self, TrieError> {
@@ -61,8 +61,8 @@ impl Drop for RocksDBLockedTrieDB {
         // Restore the leaked database reference
         unsafe {
             drop(Box::from_raw(
-                self.db as *const Arc<OptimisticTransactionDB<MultiThreaded>>
-                    as *mut Arc<OptimisticTransactionDB<MultiThreaded>>,
+                self.db as *const Arc<DBWithThreadMode<MultiThreaded>>
+                    as *mut Arc<DBWithThreadMode<MultiThreaded>>,
             ));
         }
     }
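Note: the `Drop` change above is the reclaim half of a leak-then-restore pattern: `new` leaks a boxed `Arc` to obtain a `'static` reference that the snapshot field can borrow, and `Drop` casts that reference back to a raw pointer to free the box and release the `Arc`. A minimal illustration with stand-in types (`Holder` and `String` are not from this crate):

```rust
use std::sync::Arc;

struct Holder {
    /// 'static reference obtained by leaking a Box when the holder is built.
    leaked: &'static Arc<String>,
}

impl Holder {
    fn new(value: Arc<String>) -> Self {
        // Leaking yields a &'static that outlives any scope, which is what
        // lets the snapshot field in the real type borrow from it.
        Holder {
            leaked: Box::leak(Box::new(value)),
        }
    }
}

impl Drop for Holder {
    fn drop(&mut self) {
        // Reclaim the leaked Box so the Arc is dropped and its refcount falls.
        unsafe {
            drop(Box::from_raw(
                self.leaked as *const Arc<String> as *mut Arc<String>,
            ));
        }
    }
}

fn main() {
    let v = Arc::new("snapshot".to_string());
    {
        let _holder = Holder::new(v.clone());
        assert_eq!(Arc::strong_count(&v), 2); // one count held by the leaked Box
    }
    assert_eq!(Arc::strong_count(&v), 1); // Drop reclaimed the leak
}
```

This is sound only because the struct has exclusive ownership of the leaked allocation and never touches the reference after `Drop`, which is exactly the invariant the real `RocksDBLockedTrieDB` maintains.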