Skip to content

Commit d6d8d97

Browse files
authored
feat: update batch processing logic (#291)
* update batch processing logic * update comment
1 parent 892a314 commit d6d8d97

File tree

10 files changed

+96
-110
lines changed

10 files changed

+96
-110
lines changed

crates/chain-orchestrator/src/event.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ pub enum ChainOrchestratorEvent {
4343
/// A batch has been finalized returning an optional finalized L2 block. Also returns a
4444
/// [`BatchInfo`] if the finalized event occurred in a finalized L1 block.
4545
BatchFinalized(Option<WithFinalizedBlockNumber<BatchInfo>>, Option<BlockInfo>),
46-
/// An L1 block has been finalized returning the L1 block number, the list of finalized batches
47-
/// and an optional finalized L2 block.
48-
L1BlockFinalized(u64, Vec<BatchInfo>, Option<BlockInfo>),
46+
/// An L1 block has been finalized returning the L1 block number and the list of finalized
47+
/// batches.
48+
L1BlockFinalized(u64, Vec<BatchInfo>),
4949
/// A `L1Message` event has been committed returning the message queue index.
5050
L1MessageCommitted(u64),
5151
/// A reorg has occurred on L1, returning the L1 block number of the new L1 head,

crates/chain-orchestrator/src/lib.rs

Lines changed: 8 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use reth_network_p2p::{BlockClient, BodiesClient};
1111
use reth_scroll_primitives::ScrollBlock;
1212
use rollup_node_primitives::{
1313
BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ChainImport, L1MessageEnvelope,
14-
L2BlockInfoWithL1Messages, WithBlockNumber,
14+
L2BlockInfoWithL1Messages,
1515
};
1616
use rollup_node_watcher::L1Notification;
1717
use scroll_alloy_consensus::TxL1Message;
@@ -21,7 +21,6 @@ use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, Unw
2121
use scroll_network::NewBlockWithPeer;
2222
use std::{
2323
collections::{HashMap, VecDeque},
24-
ops::Add,
2524
pin::Pin,
2625
sync::{
2726
atomic::{AtomicU64, Ordering},
@@ -67,8 +66,6 @@ pub struct ChainOrchestrator<ChainSpec, BC, P> {
6766
pending_futures: VecDeque<ChainOrchestratorFuture>,
6867
/// The block number of the L1 finalized block.
6968
l1_finalized_block_number: Arc<AtomicU64>,
70-
/// The block number of the L2 finalized block.
71-
l2_finalized_block_number: Arc<AtomicU64>,
7269
/// The chain specification for the chain orchestrator.
7370
chain_spec: Arc<ChainSpec>,
7471
/// The metrics for the chain orchestrator.
@@ -109,7 +106,6 @@ impl<
109106
database,
110107
pending_futures: Default::default(),
111108
l1_finalized_block_number: Arc::new(AtomicU64::new(0)),
112-
l2_finalized_block_number: Arc::new(AtomicU64::new(0)),
113109
chain_spec,
114110
metrics: ChainOrchestratorItem::iter()
115111
.map(|i| {
@@ -529,7 +525,6 @@ impl<
529525
self.database.clone(),
530526
block_number,
531527
self.l1_finalized_block_number.clone(),
532-
self.l2_finalized_block_number.clone(),
533528
)),
534529
))
535530
}
@@ -551,16 +546,13 @@ impl<
551546
)),
552547
))
553548
}
554-
L1Notification::BatchFinalization { hash, index, block_number } => {
549+
L1Notification::BatchFinalization { hash: _hash, index, block_number } => {
555550
ChainOrchestratorFuture::HandleBatchFinalization(self.handle_metered(
556551
ChainOrchestratorItem::BatchFinalization,
557552
Box::pin(Self::handle_batch_finalization(
558553
self.database.clone(),
559-
hash,
560554
index,
561555
block_number,
562-
self.l1_finalized_block_number.clone(),
563-
self.l2_finalized_block_number.clone(),
564556
)),
565557
))
566558
}
@@ -615,35 +607,18 @@ impl<
615607
database: Arc<Database>,
616608
block_number: u64,
617609
l1_block_number: Arc<AtomicU64>,
618-
l2_block_number: Arc<AtomicU64>,
619610
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
620611
// Set the latest finalized L1 block in the database.
621612
database.set_latest_finalized_l1_block_number(block_number).await?;
622613

623-
// get the finalized batch infos.
624-
// we add 1 to the low finalized l1 block number to avoid fetching the last finalized batch
625-
// a second time.
626-
let low_finalized_l1_block_number =
627-
l1_block_number.load(Ordering::Relaxed).add(1).max(block_number);
628-
let finalized_batches = database
629-
.get_batches_by_finalized_block_range(low_finalized_l1_block_number, block_number)
630-
.await?;
631-
632-
// get the finalized block for the batch.
633-
let finalized_block = if let Some(info) = finalized_batches.last() {
634-
Self::fetch_highest_finalized_block(database, info.hash, l2_block_number).await?
635-
} else {
636-
None
637-
};
614+
// Get all unprocessed batches that have been finalized by this L1 block finalization.
615+
let finalized_batches =
616+
database.fetch_and_update_unprocessed_finalized_batches(block_number).await?;
638617

639-
// update the chain orchestrator l1 block number.
618+
// Update the chain orchestrator L1 block number.
640619
l1_block_number.store(block_number, Ordering::Relaxed);
641620

642-
Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(
643-
block_number,
644-
finalized_batches,
645-
finalized_block,
646-
)))
621+
Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(block_number, finalized_batches)))
647622
}
648623

649624
/// Handles an L1 message by inserting it into the database.
@@ -715,54 +690,13 @@ impl<
715690
/// Handles a batch finalization event by updating the batch input in the database.
716691
async fn handle_batch_finalization(
717692
database: Arc<Database>,
718-
batch_hash: B256,
719693
batch_index: u64,
720694
block_number: u64,
721-
l1_block_number: Arc<AtomicU64>,
722-
l2_block_number: Arc<AtomicU64>,
723695
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
724696
// finalize all batches up to `batch_index`.
725697
database.finalize_batches_up_to_index(batch_index, block_number).await?;
726698

727-
let mut finalized_block = None;
728-
let mut finalized_batch = None;
729-
730-
// check if the block where the batch was finalized is finalized on L1.
731-
let l1_block_number_value = l1_block_number.load(Ordering::Relaxed);
732-
if l1_block_number_value >= block_number {
733-
// fetch the finalized block.
734-
finalized_block =
735-
Self::fetch_highest_finalized_block(database, batch_hash, l2_block_number).await?;
736-
737-
// set the finalized batch info.
738-
finalized_batch =
739-
Some(WithBlockNumber::new(block_number, BatchInfo::new(batch_index, batch_hash)));
740-
}
741-
742-
let event = ChainOrchestratorEvent::BatchFinalized(finalized_batch, finalized_block);
743-
Ok(Some(event))
744-
}
745-
746-
/// Returns the highest finalized block for the provided batch hash. Will return [`None`] if the
747-
/// block number has already been seen by the chain orchestrator.
748-
async fn fetch_highest_finalized_block(
749-
database: Arc<Database>,
750-
batch_hash: B256,
751-
l2_block_number: Arc<AtomicU64>,
752-
) -> Result<Option<BlockInfo>, ChainOrchestratorError> {
753-
let finalized_block = database.get_highest_block_for_batch_hash(batch_hash).await?;
754-
755-
// only return the block if the chain orchestrator hasn't seen it.
756-
// in which case also update the `l2_finalized_block_number` value.
757-
Ok(finalized_block.filter(|info| {
758-
let current_l2_block_number = l2_block_number.load(Ordering::Relaxed);
759-
if info.number > current_l2_block_number {
760-
l2_block_number.store(info.number, Ordering::Relaxed);
761-
true
762-
} else {
763-
false
764-
}
765-
}))
699+
Ok(None)
766700
}
767701
}
768702

crates/database/db/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ mod test {
311311

312312
// Fetch the finalized batch for provided height and verify number.
313313
let batch_infos = db
314-
.get_batches_by_finalized_block_range(100, 110)
314+
.fetch_and_update_unprocessed_finalized_batches(110)
315315
.await
316316
.unwrap()
317317
.into_iter()

crates/database/db/src/models/batch_commit.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub struct Model {
1515
calldata: Vec<u8>,
1616
blob_hash: Option<Vec<u8>>,
1717
pub(crate) finalized_block_number: Option<i64>,
18+
processed: bool,
1819
}
1920

2021
/// The relation for the batch input model.
@@ -50,6 +51,7 @@ impl From<BatchCommitData> for ActiveModel {
5051
calldata: ActiveValue::Set(batch_commit.calldata.0.to_vec()),
5152
blob_hash: ActiveValue::Set(batch_commit.blob_versioned_hash.map(|b| b.to_vec())),
5253
finalized_block_number: ActiveValue::Unchanged(None),
54+
processed: ActiveValue::Unchanged(false),
5355
}
5456
}
5557
}

crates/database/db/src/operations.rs

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,19 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
107107
.map(|x| x.and_then(|x| x.parse::<u64>().ok()))?)
108108
}
109109

110-
/// Get the finalized batches between the provided range \[low; high\].
111-
async fn get_batches_by_finalized_block_range(
110+
/// Fetches unprocessed batches up to the provided finalized L1 block number and updates their
111+
/// status.
112+
async fn fetch_and_update_unprocessed_finalized_batches(
112113
&self,
113-
low: u64,
114-
high: u64,
114+
finalized_l1_block_number: u64,
115115
) -> Result<Vec<BatchInfo>, DatabaseError> {
116-
Ok(models::batch_commit::Entity::find()
117-
.filter(
118-
Condition::all()
119-
.add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null())
120-
.add(models::batch_commit::Column::FinalizedBlockNumber.gte(low))
121-
.add(models::batch_commit::Column::FinalizedBlockNumber.lte(high)),
122-
)
116+
let conditions = Condition::all()
117+
.add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null())
118+
.add(models::batch_commit::Column::FinalizedBlockNumber.lte(finalized_l1_block_number))
119+
.add(models::batch_commit::Column::Processed.eq(false));
120+
121+
let batches = models::batch_commit::Entity::find()
122+
.filter(conditions.clone())
123123
.order_by_asc(models::batch_commit::Column::Index)
124124
.select_only()
125125
.column(models::batch_commit::Column::Index)
@@ -131,7 +131,15 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
131131
x.into_iter()
132132
.map(|(index, hash)| BatchInfo::new(index as u64, B256::from_slice(&hash)))
133133
.collect()
134-
})?)
134+
})?;
135+
136+
models::batch_commit::Entity::update_many()
137+
.col_expr(models::batch_commit::Column::Processed, Expr::value(true))
138+
.filter(conditions)
139+
.exec(self.get_connection())
140+
.await?;
141+
142+
Ok(batches)
135143
}
136144

137145
/// Delete all [`BatchCommitData`]s with a block number greater than the provided block number.
@@ -358,27 +366,22 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
358366
let finalized_block_number = self.get_finalized_l1_block_number().await?.unwrap_or(0);
359367
self.unwind(genesis_hash, finalized_block_number).await?;
360368

361-
// Fetch the latest safe L2 block and the block number where its associated batch was
362-
// finalized.
369+
// Delete all unprocessed batches from the database and return starting l2 safe head and l1
370+
// head.
363371
let safe = if let Some(batch_info) = self
364372
.get_latest_safe_l2_info()
365373
.await?
366374
.map(|(_, batch_info)| batch_info)
367375
.filter(|b| b.index > 1)
368376
{
369-
let batch = self
370-
.get_batch_by_index(batch_info.index)
371-
.await?
372-
.expect("Batch info must be present due to database query arguments");
377+
let previous_batch_index = batch_info.index - 1;
373378
let previous_batch = self
374-
.get_batch_by_index(batch_info.index - 1)
379+
.get_batch_by_index(previous_batch_index)
375380
.await?
376381
.expect("Batch info must be present due to database query arguments");
382+
self.delete_batches_gt_batch_index(previous_batch_index).await?;
377383
let l2_block = self.get_highest_block_for_batch_hash(previous_batch.hash).await?;
378-
(
379-
l2_block,
380-
Some(batch.finalized_block_number.expect("All blocks in database are finalized")),
381-
)
384+
(l2_block, Some(previous_batch.block_number + 1))
382385
} else {
383386
(None, None)
384387
};

crates/database/migration/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod m20250411_072004_add_l2_block;
88
mod m20250616_223947_add_metadata;
99
mod m20250825_093350_remove_unsafe_l2_blocks;
1010
mod m20250829_042803_add_table_indexes;
11+
mod m20250901_102341_add_commit_batch_processed_column;
1112
mod migration_info;
1213
pub use migration_info::{
1314
MigrationInfo, ScrollDevMigrationInfo, ScrollMainnetMigrationInfo, ScrollSepoliaMigrationInfo,
@@ -27,6 +28,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
2728
Box::new(m20250616_223947_add_metadata::Migration),
2829
Box::new(m20250825_093350_remove_unsafe_l2_blocks::Migration),
2930
Box::new(m20250829_042803_add_table_indexes::Migration),
31+
Box::new(m20250901_102341_add_commit_batch_processed_column::Migration),
3032
]
3133
}
3234
}

crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,5 @@ pub(crate) enum BatchCommit {
6464
Calldata,
6565
BlobHash,
6666
FinalizedBlockNumber,
67+
Processed,
6768
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use super::m20220101_000001_create_batch_commit_table::BatchCommit;
2+
use sea_orm::Statement;
3+
use sea_orm_migration::prelude::*;
4+
5+
#[derive(DeriveMigrationName)]
6+
pub struct Migration;
7+
8+
#[async_trait::async_trait]
9+
impl MigrationTrait for Migration {
10+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
11+
// Add the processed column to the batch_commit table.
12+
manager
13+
.alter_table(
14+
Table::alter()
15+
.table(BatchCommit::Table)
16+
.add_column(
17+
ColumnDef::new(BatchCommit::Processed).boolean().not_null().default(false),
18+
)
19+
.to_owned(),
20+
)
21+
.await?;
22+
23+
// Backfill the processed column using data sourced from the l2_block table.
24+
manager
25+
.get_connection()
26+
.execute(Statement::from_sql_and_values(
27+
manager.get_database_backend(),
28+
r#"
29+
UPDATE batch_commit
30+
SET processed = 1
31+
WHERE hash IN (SELECT batch_hash FROM l2_block);
32+
"#,
33+
vec![],
34+
))
35+
.await?;
36+
37+
Ok(())
38+
}
39+
40+
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
41+
// drop the processed column on the batch_commit table.
42+
manager
43+
.alter_table(
44+
Table::alter()
45+
.table(BatchCommit::Table)
46+
.drop_column(BatchCommit::Processed)
47+
.to_owned(),
48+
)
49+
.await
50+
}
51+
}

crates/node/src/args.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ impl ScrollRollupNodeConfig {
233233
});
234234
}
235235

236+
tracing::info!(target: "scroll::node::args", fcs = ?fcs, payload_building_duration = ?self.sequencer_args.payload_building_duration, "Starting engine driver");
236237
let engine = EngineDriver::new(
237238
Arc::new(engine_api),
238239
chain_spec.clone(),
@@ -256,7 +257,7 @@ impl ScrollRollupNodeConfig {
256257

257258
let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
258259
if let Some(provider) = l1_provider.filter(|_| !self.test) {
259-
// Determine the start block number for the L1 watcher
260+
tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher");
260261
(None, Some(L1Watcher::spawn(provider, l1_start_block_number, node_config).await))
261262
} else {
262263
// Create a channel for L1 notifications that we can use to inject L1 messages for

crates/node/tests/e2e.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -978,15 +978,7 @@ async fn graceful_shutdown_consolidates_most_recent_batch_on_startup() -> eyre::
978978
let mut rnm_events = handle.get_event_listener().await?;
979979

980980
// Send the second batch again to mimic the watcher behaviour.
981-
l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_0_data.clone()))).await?;
982981
l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_1_data.clone()))).await?;
983-
l1_notification_tx
984-
.send(Arc::new(L1Notification::BatchFinalization {
985-
hash: batch_0_data.hash,
986-
index: batch_0_data.index,
987-
block_number: batch_0_data.block_number,
988-
}))
989-
.await?;
990982
l1_notification_tx
991983
.send(Arc::new(L1Notification::BatchFinalization {
992984
hash: batch_1_data.hash,

0 commit comments

Comments
 (0)