Skip to content
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
57810ee
Download all headers new to old as first step of fullsync
fmoletta Oct 8, 2025
81866ce
fix
fmoletta Oct 9, 2025
c02da60
Some fixes + debug
fmoletta Oct 13, 2025
678d29b
fix
fmoletta Oct 13, 2025
67da9ca
Invalidate child when checking for invalid parent
fmoletta Oct 13, 2025
e2a93d6
Tidy up code
fmoletta Oct 13, 2025
931de19
Avoid writing header batch to DB if we only have 1 header batch to pr…
fmoletta Oct 13, 2025
b5ce864
Add InMemory impl
fmoletta Oct 13, 2025
524055c
Wipe header table after sync is done'
fmoletta Oct 13, 2025
acca858
fix
fmoletta Oct 13, 2025
181bc04
Clippy
fmoletta Oct 14, 2025
26d5066
fix
fmoletta Oct 14, 2025
41475ab
Tidy up code
fmoletta Oct 14, 2025
f8c2936
fix(snap->full) launch full sync cycle instead of relying on BlockSta…
fmoletta Oct 14, 2025
e0845fe
Remove BlockSyncState::Full
fmoletta Oct 14, 2025
116ec2c
Remove BlockSyncState enum
fmoletta Oct 14, 2025
c9be0e6
Doc new methods
fmoletta Oct 14, 2025
dda6c75
Normalize tracing
fmoletta Oct 14, 2025
4f2eb9d
[REVERTME] more agressive logging + unwraps
fmoletta Oct 14, 2025
648970e
Fix
fmoletta Oct 14, 2025
a3cc994
fix
fmoletta Oct 14, 2025
8733ab2
fix
fmoletta Oct 14, 2025
8a70cc5
Revert "[REVERTME] more agressive logging + unwraps"
fmoletta Oct 14, 2025
90ddd87
fix
fmoletta Oct 14, 2025
12d7ba4
upgrade log
fmoletta Oct 14, 2025
1739c43
[REVERTME] debug
fmoletta Oct 15, 2025
24f4f61
Merge branch 'main' of github.com:lambdaclass/ethrex into full-sync-d…
fmoletta Oct 15, 2025
119c5cf
Fix table name
fmoletta Oct 16, 2025
9d8128d
Revert "[REVERTME] debug"
fmoletta Oct 16, 2025
339c369
Merge branch 'main' of github.com:lambdaclass/ethrex into rewind
fmoletta Oct 16, 2025
88f6513
Fix: retry if given empty block headers
fmoletta Oct 16, 2025
a23be6e
fmt
fmoletta Oct 16, 2025
593e273
Use drop_cf to clear fullsync headers table
fmoletta Oct 16, 2025
7b2575b
Merge branch 'main' of github.com:lambdaclass/ethrex into full-sync-d…
fmoletta Oct 16, 2025
b82100b
Remove limit on engine-withdrawals hive test
fmoletta Oct 16, 2025
5678b09
fix(lang)
fmoletta Oct 16, 2025
5a736d5
chore: uncomment passing reorg tests
MegaRedHand Oct 16, 2025
211bedd
Revert "Use drop_cf to clear fullsync headers table"
MegaRedHand Oct 16, 2025
2b9bcb2
refactor: check we got headers first
MegaRedHand Oct 17, 2025
b7acb61
refactor: remove unreachable error
MegaRedHand Oct 17, 2025
21e3b13
chore: fix typo in comment
MegaRedHand Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions crates/networking/p2p/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
},
},
snap::encodable_to_proof,
sync::{AccountStorageRoots, BlockSyncState, block_is_stale, update_pivot},
sync::{AccountStorageRoots, SnapBlockSyncState, block_is_stale, update_pivot},
utils::{
AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file,
get_account_state_snapshot_file, get_account_storages_snapshot_file,
Expand Down Expand Up @@ -469,11 +469,13 @@ impl PeerHandler {
)
.await
{
if are_block_headers_chained(&block_headers, &order) {
if are_block_headers_chained(&block_headers, &order)
&& !block_headers.is_empty()
{
return Ok(Some(block_headers));
} else {
warn!(
"[SYNCING] Received invalid headers from peer, penalizing peer {peer_id}"
"[SYNCING] Received empty/invalid headers from peer, penalizing peer {peer_id}"
);
}
}
Expand Down Expand Up @@ -678,7 +680,7 @@ impl PeerHandler {
limit: H256,
account_state_snapshots_dir: &Path,
pivot_header: &mut BlockHeader,
block_sync_state: &mut BlockSyncState,
block_sync_state: &mut SnapBlockSyncState,
) -> Result<(), PeerHandlerError> {
METRICS
.current_step
Expand Down
613 changes: 221 additions & 392 deletions crates/networking/p2p/sync.rs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/networking/rpc/engine/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ async fn handle_forkchoice(
.get_latest_valid_ancestor(head_block.parent_hash)
.await?
{
// Invalidate the child too
context
.storage
.set_latest_valid_ancestor(head_block.hash(), latest_valid_hash)
.await?;
return Ok((
None,
ForkChoiceResponse::from(PayloadStatus::invalid_with(
Expand Down
5 changes: 5 additions & 0 deletions crates/networking/rpc/engine/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ async fn validate_ancestors(
.get_latest_valid_ancestor(block.header.parent_hash)
.await?
{
// Invalidate child too
context
.storage
.set_latest_valid_ancestor(block.header.hash(), latest_valid_hash)
.await?;
return Ok(Some(PayloadStatus::invalid_with(
latest_valid_hash,
"Parent header has been previously invalidated.".into(),
Expand Down
13 changes: 13 additions & 0 deletions crates/storage/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,17 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
&self,
account_codes: Vec<(H256, Bytes)>,
) -> Result<(), StoreError>;

/// Add a batch of headers downloaded during fullsync.
///
/// Implementations key each header by its block number, so re-adding a
/// header for an already-stored number overwrites the previous entry.
async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError>;

/// Read a batch of headers downloaded during fullsync.
///
/// Returns the headers for block numbers `start..start + limit`.
/// NOTE(review): behavior on gaps may differ per backend — the in-memory
/// implementation errors on any missing header in the range.
async fn read_fullsync_batch(
    &self,
    start: BlockNumber,
    limit: u64,
) -> Result<Vec<BlockHeader>, StoreError>;

/// Clear all headers downloaded during fullsync.
///
/// Intended to be called once the sync is complete so the staging
/// headers do not linger in storage.
async fn clear_fullsync_headers(&self) -> Result<(), StoreError>;
}
19 changes: 19 additions & 0 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,25 @@ impl Store {
) -> Result<(), StoreError> {
self.engine.write_account_code_batch(account_codes).await
}

/// Add a batch of headers downloaded during fullsync.
///
/// Thin wrapper delegating to the underlying storage engine.
pub async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
    self.engine.add_fullsync_batch(headers).await
}

/// Read a batch of headers downloaded during fullsync
/// (block numbers `start..start + limit`).
///
/// Thin wrapper delegating to the underlying storage engine.
pub async fn read_fullsync_batch(
    &self,
    start: BlockNumber,
    limit: u64,
) -> Result<Vec<BlockHeader>, StoreError> {
    self.engine.read_fullsync_batch(start, limit).await
}

/// Clear all headers downloaded during fullsync.
///
/// Thin wrapper delegating to the underlying storage engine.
pub async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
    self.engine.clear_fullsync_headers().await
}
}

pub struct AccountProof {
Expand Down
33 changes: 33 additions & 0 deletions crates/storage/store_db/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct StoreInner {
invalid_ancestors: HashMap<BlockHash, BlockHash>,
// Stores current Snap State
snap_state: SnapState,
// Stores fetched headers during a fullsync
fullsync_headers: HashMap<BlockNumber, BlockHeader>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is expected to hold fewer than ~10,000 entries, it might be better to use a BTreeMap here too.

}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -695,6 +697,37 @@ impl StoreEngine for Store {

Ok(())
}

/// Store a batch of fullsync headers in memory, keyed by block number.
/// Re-adding a block number replaces the previously stored header.
async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
    let mut store = self.inner()?;
    for header in headers {
        store.fullsync_headers.insert(header.number, header);
    }
    Ok(())
}

/// Read `limit` consecutive fullsync headers starting at block `start`.
///
/// # Errors
/// Returns `StoreError::Custom` if any header in the requested range is
/// missing from the in-memory map.
async fn read_fullsync_batch(
    &self,
    start: BlockNumber,
    limit: u64,
) -> Result<Vec<BlockHeader>, StoreError> {
    let store = self.inner()?;
    (start..start + limit)
        .map(|n| {
            // `ok_or_else` defers building the error string so the
            // allocation only happens on the missing-header path.
            store.fullsync_headers.get(&n).cloned().ok_or_else(|| {
                StoreError::Custom(format!("Missing fullsync header for block {n}"))
            })
        })
        .collect::<Result<Vec<_>, _>>()
}

/// Drop all fullsync headers held in memory.
async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
    self.inner()?.fullsync_headers.clear();
    Ok(())
}
}

impl Debug for Store {
Expand Down
46 changes: 46 additions & 0 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ const CF_PENDING_BLOCKS: &str = "pending_blocks";
/// - [`Vec<u8>`] = `BlockHashRLP::from(latest_valid).bytes().clone()`
const CF_INVALID_ANCESTORS: &str = "invalid_ancestors";

/// Block headers downloaded during fullsync column family: [`u8;_`] => [`Vec<u8>`]
/// - [`u8;_`] = `block_number.to_le_bytes()`
/// - [`Vec<u8>`] = `BlockHeaderRLP::from(header).bytes().clone()`
const CF_FULLSYNC_HEADERS: &str = "fullsync_headers";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be merged with the main headers table. I opened #4903 to check that.


#[derive(Debug)]
pub struct Store {
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
Expand Down Expand Up @@ -166,6 +171,7 @@ impl Store {
CF_TRIE_NODES,
CF_PENDING_BLOCKS,
CF_INVALID_ANCESTORS,
CF_FULLSYNC_HEADERS,
];

// Get existing column families to know which ones to drop later
Expand Down Expand Up @@ -1479,6 +1485,46 @@ impl StoreEngine for Store {

self.write_batch_async(batch_ops).await
}

/// Persist a batch of fullsync headers to the `fullsync_headers` column
/// family, keyed by the little-endian encoding of the block number.
async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
    let batch_ops: Vec<_> = headers
        .into_iter()
        .map(|header| {
            let key = header.number.to_le_bytes().to_vec();
            let value = BlockHeaderRLP::from(header).bytes().clone();
            (CF_FULLSYNC_HEADERS.to_string(), key, value)
        })
        .collect();

    self.write_batch_async(batch_ops).await
}

/// Read `limit` consecutive fullsync headers starting at `start`,
/// decoding each value from its RLP representation.
async fn read_fullsync_batch(
    &self,
    start: BlockNumber,
    limit: u64,
) -> Result<Vec<BlockHeader>, StoreError> {
    // Keys are the little-endian block numbers in the requested range.
    let keys = (start..start + limit).map(|n| n.to_le_bytes()).collect();
    self.read_bulk_async(CF_FULLSYNC_HEADERS, keys, |bytes| {
        BlockHeaderRLP::from_bytes(bytes)
            .to()
            .map_err(StoreError::from)
    })
    .await
}

async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
db.drop_cf(CF_FULLSYNC_HEADERS).map_err(StoreError::from)
})
.await
.map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
}
}

/// Open column families
Expand Down