Merged
41 commits
c1e5cc2
feat: chain orchestrator
frisitano Jul 2, 2025
bbe0e8f
Merge branch 'main' into feat/chain-orchestrator
frisitano Jul 2, 2025
4032fa5
lint
frisitano Jul 2, 2025
39802e2
lint
frisitano Jul 2, 2025
c84dc0d
docs lint
frisitano Jul 2, 2025
ea3140c
fix: clone the in-memory chain instead of taking it
frisitano Jul 3, 2025
a28a5b2
Merge branch 'main' into feat/chain-orchestrator
frisitano Jul 3, 2025
fa6952c
update chain consolidation and l1 message validation
frisitano Jul 4, 2025
43cc2be
add missing network block error
frisitano Jul 7, 2025
5dfc813
update fork sync / reconciliation to use network as opposed to database
frisitano Jul 8, 2025
a50558d
Merge branch 'main' into feat/chain-orchestrator
frisitano Jul 10, 2025
19458f7
fix merge
frisitano Jul 10, 2025
4578d49
refactor and add test cases
frisitano Jul 16, 2025
e6248f5
cleanup
frisitano Jul 16, 2025
626e29b
Merge branch 'main' into feat/chain-orchestrator
frisitano Jul 16, 2025
7a6ebfc
rename chain orchestrator crate
frisitano Jul 16, 2025
7d35185
cleanup
frisitano Jul 17, 2025
74fd7b8
Merge branch 'main' into feat/chain-orchestrator
frisitano Jul 18, 2025
fe75ed5
remove expect for sequencer in rnm
frisitano Jul 21, 2025
f3bab0d
sync test
frisitano Jul 21, 2025
56a0ff3
sync test
frisitano Jul 21, 2025
1320f14
sync test
frisitano Jul 21, 2025
0fa87fe
add error handling for missing payload id
frisitano Jul 21, 2025
cecbd03
remove networking for L1 consolidation sync test
frisitano Jul 21, 2025
af4bac9
Merge branch 'main' into feat/chain-orchestrator
frisitano Jul 29, 2025
7df1a59
improve test coverage and fix bugs
frisitano Jul 30, 2025
fa51172
lint
frisitano Jul 30, 2025
e86446d
fix cli NetworkArgs
frisitano Jul 30, 2025
9cdbdf3
add block gap to reorg integration test
frisitano Jul 31, 2025
01a10cf
Merge branch 'main' into feat/chain-orchestrator
frisitano Jul 31, 2025
2f851b1
make test more robust
frisitano Jul 31, 2025
9f88b5d
Merge branch 'main' into feat/chain-orchestrator
frisitano Aug 6, 2025
9a3339a
merge upstream
frisitano Aug 7, 2025
000ade7
refactor
frisitano Aug 7, 2025
00d18f2
address comments
frisitano Aug 11, 2025
2beb7e0
Merge branch 'main' into feat/chain-orchestrator
frisitano Aug 11, 2025
add9933
address comments
frisitano Aug 20, 2025
e3d5453
Merge branch 'main' into feat/chain-orchestrator
frisitano Aug 20, 2025
4383231
merge changes
frisitano Aug 20, 2025
1c24765
Merge branch 'main' into feat/chain-orchestrator
frisitano Aug 25, 2025
5d9aefb
add migration script
frisitano Aug 25, 2025
2 changes: 1 addition & 1 deletion .codespellrc
@@ -1,3 +1,3 @@
[codespell]
skip = .git,target,Cargo.toml,Cargo.lock
skip = .git,target,Cargo.toml,Cargo.lock,docker-compose
ignore-words-list = crate
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

54 changes: 19 additions & 35 deletions crates/database/db/src/db.rs
@@ -325,8 +325,13 @@ mod test {
db.insert_l1_message(l1_message_2.clone()).await.unwrap();

// collect the L1Messages
let l1_messages =
db.get_l1_messages().await.unwrap().map(|res| res.unwrap()).collect::<Vec<_>>().await;
let l1_messages = db
.get_l1_messages(None)
.await
.unwrap()
.map(|res| res.unwrap())
.collect::<Vec<_>>()
.await;

// Apply the assertions.
assert!(l1_messages.contains(&l1_message_1));
@@ -422,9 +427,10 @@ mod test {
rand::rng().fill(bytes.as_mut_slice());
let mut u = Unstructured::new(&bytes);

// Initially should return None
let latest_safe = db.get_latest_safe_l2_info().await.unwrap();
assert!(latest_safe.is_none());
// Initially should return the genesis block and hash.
let (latest_safe_block, batch) = db.get_latest_safe_l2_info().await.unwrap().unwrap();
assert_eq!(latest_safe_block.number, 0);
assert_eq!(batch.index, 0);

// Generate and insert a batch
let batch_data = BatchCommitData { index: 100, ..Arbitrary::arbitrary(&mut u).unwrap() };
@@ -463,32 +469,6 @@ mod test {
assert_eq!(latest_safe, Some((safe_block_2, batch_info)));
}

#[tokio::test]
async fn test_get_latest_l2_block() {
// Set up the test database.
let db = setup_test_db().await;

// Generate unstructured bytes.
let mut bytes = [0u8; 1024];
rand::rng().fill(bytes.as_mut_slice());
let mut u = Unstructured::new(&bytes);

// Insert multiple blocks with increasing block numbers
let mut latest_block = BlockInfo { number: 0, hash: B256::ZERO };
for i in 300..305 {
let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() };
latest_block = block_info;

db.insert_block(L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] }, None)
.await
.unwrap();
}

// Should return the block with highest number
let retrieved_latest = db.get_latest_l2_block().await.unwrap();
assert_eq!(retrieved_latest, Some(latest_block));
}

#[tokio::test]
async fn test_delete_l2_blocks_gt_block_number() {
// Set up the test database.
@@ -499,13 +479,17 @@
rand::rng().fill(bytes.as_mut_slice());
let mut u = Unstructured::new(&bytes);

// Insert multiple L2 blocks
// Insert multiple L2 blocks with batch info
let batch_info = BatchInfo { index: 0, hash: B256::default() };
for i in 400..410 {
let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() };

db.insert_block(L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] }, None)
.await
.unwrap();
db.insert_block(
L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] },
Some(batch_info),
)
.await
.unwrap();
}

// Delete blocks with number > 405
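For context on the new `insert_block` gating, the loop above could also go through the `insert_blocks` helper introduced in `operations.rs`. This is a minimal sketch reusing the test fixtures (`db`, `u`, `BatchInfo`, `B256`) from this module, not code from the diff:

```rust
// Sketch only: `insert_blocks` loops `insert_block` with a shared `Option<BatchInfo>`.
let batch_info = BatchInfo { index: 0, hash: B256::default() };
let blocks: Vec<L2BlockInfoWithL1Messages> = (400..410)
    .map(|number| L2BlockInfoWithL1Messages {
        block_info: BlockInfo { number, hash: B256::arbitrary(&mut u).unwrap() },
        l1_messages: vec![],
    })
    .collect();
// Blocks are only persisted when batch info is supplied (i.e. safe blocks).
db.insert_blocks(blocks, Some(batch_info)).await.unwrap();
```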
3 changes: 2 additions & 1 deletion crates/database/db/src/error.rs
@@ -1,3 +1,4 @@
use super::L1MessageStart;
use alloy_eips::BlockId;
use alloy_primitives::B256;

@@ -15,5 +16,5 @@ pub enum DatabaseError {
BlockNotFound(BlockId),
/// The L1 message was not found in database.
#[error("L1 message at index [{0}] not found in database")]
L1MessageNotFound(u64),
L1MessageNotFound(L1MessageStart),
}
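With `L1MessageNotFound` now wrapping an `L1MessageStart`, the rendered message embeds the variant's `Display` form (defined in `operations.rs` below). A small sketch, assuming `DatabaseError` and `L1MessageStart` are in scope:

```rust
let err = DatabaseError::L1MessageNotFound(L1MessageStart::Index(42));
assert_eq!(err.to_string(), "L1 message at index [Index(42)] not found in database");
```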
2 changes: 1 addition & 1 deletion crates/database/db/src/lib.rs
@@ -13,7 +13,7 @@ mod models;
pub use models::*;

mod operations;
pub use operations::{DatabaseOperations, UnwindResult};
pub use operations::{DatabaseOperations, L1MessageStart, UnwindResult};

mod transaction;
pub use transaction::DatabaseTransaction;
132 changes: 104 additions & 28 deletions crates/database/db/src/operations.rs
@@ -12,6 +12,7 @@ use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
Set,
};
use std::fmt;

/// The [`DatabaseOperations`] trait provides methods for interacting with the database.
#[async_trait::async_trait]
@@ -243,12 +244,30 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(Into::into))?)
}

/// Gets an iterator over all [`L1MessageEnvelope`]s in the database.
/// Get an iterator over all [`L1MessageEnvelope`]s in the database starting from the provided
/// `start` point.
async fn get_l1_messages<'a>(
&'a self,
start: Option<L1MessageStart>,
) -> Result<impl Stream<Item = Result<L1MessageEnvelope, DatabaseError>> + 'a, DatabaseError>
{
let queue_index = match start {
Some(L1MessageStart::Index(i)) => i,
Some(L1MessageStart::Hash(ref h)) => {
// Lookup message by hash
let record = models::l1_message::Entity::find()
.filter(models::l1_message::Column::Hash.eq(h.to_vec()))
.one(self.get_connection())
.await?
.ok_or_else(|| DatabaseError::L1MessageNotFound(L1MessageStart::Hash(*h)))?;

record.queue_index as u64
}
None => 0,
};

Ok(models::l1_message::Entity::find()
.filter(models::l1_message::Column::QueueIndex.gte(queue_index))
.stream(self.get_connection())
.await?
.map(|res| Ok(res.map(Into::into)?)))
@@ -266,6 +285,24 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(Into::into))?)
}

/// Get the [`BlockInfo`] and optional [`BatchInfo`] for the provided block hash.
async fn get_l2_block_and_batch_info_by_hash(
&self,
block_hash: B256,
) -> Result<Option<(BlockInfo, Option<BatchInfo>)>, DatabaseError> {
tracing::trace!(target: "scroll::db", ?block_hash, "Fetching L2 block and batch info by hash from database.");
Ok(models::l2_block::Entity::find()
.filter(models::l2_block::Column::BlockHash.eq(block_hash.to_vec()))
.one(self.get_connection())
.await
.map(|x| {
x.map(|x| {
let (block_info, batch_info): (BlockInfo, Option<BatchInfo>) = x.into();
(block_info, batch_info)
})
})?)
}

/// Get a [`BlockInfo`] from the database by its block number.
async fn get_l2_block_info_by_number(
&self,
@@ -304,14 +341,16 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
})?)
}

/// Get the latest L2 [`BlockInfo`] from the database.
async fn get_latest_l2_block(&self) -> Result<Option<BlockInfo>, DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching latest L2 block from database.");
/// Get an iterator over all L2 blocks in the database starting from the most recent one.
async fn get_l2_blocks<'a>(
&'a self,
) -> Result<impl Stream<Item = Result<BlockInfo, DatabaseError>> + 'a, DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching L2 blocks from database.");
Ok(models::l2_block::Entity::find()
.order_by_desc(models::l2_block::Column::BlockNumber)
.one(self.get_connection())
.await
.map(|x| x.map(|x| x.block_info()))?)
.stream(self.get_connection())
.await?
.map(|res| Ok(res.map(|res| res.block_info())?)))
}

/// Prepare the database on startup and return metadata used for other components in the
@@ -382,33 +421,49 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.rows_affected)?)
}

/// Insert multiple blocks into the database.
async fn insert_blocks(
&self,
blocks: Vec<L2BlockInfoWithL1Messages>,
batch_info: Option<BatchInfo>,
) -> Result<(), DatabaseError> {
for block in blocks {
self.insert_block(block, batch_info).await?;
}
Ok(())
}

/// Insert a new block in the database.
async fn insert_block(
&self,
block_info: L2BlockInfoWithL1Messages,
batch_info: Option<BatchInfo>,
) -> Result<(), DatabaseError> {
tracing::trace!(
target: "scroll::db",
batch_hash = ?batch_info.as_ref().map(|b| b.hash),
batch_index = batch_info.as_ref().map(|b| b.index),
block_number = block_info.block_info.number,
block_hash = ?block_info.block_info.hash,
"Inserting block into database."
);
let l2_block: models::l2_block::ActiveModel = (block_info.block_info, batch_info).into();
models::l2_block::Entity::insert(l2_block)
.on_conflict(
OnConflict::column(models::l2_block::Column::BlockNumber)
.update_columns([
models::l2_block::Column::BlockHash,
models::l2_block::Column::BatchHash,
models::l2_block::Column::BatchIndex,
])
.to_owned(),
)
.exec(self.get_connection())
.await?;
// We only insert safe blocks into the database, we do not persist unsafe blocks.
if batch_info.is_some() {
tracing::trace!(
target: "scroll::db",
batch_hash = ?batch_info.as_ref().map(|b| b.hash),
batch_index = batch_info.as_ref().map(|b| b.index),
block_number = block_info.block_info.number,
block_hash = ?block_info.block_info.hash,
"Inserting block into database."
);
let l2_block: models::l2_block::ActiveModel =
(block_info.block_info, batch_info).into();
models::l2_block::Entity::insert(l2_block)
.on_conflict(
OnConflict::column(models::l2_block::Column::BlockNumber)
.update_columns([
models::l2_block::Column::BlockHash,
models::l2_block::Column::BatchHash,
models::l2_block::Column::BatchIndex,
])
.to_owned(),
)
.exec(self.get_connection())
.await?;
}

tracing::trace!(
target: "scroll::db",
@@ -517,6 +572,27 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
}
}

/// This type defines the start of an L1 message stream.
///
/// It can either be an index, which is the queue index of the first message to return, or a hash,
/// which is the hash of the first message to return.
#[derive(Debug, Clone)]
pub enum L1MessageStart {
/// Start from the provided queue index.
Index(u64),
/// Start from the provided queue hash.
Hash(B256),
}

impl fmt::Display for L1MessageStart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Index(index) => write!(f, "Index({index})"),
Self::Hash(hash) => write!(f, "Hash({hash:#x})"),
}
}
}

/// The result of [`DatabaseOperations::unwind`].
#[derive(Debug)]
pub struct UnwindResult {
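To show how the reworked `get_l1_messages` entry point is meant to be consumed, here is a hedged sketch of a caller that starts streaming from a message hash; `collect_l1_messages_from` is an illustrative helper (not part of this PR) and assumes `futures::StreamExt` plus the crate's `DatabaseOperations`, `L1MessageEnvelope`, `L1MessageStart`, and `DatabaseError` types are in scope:

```rust
use alloy_primitives::B256;
use futures::StreamExt;

/// Illustrative helper: collect L1 messages starting at the message with `hash`.
async fn collect_l1_messages_from(
    db: &impl DatabaseOperations,
    hash: B256,
) -> Result<Vec<L1MessageEnvelope>, DatabaseError> {
    // An unknown hash surfaces as `DatabaseError::L1MessageNotFound(L1MessageStart::Hash(..))`.
    let stream = db.get_l1_messages(Some(L1MessageStart::Hash(hash))).await?;
    let mut stream = Box::pin(stream);
    let mut messages = Vec::new();
    while let Some(message) = stream.next().await {
        messages.push(message?);
    }
    Ok(messages)
}
```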
1 change: 1 addition & 0 deletions crates/database/migration/Cargo.toml
@@ -16,6 +16,7 @@ reth-chainspec.workspace = true
sea-orm = { workspace = true, features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros"] }
sha2 = "0.10.9"
tracing.workspace = true
reth-scroll-chainspec.workspace = true

[dependencies.sea-orm-migration]
version = "1.1.0"
2 changes: 1 addition & 1 deletion crates/database/migration/src/lib.rs
@@ -19,7 +19,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
Box::new(m20250304_125946_add_l1_msg_table::Migration),
Box::new(m20250408_132123_add_header_metadata::Migration),
Box::new(m20250408_150338_load_header_metadata::Migration::<MI>(Default::default())),
Box::new(m20250411_072004_add_l2_block::Migration),
Box::new(m20250411_072004_add_l2_block::Migration::<MI>(Default::default())),
Box::new(m20250616_223947_add_metadata::Migration),
]
}
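For orientation on the now-generic `add_l2_block` migration, this is a sketch of running the migrator; `ScrollChainMigrationInfo` is a hypothetical `MigrationInfo` implementation (the real one lives elsewhere in the workspace), and `Migrator::up` is standard `sea-orm-migration` usage:

```rust
use sea_orm::Database;
use sea_orm_migration::MigratorTrait;

// Hypothetical `MigrationInfo` impl name; `MI` feeds chain-specific data
// (e.g. genesis block info) into the `add_l2_block` migration.
async fn run_migrations(database_url: &str) -> Result<(), Box<dyn std::error::Error>> {
    let connection = Database::connect(database_url).await?;
    Migrator::<ScrollChainMigrationInfo>::up(&connection, None).await?;
    Ok(())
}
```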