Skip to content
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
57810ee
Download all headers new to old as first step of fullsync
fmoletta Oct 8, 2025
81866ce
fix
fmoletta Oct 9, 2025
c02da60
Some fixes + debug
fmoletta Oct 13, 2025
678d29b
fix
fmoletta Oct 13, 2025
67da9ca
Invalidate child when checking for invalid parent
fmoletta Oct 13, 2025
e2a93d6
Tidy up code
fmoletta Oct 13, 2025
931de19
Avoid writing header batch to DB if we only have 1 header batch to pr…
fmoletta Oct 13, 2025
b5ce864
Add InMemory impl
fmoletta Oct 13, 2025
524055c
Wipe header table after sync is done'
fmoletta Oct 13, 2025
acca858
fix
fmoletta Oct 13, 2025
181bc04
Clippy
fmoletta Oct 14, 2025
26d5066
fix
fmoletta Oct 14, 2025
41475ab
Tidy up code
fmoletta Oct 14, 2025
f8c2936
fix(snap->full) launch full sync cycle instead of relying on BlockSta…
fmoletta Oct 14, 2025
e0845fe
Remove BlockSyncState::Full
fmoletta Oct 14, 2025
116ec2c
Remove BlockSyncState enum
fmoletta Oct 14, 2025
c9be0e6
Doc new methods
fmoletta Oct 14, 2025
dda6c75
Normalize tracing
fmoletta Oct 14, 2025
4f2eb9d
[REVERTME] more agressive logging + unwraps
fmoletta Oct 14, 2025
648970e
Fix
fmoletta Oct 14, 2025
a3cc994
fix
fmoletta Oct 14, 2025
8733ab2
fix
fmoletta Oct 14, 2025
8a70cc5
Revert "[REVERTME] more agressive logging + unwraps"
fmoletta Oct 14, 2025
90ddd87
fix
fmoletta Oct 14, 2025
12d7ba4
upgrade log
fmoletta Oct 14, 2025
1739c43
[REVERTME] debug
fmoletta Oct 15, 2025
24f4f61
Merge branch 'main' of github.com:lambdaclass/ethrex into full-sync-d…
fmoletta Oct 15, 2025
119c5cf
Fix table name
fmoletta Oct 16, 2025
9d8128d
Revert "[REVERTME] debug"
fmoletta Oct 16, 2025
339c369
Merge branch 'main' of github.com:lambdaclass/ethrex into rewind
fmoletta Oct 16, 2025
88f6513
Fix: retry if given empty block headers
fmoletta Oct 16, 2025
a23be6e
fmt
fmoletta Oct 16, 2025
593e273
Use drop_cf to clear fullsync headers table
fmoletta Oct 16, 2025
7b2575b
Merge branch 'main' of github.com:lambdaclass/ethrex into full-sync-d…
fmoletta Oct 16, 2025
b82100b
Remove limit on engine-withdrawals hive test
fmoletta Oct 16, 2025
5678b09
fix(lang)
fmoletta Oct 16, 2025
5a736d5
chore: uncomment passing reorg tests
MegaRedHand Oct 16, 2025
211bedd
Revert "Use drop_cf to clear fullsync headers table"
MegaRedHand Oct 16, 2025
2b9bcb2
refactor: check we got headers first
MegaRedHand Oct 17, 2025
b7acb61
refactor: remove unreachable error
MegaRedHand Oct 17, 2025
21e3b13
chore: fix typo in comment
MegaRedHand Oct 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr-main_l1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ jobs:
artifact_prefix: engine_paris
- name: "Engine withdrawal tests"
simulation: ethereum/engine
limit: "engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload|Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload|Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org [^S]|Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org [^S]"
limit: "engine-withdrawals"
artifact_prefix: engine_withdrawals
# Investigate this test
# - name: "Sync"
Expand Down
10 changes: 6 additions & 4 deletions crates/networking/p2p/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
},
},
snap::encodable_to_proof,
sync::{AccountStorageRoots, BlockSyncState, block_is_stale, update_pivot},
sync::{AccountStorageRoots, SnapBlockSyncState, block_is_stale, update_pivot},
utils::{
AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file,
get_account_state_snapshot_file, get_account_storages_snapshot_file,
Expand Down Expand Up @@ -469,11 +469,13 @@ impl PeerHandler {
)
.await
{
if are_block_headers_chained(&block_headers, &order) {
if !block_headers.is_empty()
&& are_block_headers_chained(&block_headers, &order)
Comment on lines +472 to +473
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a change in behavior from later versions we noticed. Before, empty responses were never forwarded to the user, but somewhere down the line it changed, producing errors that were previously unreachable.

{
return Ok(Some(block_headers));
} else {
warn!(
"[SYNCING] Received invalid headers from peer, penalizing peer {peer_id}"
"[SYNCING] Received empty/invalid headers from peer, penalizing peer {peer_id}"
);
}
}
Expand Down Expand Up @@ -678,7 +680,7 @@ impl PeerHandler {
limit: H256,
account_state_snapshots_dir: &Path,
pivot_header: &mut BlockHeader,
block_sync_state: &mut BlockSyncState,
block_sync_state: &mut SnapBlockSyncState,
) -> Result<(), PeerHandlerError> {
METRICS
.current_step
Expand Down
613 changes: 221 additions & 392 deletions crates/networking/p2p/sync.rs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/networking/rpc/engine/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ async fn handle_forkchoice(
.get_latest_valid_ancestor(head_block.parent_hash)
.await?
{
// Invalidate the child too
context
.storage
.set_latest_valid_ancestor(head_block.hash(), latest_valid_hash)
.await?;
return Ok((
None,
ForkChoiceResponse::from(PayloadStatus::invalid_with(
Expand Down
5 changes: 5 additions & 0 deletions crates/networking/rpc/engine/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ async fn validate_ancestors(
.get_latest_valid_ancestor(block.header.parent_hash)
.await?
{
// Invalidate child too
context
.storage
.set_latest_valid_ancestor(block.header.hash(), latest_valid_hash)
.await?;
return Ok(Some(PayloadStatus::invalid_with(
latest_valid_hash,
"Parent header has been previously invalidated.".into(),
Expand Down
13 changes: 13 additions & 0 deletions crates/storage/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,17 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
&self,
account_codes: Vec<(H256, Bytes)>,
) -> Result<(), StoreError>;

/// Add a batch of headers downloaded during fullsync
async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError>;

/// Read a batch of headers downloaded during fullsync
async fn read_fullsync_batch(
&self,
start: BlockNumber,
limit: u64,
) -> Result<Vec<BlockHeader>, StoreError>;

/// Clear all headers downloaded during fullsync
async fn clear_fullsync_headers(&self) -> Result<(), StoreError>;
}
19 changes: 19 additions & 0 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,25 @@ impl Store {
) -> Result<(), StoreError> {
self.engine.write_account_code_batch(account_codes).await
}

/// Add a batch of headers downloaded during fullsync
pub async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
self.engine.add_fullsync_batch(headers).await
}

/// Read a batch of headers downloaded during fullsync
pub async fn read_fullsync_batch(
&self,
start: BlockNumber,
limit: u64,
) -> Result<Vec<BlockHeader>, StoreError> {
self.engine.read_fullsync_batch(start, limit).await
}

/// Clear all headers downloaded during fullsync
pub async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
self.engine.clear_fullsync_headers().await
}
}

pub struct AccountProof {
Expand Down
33 changes: 33 additions & 0 deletions crates/storage/store_db/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct StoreInner {
invalid_ancestors: HashMap<BlockHash, BlockHash>,
// Stores current Snap State
snap_state: SnapState,
// Stores fetched headers during a fullsync
fullsync_headers: HashMap<BlockNumber, BlockHeader>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is expected to have less than < 10000~ it might be better to use a btree too here

}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -695,6 +697,37 @@ impl StoreEngine for Store {

Ok(())
}

async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
self.inner()?
.fullsync_headers
.extend(headers.into_iter().map(|h| (h.number, h)));
Ok(())
}

async fn read_fullsync_batch(
&self,
start: BlockNumber,
limit: u64,
) -> Result<Vec<BlockHeader>, StoreError> {
let store = self.inner()?;
(start..start + limit)
.map(|ref n| {
store
.fullsync_headers
.get(n)
.cloned()
.ok_or(StoreError::Custom(format!(
"Missing fullsync header for block {n}"
)))
})
.collect::<Result<Vec<_>, _>>()
}

async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
self.inner()?.fullsync_headers.clear();
Ok(())
}
}

impl Debug for Store {
Expand Down
58 changes: 58 additions & 0 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ const CF_PENDING_BLOCKS: &str = "pending_blocks";
/// - [`Vec<u8>`] = `BlockHashRLP::from(latest_valid).bytes().clone()`
const CF_INVALID_ANCESTORS: &str = "invalid_ancestors";

/// Block headers downloaded during fullsync column family: [`u8;_`] => [`Vec<u8>`]
/// - [`u8;_`] = `block_number.to_le_bytes()`
/// - [`Vec<u8>`] = `BlockHeaderRLP::from(block.header.clone()).bytes().clone()`
const CF_FULLSYNC_HEADERS: &str = "fullsync_headers";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be merged with the main headers table. I opened #4903 to check that.


#[derive(Debug)]
pub struct Store {
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
Expand Down Expand Up @@ -166,6 +171,7 @@ impl Store {
CF_TRIE_NODES,
CF_PENDING_BLOCKS,
CF_INVALID_ANCESTORS,
CF_FULLSYNC_HEADERS,
];

// Get existing column families to know which ones to drop later
Expand Down Expand Up @@ -1479,6 +1485,58 @@ impl StoreEngine for Store {

self.write_batch_async(batch_ops).await
}

async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
let mut batch_ops = Vec::new();

for header in headers {
let number_value = header.number.to_le_bytes().to_vec();
let header_value = BlockHeaderRLP::from(header).bytes().clone();

batch_ops.push((CF_FULLSYNC_HEADERS.to_string(), number_value, header_value));
}

self.write_batch_async(batch_ops).await
}

async fn read_fullsync_batch(
&self,
start: BlockNumber,
limit: u64,
) -> Result<Vec<BlockHeader>, StoreError> {
self.read_bulk_async(
CF_FULLSYNC_HEADERS,
(start..start + limit).map(|n| n.to_le_bytes()).collect(),
|bytes| {
BlockHeaderRLP::from_bytes(bytes)
.to()
.map_err(StoreError::from)
},
)
.await
}

async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
let cf = db
.cf_handle(CF_FULLSYNC_HEADERS)
.ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;

let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start);
let mut batch = WriteBatchWithTransaction::default();

while let Some(Ok((key, _))) = iter.next() {
batch.delete_cf(&cf, key);
}

db.write(batch)
.map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e)))
})
.await
.map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
}
}

/// Open column families
Expand Down
29 changes: 11 additions & 18 deletions tooling/reorgs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,18 @@ async fn main() {
info!("");

run_test(&cmd_path, no_reorgs_full_sync_smoke_test).await;
run_test(&cmd_path, test_reorg_back_to_base).await;

// TODO: uncomment once #4676 is fixed
// run_test(&cmd_path, test_reorg_back_to_base).await;
// // This test is flaky 50% of the time, check that it runs correctly 30 times in a row
// // TODO(#4775): make it deterministic
// for _ in 0..30 {
// run_test(&cmd_path, test_chain_split).await;
// }
// run_test(&cmd_path, test_one_block_reorg_and_back).await;
// run_test(&cmd_path, test_reorg_back_to_base_with_common_ancestor).await;
// run_test(&cmd_path, test_storage_slots_reorg).await;

// run_test(&cmd_path, test_many_blocks_reorg).await;
// This test is flaky 50% of the time, check that it runs correctly multiple times in a row
// TODO(#4775): make it deterministic
for _ in 0..10 {
run_test(&cmd_path, test_chain_split).await;
}

run_test(&cmd_path, test_one_block_reorg_and_back).await;
run_test(&cmd_path, test_reorg_back_to_base_with_common_ancestor).await;
run_test(&cmd_path, test_storage_slots_reorg).await;
run_test(&cmd_path, test_many_blocks_reorg).await;
}

async fn get_ethrex_version(cmd_path: &Path) -> String {
Expand Down Expand Up @@ -104,7 +103,6 @@ async fn no_reorgs_full_sync_smoke_test(simulator: Arc<Mutex<Simulator>>) {
node1.update_forkchoice(&base_chain).await;
}

#[expect(unused)]
async fn test_reorg_back_to_base(simulator: Arc<Mutex<Simulator>>) {
let mut simulator = simulator.lock().await;

Expand All @@ -124,7 +122,6 @@ async fn test_reorg_back_to_base(simulator: Arc<Mutex<Simulator>>) {
node0.update_forkchoice(&base_chain).await;
}

#[expect(unused)]
async fn test_reorg_back_to_base_with_common_ancestor(simulator: Arc<Mutex<Simulator>>) {
let mut simulator = simulator.lock().await;

Expand All @@ -149,7 +146,6 @@ async fn test_reorg_back_to_base_with_common_ancestor(simulator: Arc<Mutex<Simul
node0.update_forkchoice(&base_chain).await;
}

#[expect(unused)]
async fn test_chain_split(simulator: Arc<Mutex<Simulator>>) {
let mut simulator = simulator.lock().await;

Expand All @@ -172,7 +168,6 @@ async fn test_chain_split(simulator: Arc<Mutex<Simulator>>) {
node0.update_forkchoice(&base_chain).await;
}

#[expect(unused)]
async fn test_one_block_reorg_and_back(simulator: Arc<Mutex<Simulator>>) {
let mut simulator = simulator.lock().await;
let signer: Signer = LocalSigner::new(
Expand Down Expand Up @@ -243,7 +238,6 @@ async fn test_one_block_reorg_and_back(simulator: Arc<Mutex<Simulator>>) {
assert_eq!(new_balance, initial_balance);
}

#[expect(unused)]
async fn test_many_blocks_reorg(simulator: Arc<Mutex<Simulator>>) {
let mut simulator = simulator.lock().await;
let signer: Signer = LocalSigner::new(
Expand Down Expand Up @@ -315,7 +309,6 @@ async fn test_many_blocks_reorg(simulator: Arc<Mutex<Simulator>>) {
assert_eq!(new_balance, initial_balance + transfer_amount);
}

#[expect(unused)]
async fn test_storage_slots_reorg(simulator: Arc<Mutex<Simulator>>) {
let mut simulator = simulator.lock().await;
// Initcode for deploying a contract that receives two `bytes32` parameters and sets `storage[param0] = param1`
Expand Down
Loading