diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml index 15691741c7..1560c17806 100644 --- a/.github/workflows/pr-main_l1.yaml +++ b/.github/workflows/pr-main_l1.yaml @@ -167,7 +167,7 @@ jobs: artifact_prefix: engine_paris - name: "Engine withdrawal tests" simulation: ethereum/engine - limit: "engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload|Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload|Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org [^S]|Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org [^S]" + limit: "engine-withdrawals" artifact_prefix: engine_withdrawals # Investigate this test # - name: "Sync" diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 6f31c301a5..14a24cead0 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -19,7 +19,7 @@ use crate::{ }, }, snap::encodable_to_proof, - sync::{AccountStorageRoots, BlockSyncState, block_is_stale, update_pivot}, + sync::{AccountStorageRoots, SnapBlockSyncState, block_is_stale, update_pivot}, utils::{ AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file, get_account_state_snapshot_file, get_account_storages_snapshot_file, @@ -469,11 +469,13 @@ impl PeerHandler { ) .await { - if are_block_headers_chained(&block_headers, &order) { + if are_block_headers_chained(&block_headers, &order) + && !block_headers.is_empty() + { return Ok(Some(block_headers)); } else { warn!( - "[SYNCING] Received invalid headers from peer, penalizing peer {peer_id}" + "[SYNCING] Received empty/invalid headers from peer, penalizing peer {peer_id}" ); } } @@ -678,7 +680,7 @@ impl PeerHandler { limit: H256, account_state_snapshots_dir: &Path, pivot_header: &mut BlockHeader, - block_sync_state: &mut BlockSyncState, + block_sync_state: &mut SnapBlockSyncState, ) -> Result<(), PeerHandlerError> { METRICS .current_step diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 8d0d7a8254..4ef9592da9 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -174,12 +174,10 @@ impl Syncer { /// Performs the sync cycle described in `start_sync`, returns an error if the sync fails at any given step and aborts all active processes async fn sync_cycle_snap(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> { - // Take picture of the current sync mode, we will update the original value when we need to - let mut sync_mode = SyncMode::Snap; // Request all block headers between the current head and the sync head // We will begin from the current head so that we download the earliest state first // This step is not parallelized - let mut block_sync_state = BlockSyncState::new(&sync_mode, store.clone()); + let mut block_sync_state = SnapBlockSyncState::new(store.clone()); // Check if we have some blocks downloaded from a previous sync attempt // This applies only to snap sync—full sync always starts fetching headers // from the canonical block, which updates as new block headers are fetched. @@ -214,11 +212,7 @@ impl Syncer { debug!("Sync Log 1: In snap sync"); debug!( "Sync Log 2: State block hashes len {}", - match block_sync_state { - BlockSyncState::Full(_) => 0, - BlockSyncState::Snap(ref snap_block_sync_state) => - snap_block_sync_state.block_hashes.len(), - } + block_sync_state.block_hashes.len() ); debug!("Requesting Block Headers from {current_head}"); @@ -282,16 +276,15 @@ impl Syncer { current_head_number = last_block_number; // If the sync head is less than 64 blocks away from our current head switch to full-sync - if sync_mode == SyncMode::Snap && sync_head_found { + if sync_head_found { let latest_block_number = store.get_latest_block_number().await?; if last_block_number.saturating_sub(latest_block_number) < MIN_FULL_BLOCKS as u64 { // Too few blocks for a snap sync, switching to full sync debug!( "Sync head is less than {MIN_FULL_BLOCKS} blocks away, switching to FullSync" ); - sync_mode = SyncMode::Full; self.snap_enabled.store(false, Ordering::Relaxed); - block_sync_state = block_sync_state.into_fullsync().await?; + return self.sync_cycle_full(sync_head, store.clone()).await; } } @@ -299,23 +292,9 @@ impl Syncer { if block_headers.len() > 1 { let block_headers_iter = block_headers.into_iter().skip(1); - match block_sync_state { - BlockSyncState::Full(ref mut state) => { - state - .process_incoming_headers( - block_headers_iter, - sync_head, - sync_head_found, - self.blockchain.clone(), - self.peers.clone(), - self.cancel_token.clone(), - ) - .await?; - } - BlockSyncState::Snap(ref mut state) => { - state.process_incoming_headers(block_headers_iter).await? - } - } + block_sync_state + .process_incoming_headers(block_headers_iter) + .await?; } if sync_head_found { @@ -323,13 +302,11 @@ impl Syncer { }; } - if let SyncMode::Snap = sync_mode { - self.snap_sync(&store, &mut block_sync_state).await?; + self.snap_sync(&store, &mut block_sync_state).await?; - store.clear_snap_state().await?; + store.clear_snap_state().await?; + self.snap_enabled.store(false, Ordering::Relaxed); - self.snap_enabled.store(false, Ordering::Relaxed); - } Ok(()) } @@ -338,109 +315,220 @@ impl Syncer { /// # Returns /// /// Returns an error if the sync fails at any given step and aborts all active processes - async fn sync_cycle_full(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> { - // Request all block headers between the current head and the sync head - // We will begin from the current head so that we download the earliest state first - // This step is not parallelized - let mut block_sync_state = FullBlockSyncState::new(store.clone()); - // Check if we have some blocks downloaded from a previous sync attempt - // This applies only to snap sync—full sync always starts fetching headers - // from the canonical block, which updates as new block headers are fetched. - let mut current_head = block_sync_state.get_current_head().await?; - info!( - "Syncing from current head {:?} to sync_head {:?}", - current_head, sync_head - ); + async fn sync_cycle_full( + &mut self, + mut sync_head: H256, + store: Store, + ) -> Result<(), SyncError> { + info!("Syncing to sync_head {:?}", sync_head); - loop { - debug!("Sync Log 1: In Full Sync"); - debug!( - "Sync Log 3: State current headers len {}", - block_sync_state.current_headers.len() - ); - debug!( - "Sync Log 4: State current blocks len {}", - block_sync_state.current_blocks.len() - ); + // Check if the sync_head is a pending block, if so, gather all pending blocks belonging to its chain + let mut pending_blocks = vec![]; + while let Some(block) = store.get_pending_block(sync_head).await? { + if store.is_canonical_sync(block.hash())? { + // Ignore canonical blocks still in pending + break; + } + sync_head = block.header.parent_hash; + pending_blocks.insert(0, block); + } - debug!("Requesting Block Headers from {current_head}"); + // Request all block headers between the sync head and our local chain + // We will begin from the sync head so that we download the latest state first, ensuring we follow the correct chain + // This step is not parallelized + let mut start_block_number; + let mut end_block_number = 0; + let mut headers = vec![]; + let mut single_batch = true; + // Request and store all block headers from the advertised sync head + loop { let Some(mut block_headers) = self .peers - .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) + .request_block_headers_from_hash(sync_head, BlockRequestOrder::NewToOld) .await? else { warn!("Sync failed to find target block header, aborting"); debug!("Sync Log 8: Sync failed to find target block header, aborting"); return Ok(()); }; - debug!("Sync Log 9: Received {} block headers", block_headers.len()); - let (first_block_hash, first_block_number, first_block_parent_hash) = - match block_headers.first() { - Some(header) => (header.hash(), header.number, header.parent_hash), - None => continue, - }; - let (last_block_hash, last_block_number) = match block_headers.last() { - Some(header) => (header.hash(), header.number), - None => continue, - }; - // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck - // on a loop when the target head is not found, i.e. on a reorg with a side-chain. - if first_block_hash == last_block_hash - && first_block_hash == current_head - && current_head != sync_head - { - // There is no path to the sync head this goes back until it find a common ancerstor - warn!("Sync failed to find target block header, going back to the previous parent"); - current_head = first_block_parent_hash; - continue; - } + let first_header = block_headers.first().ok_or(SyncError::NoBlocks)?; + let last_header = block_headers.last().ok_or(SyncError::NoBlocks)?; - debug!( + info!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), - first_block_number, - last_block_number + first_header.number, + last_header.number, ); - - // Filter out everything after the sync_head - let mut sync_head_found = false; - if let Some(index) = block_headers - .iter() - .position(|header| header.hash() == sync_head) - { - sync_head_found = true; - block_headers.drain(index + 1..); + end_block_number = end_block_number.max(first_header.number); + start_block_number = last_header.number; + + sync_head = last_header.parent_hash; + if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { + // Incoming chain merged with current chain + // Filter out already canonical blocks from batch + let mut first_canon_block = block_headers.len(); + for (index, header) in block_headers.iter().enumerate() { + if store.is_canonical_sync(header.hash())? { + first_canon_block = index; + break; + } + } + block_headers.drain(first_canon_block..block_headers.len()); + if !block_headers.is_empty() { + start_block_number = block_headers.last().ok_or(SyncError::NoBlocks)?.number + } + // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store + if single_batch { + headers = block_headers.into_iter().rev().collect(); + } else { + store.add_fullsync_batch(block_headers).await?; + } + break; + } + store.add_fullsync_batch(block_headers).await?; + single_batch = false; + } + end_block_number += 1; + start_block_number = start_block_number.max(1); + + // Download block bodies and execute full blocks in batches + for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { + let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize); + let final_batch = end_block_number == start + batch_size as u64; + // Retrieve batch from DB + if !single_batch { + headers = store.read_fullsync_batch(start, batch_size as u64).await?; + } + let mut blocks = Vec::new(); + // Request block bodies + // Download block bodies + while !headers.is_empty() { + let header_batch = &headers[..min(MAX_BLOCK_BODIES_TO_REQUEST, headers.len())]; + let bodies = self + .peers + .request_and_validate_block_bodies(header_batch) + .await? + .ok_or(SyncError::BodiesNotFound)?; + debug!("Obtained: {} block bodies", bodies.len()); + let block_batch = headers + .drain(..bodies.len()) + .zip(bodies) + .map(|(header, body)| Block { header, body }); + blocks.extend(block_batch); + } + if !blocks.is_empty() { + // Execute blocks + info!( + "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", + blocks.len(), + blocks.first().ok_or(SyncError::NoBlocks)?.hash(), + blocks.last().ok_or(SyncError::NoBlocks)?.hash() + ); + self.add_blocks_in_batch(blocks, final_batch, store.clone()) + .await?; } + } - // Update current fetch head - current_head = last_block_hash; + // Execute pending blocks + if !pending_blocks.is_empty() { + info!( + "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", + pending_blocks.len(), + pending_blocks.first().ok_or(SyncError::NoBlocks)?.hash(), + pending_blocks.last().ok_or(SyncError::NoBlocks)?.hash() + ); + self.add_blocks_in_batch(pending_blocks, true, store.clone()) + .await?; + } - // Discard the first header as we already have it - if block_headers.len() > 1 { - let mut finished = false; - while !finished { - let headers = std::mem::take(&mut block_headers); - let block_headers_iter = headers.into_iter().skip(1); - (finished, sync_head_found) = block_sync_state - .process_incoming_headers( - block_headers_iter, - sync_head, - sync_head_found, - self.blockchain.clone(), - self.peers.clone(), - self.cancel_token.clone(), - ) - .await?; + store.clear_fullsync_headers().await?; + Ok(()) + } + + async fn add_blocks_in_batch( + &self, + blocks: Vec, + final_batch: bool, + store: Store, + ) -> Result<(), SyncError> { + let execution_start = Instant::now(); + // Copy some values for later + let blocks_len = blocks.len(); + let numbers_and_hashes = blocks + .iter() + .map(|b| (b.header.number, b.hash())) + .collect::>(); + let (last_block_number, last_block_hash) = numbers_and_hashes + .last() + .cloned() + .ok_or(SyncError::InvalidRangeReceived)?; + let (first_block_number, first_block_hash) = numbers_and_hashes + .first() + .cloned() + .ok_or(SyncError::InvalidRangeReceived)?; + + let blocks_hashes = blocks.iter().map(|block| block.hash()).collect::>(); + // Run the batch + if let Err((err, batch_failure)) = Syncer::add_blocks( + self.blockchain.clone(), + blocks, + final_batch, + self.cancel_token.clone(), + ) + .await + { + if let Some(batch_failure) = batch_failure { + warn!("Failed to add block during FullSync: {err}"); + // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. + if let ChainError::InvalidBlock(_) = err { + let mut block_hashes_with_invalid_ancestor: Vec = vec![]; + if let Some(index) = blocks_hashes + .iter() + .position(|x| x == &batch_failure.failed_block_hash) + { + block_hashes_with_invalid_ancestor = blocks_hashes[index..].to_vec(); + } + + for hash in block_hashes_with_invalid_ancestor { + store + .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) + .await?; + } } } - - if sync_head_found { - break; - }; + return Err(err.into()); } + + store + .forkchoice_update( + Some(numbers_and_hashes), + last_block_number, + last_block_hash, + None, + None, + ) + .await?; + + let execution_time: f64 = execution_start.elapsed().as_millis() as f64 / 1000.0; + let blocks_per_second = blocks_len as f64 / execution_time; + + info!( + "[SYNCING] Executed & stored {} blocks in {:.3} seconds.\n\ + Started at block with hash {} (number {}).\n\ + Finished at block with hash {} (number {}).\n\ + Blocks per second: {:.3}", + blocks_len, + execution_time, + first_block_hash, + first_block_number, + last_block_hash, + last_block_number, + blocks_per_second + ); Ok(()) } @@ -527,13 +615,6 @@ async fn store_receipts( Ok(()) } -/// Persisted State during the Block Sync phase -#[derive(Clone)] -pub enum BlockSyncState { - Full(FullBlockSyncState), - Snap(SnapBlockSyncState), -} - /// Persisted State during the Block Sync phase for SnapSync #[derive(Clone)] pub struct SnapBlockSyncState { @@ -541,225 +622,6 @@ pub struct SnapBlockSyncState { store: Store, } -/// Persisted State during the Block Sync phase for FullSync -#[derive(Clone)] -pub struct FullBlockSyncState { - current_headers: Vec, - current_blocks: Vec, - store: Store, -} - -impl BlockSyncState { - fn new(sync_mode: &SyncMode, store: Store) -> Self { - match sync_mode { - SyncMode::Full => BlockSyncState::Full(FullBlockSyncState::new(store)), - SyncMode::Snap => BlockSyncState::Snap(SnapBlockSyncState::new(store)), - } - } - - /// Obtain the current head from where to start or resume block sync - async fn get_current_head(&self) -> Result { - match self { - BlockSyncState::Full(state) => state.get_current_head().await, - BlockSyncState::Snap(state) => state.get_current_head().await, - } - } - - /// Converts self into a FullSync state, does nothing if self is already a FullSync state - pub async fn into_fullsync(self) -> Result { - // Switch from Snap to Full sync and vice versa - let state = match self { - BlockSyncState::Full(state) => state, - BlockSyncState::Snap(state) => state.into_fullsync().await?, - }; - Ok(Self::Full(state)) - } -} - -impl FullBlockSyncState { - fn new(store: Store) -> Self { - Self { - store, - current_headers: Vec::new(), - current_blocks: Vec::new(), - } - } - - /// Obtain the current head from where to start or resume block sync - async fn get_current_head(&self) -> Result { - self.store - .get_latest_canonical_block_hash() - .await? - .ok_or(SyncError::NoLatestCanonical) - } - - /// Saves incoming headers, requests as many block bodies as needed to complete - /// an execution batch and executes it. - /// An incomplete batch may be executed if the sync_head was already found - /// Returns bool finish to know whether the amount of block headers was less than MAX_BLOCK_BODIES_TO_REQUEST - /// to determine if there's still more blocks to download. - /// Returns bool sync_head_found to know whether full sync was completed. - async fn process_incoming_headers( - &mut self, - block_headers: impl Iterator, - sync_head: H256, - sync_head_found_in_block_headers: bool, - blockchain: Arc, - mut peers: PeerHandler, - cancel_token: CancellationToken, - ) -> Result<(bool, bool), SyncError> { - info!("Processing incoming headers full sync"); - self.current_headers.extend(block_headers); - - let mut sync_head_found = sync_head_found_in_block_headers; - let finished = self.current_headers.len() <= MAX_BLOCK_BODIES_TO_REQUEST; - // if self.current_headers.len() < *EXECUTE_BATCH_SIZE && !sync_head_found { - // // We don't have enough headers to fill up a batch, lets request more - // return Ok(()); - // } - // If we have enough headers to fill execution batches, request the matching bodies - // while self.current_headers.len() >= *EXECUTE_BATCH_SIZE - // || !self.current_headers.is_empty() && sync_head_found - // { - // Download block bodies - let headers = - &self.current_headers[..min(MAX_BLOCK_BODIES_TO_REQUEST, self.current_headers.len())]; - let bodies = peers - .request_and_validate_block_bodies(headers) - .await? - .ok_or(SyncError::BodiesNotFound)?; - debug!("Obtained: {} block bodies", bodies.len()); - let blocks = self - .current_headers - .drain(..bodies.len()) - .zip(bodies) - .map(|(header, body)| Block { header, body }); - self.current_blocks.extend(blocks); - // } - - // If we have the sync_head as a pending block from a new_payload request and its parent_hash matches the hash of the latest received header - // we set the sync_head as found. Then we add it in current_blocks for execution. - if let Some(block) = self.store.get_pending_block(sync_head).await? - && self - .current_blocks - .last() - .is_some_and(|last_block| last_block.hash() == block.header.parent_hash) - { - self.current_blocks.push(block); - sync_head_found = true; - } - // Execute full blocks - // while self.current_blocks.len() >= *EXECUTE_BATCH_SIZE - // || (!self.current_blocks.is_empty() && sync_head_found) - // { - // Now that we have a full batch, we can execute and store the blocks in batch - - info!( - "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", - self.current_blocks.len(), - self.current_blocks - .first() - .ok_or(SyncError::NoBlocks)? - .hash(), - self.current_blocks - .last() - .ok_or(SyncError::NoBlocks)? - .hash() - ); - let execution_start = Instant::now(); - let block_batch: Vec = self - .current_blocks - .drain(..min(*EXECUTE_BATCH_SIZE, self.current_blocks.len())) - .collect(); - // Copy some values for later - let blocks_len = block_batch.len(); - let numbers_and_hashes = block_batch - .iter() - .map(|b| (b.header.number, b.hash())) - .collect::>(); - let (last_block_number, last_block_hash) = numbers_and_hashes - .last() - .cloned() - .ok_or(SyncError::InvalidRangeReceived)?; - let (first_block_number, first_block_hash) = numbers_and_hashes - .first() - .cloned() - .ok_or(SyncError::InvalidRangeReceived)?; - - let block_batch_hashes = block_batch - .iter() - .map(|block| block.hash()) - .collect::>(); - - // Run the batch - if let Err((err, batch_failure)) = Syncer::add_blocks( - blockchain.clone(), - block_batch, - sync_head_found, - cancel_token.clone(), - ) - .await - { - if let Some(batch_failure) = batch_failure { - let failed_block_hash = batch_failure.failed_block_hash; - warn!(%err, block=%failed_block_hash, "Failed to add block during FullSync"); - // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. - if let ChainError::InvalidBlock(_) = err { - let mut block_hashes_with_invalid_ancestor: Vec = vec![]; - if let Some(index) = block_batch_hashes - .iter() - .position(|x| x == &failed_block_hash) - { - block_hashes_with_invalid_ancestor = block_batch_hashes[index..].to_vec(); - } - - for hash in block_hashes_with_invalid_ancestor { - self.store - .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) - .await?; - } - // We also set with having an invalid ancestor all the hashes remaining which are descendants as well. - for header in &self.current_headers { - self.store - .set_latest_valid_ancestor(header.hash(), batch_failure.last_valid_hash) - .await?; - } - } - } - return Err(err.into()); - } - - self.store - .forkchoice_update( - Some(numbers_and_hashes), - last_block_number, - last_block_hash, - None, - None, - ) - .await?; - - let execution_time: f64 = execution_start.elapsed().as_millis() as f64 / 1000.0; - let blocks_per_second = blocks_len as f64 / execution_time; - - info!( - "[SYNCING] Executed & stored {} blocks in {:.3} seconds.\n\ - Started at block with hash {} (number {}).\n\ - Finished at block with hash {} (number {}).\n\ - Blocks per second: {:.3}", - blocks_len, - execution_time, - first_block_hash, - first_block_number, - last_block_hash, - last_block_number, - blocks_per_second - ); - // } - Ok((finished, sync_head_found)) - } -} - impl SnapBlockSyncState { fn new(store: Store) -> Self { Self { @@ -800,48 +662,22 @@ impl SnapBlockSyncState { self.store.add_block_headers(block_headers_vec).await?; Ok(()) } - - /// Converts self into a FullSync state. - /// Clears SnapSync checkpoints from the Store - /// In the rare case that block headers were stored in a previous iteration, these will be fetched and saved to the FullSync state for full retrieval and execution - async fn into_fullsync(self) -> Result { - // For all collected hashes we must also have the corresponding headers stored - // As this switch will only happen when the sync_head is 64 blocks away or less from our latest block - // The headers to fetch will be at most 64, and none in the most common case - let mut current_headers = Vec::new(); - for hash in self.block_hashes { - let header = self - .store - .get_block_header_by_hash(hash)? - .ok_or(SyncError::CorruptDB)?; - current_headers.push(header); - } - self.store.clear_snap_state().await?; - Ok(FullBlockSyncState { - current_headers, - current_blocks: Vec::new(), - store: self.store, - }) - } } impl Syncer { async fn snap_sync( &mut self, store: &Store, - block_sync_state: &mut BlockSyncState, + block_sync_state: &mut SnapBlockSyncState, ) -> Result<(), SyncError> { // snap-sync: launch tasks to fetch blocks and state in parallel // - Fetch each block's body and its receipt via eth p2p requests // - Fetch the pivot block's state via snap p2p requests // - Execute blocks after the pivot (like in full-sync) - let pivot_hash = match block_sync_state { - BlockSyncState::Full(_) => return Err(SyncError::NotInSnapSync), - BlockSyncState::Snap(snap_block_sync_state) => snap_block_sync_state - .block_hashes - .last() - .ok_or(SyncError::NoBlockHeaders)?, - }; + let pivot_hash = block_sync_state + .block_hashes + .last() + .ok_or(SyncError::NoBlockHeaders)?; let mut pivot_header = store .get_block_header_by_hash(*pivot_hash)? .ok_or(SyncError::CorruptDB)?; @@ -1149,16 +985,13 @@ impl Syncer { store.add_block(block).await?; - let numbers_and_hashes = match block_sync_state { - BlockSyncState::Full(_) => return Err(SyncError::NotInSnapSync), - BlockSyncState::Snap(snap_block_sync_state) => snap_block_sync_state - .block_hashes - .iter() - .rev() - .enumerate() - .map(|(i, hash)| (pivot_header.number - i as u64, *hash)) - .collect::>(), - }; + let numbers_and_hashes = block_sync_state + .block_hashes + .iter() + .rev() + .enumerate() + .map(|(i, hash)| (pivot_header.number - i as u64, *hash)) + .collect::>(); store .forkchoice_update( @@ -1215,7 +1048,7 @@ pub async fn update_pivot( block_number: u64, block_timestamp: u64, peers: &mut PeerHandler, - block_sync_state: &mut BlockSyncState, + block_sync_state: &mut SnapBlockSyncState, ) -> Result { // We multiply the estimation by 0.9 in order to account for missing slots (~9% in tesnets) let new_pivot_block_number = block_number @@ -1253,17 +1086,13 @@ pub async fn update_pivot( // Reward peer peers.peer_table.record_success(&peer_id).await?; info!("Succesfully updated pivot"); - if let BlockSyncState::Snap(sync_state) = block_sync_state { - let block_headers = peers - .request_block_headers(block_number + 1, pivot.hash()) - .await? - .ok_or(SyncError::NoBlockHeaders)?; - sync_state - .process_incoming_headers(block_headers.into_iter()) - .await?; - } else { - return Err(SyncError::NotInSnapSync); - } + let block_headers = peers + .request_block_headers(block_number + 1, pivot.hash()) + .await? + .ok_or(SyncError::NoBlockHeaders)?; + block_sync_state + .process_incoming_headers(block_headers.into_iter()) + .await?; *METRICS.sync_head_hash.lock().await = pivot.hash(); return Ok(pivot.clone()); } diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs index a4c97dcdc3..81632613f7 100644 --- a/crates/networking/rpc/engine/fork_choice.rs +++ b/crates/networking/rpc/engine/fork_choice.rs @@ -201,6 +201,11 @@ async fn handle_forkchoice( .get_latest_valid_ancestor(head_block.parent_hash) .await? { + // Invalidate the child too + context + .storage + .set_latest_valid_ancestor(head_block.hash(), latest_valid_hash) + .await?; return Ok(( None, ForkChoiceResponse::from(PayloadStatus::invalid_with( diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs index b5ab76c3a8..233a80dbd1 100644 --- a/crates/networking/rpc/engine/payload.rs +++ b/crates/networking/rpc/engine/payload.rs @@ -571,6 +571,11 @@ async fn validate_ancestors( .get_latest_valid_ancestor(block.header.parent_hash) .await? { + // Invalidate child too + context + .storage + .set_latest_valid_ancestor(block.header.hash(), latest_valid_hash) + .await?; return Ok(Some(PayloadStatus::invalid_with( latest_valid_hash, "Parent header has been previously invalidated.".into(), diff --git a/crates/storage/api.rs b/crates/storage/api.rs index 22cefb8f3e..3aa3aa447f 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -362,4 +362,17 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { &self, account_codes: Vec<(H256, Bytes)>, ) -> Result<(), StoreError>; + + /// Add a batch of headers downloaded during fullsync + async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError>; + + /// Read a batch of headers downloaded during fullsync + async fn read_fullsync_batch( + &self, + start: BlockNumber, + limit: u64, + ) -> Result, StoreError>; + + /// Clear all headers downloaded during fullsync + async fn clear_fullsync_headers(&self) -> Result<(), StoreError>; } diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 4a63df85e7..af8f79ce6a 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -1333,6 +1333,25 @@ impl Store { ) -> Result<(), StoreError> { self.engine.write_account_code_batch(account_codes).await } + + /// Add a batch of headers downloaded during fullsync + pub async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError> { + self.engine.add_fullsync_batch(headers).await + } + + /// Read a batch of headers downloaded during fullsync + pub async fn read_fullsync_batch( + &self, + start: BlockNumber, + limit: u64, + ) -> Result, StoreError> { + self.engine.read_fullsync_batch(start, limit).await + } + + /// Clear all headers downloaded during fullsync + pub async fn clear_fullsync_headers(&self) -> Result<(), StoreError> { + self.engine.clear_fullsync_headers().await + } } pub struct AccountProof { diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index 6668561abe..368ee60d96 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -47,6 +47,8 @@ pub struct StoreInner { invalid_ancestors: HashMap, // Stores current Snap State snap_state: SnapState, + // Stores fetched headers during a fullsync + fullsync_headers: HashMap, } #[derive(Default, Debug)] @@ -695,6 +697,37 @@ impl StoreEngine for Store { Ok(()) } + + async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError> { + self.inner()? + .fullsync_headers + .extend(headers.into_iter().map(|h| (h.number, h))); + Ok(()) + } + + async fn read_fullsync_batch( + &self, + start: BlockNumber, + limit: u64, + ) -> Result, StoreError> { + let store = self.inner()?; + (start..start + limit) + .map(|ref n| { + store + .fullsync_headers + .get(n) + .cloned() + .ok_or(StoreError::Custom(format!( + "Missing fullsync header for block {n}" + ))) + }) + .collect::, _>>() + } + + async fn clear_fullsync_headers(&self) -> Result<(), StoreError> { + self.inner()?.fullsync_headers.clear(); + Ok(()) + } } impl Debug for Store { diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index 2564a49341..7c65e14e87 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -104,6 +104,11 @@ const CF_PENDING_BLOCKS: &str = "pending_blocks"; /// - [`Vec`] = `BlockHashRLP::from(latest_valid).bytes().clone()` const CF_INVALID_ANCESTORS: &str = "invalid_ancestors"; +/// Block headers downloaded during fullsync column family: [`u8;_`] => [`Vec`] +/// - [`u8;_`] = `block_number.to_le_bytes()` +/// - [`Vec`] = `BlockHeaderRLP::from(block.header.clone()).bytes().clone()` +const CF_FULLSYNC_HEADERS: &str = "fullsync_headers"; + #[derive(Debug)] pub struct Store { db: Arc>, @@ -166,6 +171,7 @@ impl Store { CF_TRIE_NODES, CF_PENDING_BLOCKS, CF_INVALID_ANCESTORS, + CF_FULLSYNC_HEADERS, ]; // Get existing column families to know which ones to drop later @@ -1479,6 +1485,46 @@ impl StoreEngine for Store { self.write_batch_async(batch_ops).await } + + async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError> { + let mut batch_ops = Vec::new(); + + for header in headers { + let number_value = header.number.to_le_bytes().to_vec(); + let header_value = BlockHeaderRLP::from(header).bytes().clone(); + + batch_ops.push((CF_FULLSYNC_HEADERS.to_string(), number_value, header_value)); + } + + self.write_batch_async(batch_ops).await + } + + async fn read_fullsync_batch( + &self, + start: BlockNumber, + limit: u64, + ) -> Result, StoreError> { + self.read_bulk_async( + CF_FULLSYNC_HEADERS, + (start..start + limit).map(|n| n.to_le_bytes()).collect(), + |bytes| { + BlockHeaderRLP::from_bytes(bytes) + .to() + .map_err(StoreError::from) + }, + ) + .await + } + + async fn clear_fullsync_headers(&self) -> Result<(), StoreError> { + let db = self.db.clone(); + + tokio::task::spawn_blocking(move || { + db.drop_cf(CF_FULLSYNC_HEADERS).map_err(StoreError::from) + }) + .await + .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))? + } } /// Open column families diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index e17e1ac585..582e0380c7 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -33,19 +33,18 @@ async fn main() { info!(""); run_test(&cmd_path, no_reorgs_full_sync_smoke_test).await; + run_test(&cmd_path, test_reorg_back_to_base).await; - // TODO: uncomment once #4676 is fixed - // run_test(&cmd_path, test_reorg_back_to_base).await; - // // This test is flaky 50% of the time, check that it runs correctly 30 times in a row - // // TODO(#4775): make it deterministic - // for _ in 0..30 { - // run_test(&cmd_path, test_chain_split).await; - // } - // run_test(&cmd_path, test_one_block_reorg_and_back).await; - // run_test(&cmd_path, test_reorg_back_to_base_with_common_ancestor).await; - // run_test(&cmd_path, test_storage_slots_reorg).await; - - // run_test(&cmd_path, test_many_blocks_reorg).await; + // This test is flaky 50% of the time, check that it runs correctly multiple times in a row + // TODO(#4775): make it deterministic + for _ in 0..10 { + run_test(&cmd_path, test_chain_split).await; + } + + run_test(&cmd_path, test_one_block_reorg_and_back).await; + run_test(&cmd_path, test_reorg_back_to_base_with_common_ancestor).await; + run_test(&cmd_path, test_storage_slots_reorg).await; + run_test(&cmd_path, test_many_blocks_reorg).await; } async fn get_ethrex_version(cmd_path: &Path) -> String { @@ -104,7 +103,6 @@ async fn no_reorgs_full_sync_smoke_test(simulator: Arc>) { node1.update_forkchoice(&base_chain).await; } -#[expect(unused)] async fn test_reorg_back_to_base(simulator: Arc>) { let mut simulator = simulator.lock().await; @@ -124,7 +122,6 @@ async fn test_reorg_back_to_base(simulator: Arc>) { node0.update_forkchoice(&base_chain).await; } -#[expect(unused)] async fn test_reorg_back_to_base_with_common_ancestor(simulator: Arc>) { let mut simulator = simulator.lock().await; @@ -149,7 +146,6 @@ async fn test_reorg_back_to_base_with_common_ancestor(simulator: Arc>) { let mut simulator = simulator.lock().await; @@ -172,7 +168,6 @@ async fn test_chain_split(simulator: Arc>) { node0.update_forkchoice(&base_chain).await; } -#[expect(unused)] async fn test_one_block_reorg_and_back(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( @@ -243,7 +238,6 @@ async fn test_one_block_reorg_and_back(simulator: Arc>) { assert_eq!(new_balance, initial_balance); } -#[expect(unused)] async fn test_many_blocks_reorg(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( @@ -315,7 +309,6 @@ async fn test_many_blocks_reorg(simulator: Arc>) { assert_eq!(new_balance, initial_balance + transfer_amount); } -#[expect(unused)] async fn test_storage_slots_reorg(simulator: Arc>) { let mut simulator = simulator.lock().await; // Initcode for deploying a contract that receives two `bytes32` parameters and sets `storage[param0] = param1`