Skip to content

Commit 5509e7b

Browse files
authored
fix: l1 message indexing (#295)
* fix l1 message indexing * update queue hash boundary logic * update queue hash boundary logic * update queue hash boundary logic * update queue hash boundary logic * address comments * update comment
1 parent 12a3246 commit 5509e7b

File tree

8 files changed

+117
-48
lines changed

8 files changed

+117
-48
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/chain-orchestrator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ reth-scroll-forks.workspace = true
6363
# reth
6464
reth-eth-wire-types.workspace = true
6565
reth-network-peers.workspace = true
66+
reth-tracing.workspace = true
6667

6768
# misc
6869
arbitrary.workspace = true

crates/chain-orchestrator/src/lib.rs

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use rollup_node_primitives::{
1515
};
1616
use rollup_node_watcher::L1Notification;
1717
use scroll_alloy_consensus::TxL1Message;
18-
use scroll_alloy_hardforks::{ScrollHardfork, ScrollHardforks};
18+
use scroll_alloy_hardforks::ScrollHardforks;
1919
use scroll_alloy_network::Scroll;
2020
use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, UnwindResult};
2121
use scroll_network::NewBlockWithPeer;
@@ -79,6 +79,8 @@ pub struct ChainOrchestrator<ChainSpec, BC, P> {
7979
chain_buffer_size: usize,
8080
/// A boolean to represent if the L1 has been synced.
8181
l1_synced: bool,
82+
/// The L1 message queue index at which the V2 L1 message queue was enabled.
83+
l1_v2_message_queue_start_index: u64,
8284
/// The waker to notify when the engine driver should be polled.
8385
waker: AtomicWaker,
8486
}
@@ -97,6 +99,7 @@ impl<
9799
l2_client: P,
98100
optimistic_sync_threshold: u64,
99101
chain_buffer_size: usize,
102+
l1_v2_message_queue_start_index: u64,
100103
) -> Result<Self, ChainOrchestratorError> {
101104
let chain = init_chain_from_db(&database, &l2_client, chain_buffer_size).await?;
102105
Ok(Self {
@@ -117,6 +120,7 @@ impl<
117120
optimistic_sync_threshold,
118121
chain_buffer_size,
119122
l1_synced: false,
123+
l1_v2_message_queue_start_index,
120124
waker: AtomicWaker::new(),
121125
})
122126
}
@@ -534,15 +538,14 @@ impl<
534538
Box::pin(Self::handle_batch_commit(self.database.clone(), batch)),
535539
))
536540
}
537-
L1Notification::L1Message { message, block_number, block_timestamp } => {
541+
L1Notification::L1Message { message, block_number, block_timestamp: _ } => {
538542
ChainOrchestratorFuture::HandleL1Message(self.handle_metered(
539543
ChainOrchestratorItem::L1Message,
540544
Box::pin(Self::handle_l1_message(
545+
self.l1_v2_message_queue_start_index,
541546
self.database.clone(),
542-
self.chain_spec.clone(),
543547
message,
544548
block_number,
545-
block_timestamp,
546549
)),
547550
))
548551
}
@@ -623,33 +626,15 @@ impl<
623626

624627
/// Handles an L1 message by inserting it into the database.
625628
async fn handle_l1_message(
629+
l1_v2_message_queue_start_index: u64,
626630
database: Arc<Database>,
627-
chain_spec: Arc<ChainSpec>,
628631
l1_message: TxL1Message,
629632
l1_block_number: u64,
630-
block_timestamp: u64,
631633
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
632634
let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index);
633-
634-
let queue_hash = if chain_spec
635-
.scroll_fork_activation(ScrollHardfork::EuclidV2)
636-
.active_at_timestamp_or_number(block_timestamp, l1_block_number) &&
637-
l1_message.queue_index > 0
638-
{
639-
let index = l1_message.queue_index - 1;
640-
let prev_queue_hash = database
641-
.get_l1_message_by_index(index)
642-
.await?
643-
.map(|m| m.queue_hash)
644-
.ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?;
645-
646-
let mut input = prev_queue_hash.unwrap_or_default().to_vec();
647-
input.append(&mut l1_message.tx_hash().to_vec());
648-
Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
649-
} else {
650-
None
651-
};
652-
635+
let queue_hash =
636+
compute_l1_message_queue_hash(&database, &l1_message, l1_v2_message_queue_start_index)
637+
.await?;
653638
let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash);
654639
database.insert_l1_message(l1_message).await?;
655640
Ok(Some(event))
@@ -700,6 +685,39 @@ impl<
700685
}
701686
}
702687

688+
/// Computes the queue hash by taking the previous queue hash and performing a 2-to-1 hash with the
689+
/// current transaction hash using keccak. It then applies a mask to the last 32 bits as these bits
690+
/// are used to store the timestamp at which the message was enqueued in the contract. For the first
691+
/// message in the queue, the previous queue hash is zero. If the L1 message queue index is before
692+
/// migration to `L1MessageQueueV2`, the queue hash will be None.
693+
///
694+
/// The solidity contract (`L1MessageQueueV2.sol`) implementation is defined here: <https://github.com/scroll-tech/scroll-contracts/blob/67c1bde19c1d3462abf8c175916a2bb3c89530e4/src/L1/rollup/L1MessageQueueV2.sol#L379-L403>
695+
async fn compute_l1_message_queue_hash(
696+
database: &Arc<Database>,
697+
l1_message: &TxL1Message,
698+
l1_v2_message_queue_start_index: u64,
699+
) -> Result<Option<alloy_primitives::FixedBytes<32>>, ChainOrchestratorError> {
700+
let queue_hash = if l1_message.queue_index == l1_v2_message_queue_start_index {
701+
let mut input = B256::default().to_vec();
702+
input.append(&mut l1_message.tx_hash().to_vec());
703+
Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
704+
} else if l1_message.queue_index > l1_v2_message_queue_start_index {
705+
let index = l1_message.queue_index - 1;
706+
let mut input = database
707+
.get_l1_message_by_index(index)
708+
.await?
709+
.map(|m| m.queue_hash)
710+
.ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?
711+
.unwrap_or_default()
712+
.to_vec();
713+
input.append(&mut l1_message.tx_hash().to_vec());
714+
Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
715+
} else {
716+
None
717+
};
718+
Ok(queue_hash)
719+
}
720+
703721
async fn init_chain_from_db<P: Provider<Scroll> + 'static>(
704722
database: &Arc<Database>,
705723
l2_client: &P,
@@ -954,6 +972,7 @@ mod test {
954972

955973
const TEST_OPTIMISTIC_SYNC_THRESHOLD: u64 = 100;
956974
const TEST_CHAIN_BUFFER_SIZE: usize = 2000;
975+
const TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY: u64 = 953885;
957976

958977
/// A headers+bodies client that stores the headers and bodies in memory, with an artificial
959978
/// soft bodies response limit that is set to 20 by default.
@@ -1105,6 +1124,7 @@ mod test {
11051124
.expect("Failed to parse mainnet genesis block");
11061125
assertor.push_success(&mainnet_genesis);
11071126
let provider = ProviderBuilder::<_, _, Scroll>::default().connect_mocked_client(assertor);
1127+
11081128
let db = Arc::new(setup_test_db().await);
11091129
(
11101130
ChainOrchestrator::new(
@@ -1114,6 +1134,7 @@ mod test {
11141134
provider,
11151135
TEST_OPTIMISTIC_SYNC_THRESHOLD,
11161136
TEST_CHAIN_BUFFER_SIZE,
1137+
TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY,
11171138
)
11181139
.await
11191140
.unwrap(),
@@ -1274,6 +1295,8 @@ mod test {
12741295

12751296
#[tokio::test]
12761297
async fn test_handle_l1_message() {
1298+
reth_tracing::init_test_tracing();
1299+
12771300
// Instantiate chain orchestrator and db
12781301
let (mut chain_orchestrator, db) = setup_test_chain_orchestrator().await;
12791302

@@ -1283,7 +1306,7 @@ mod test {
12831306
let mut u = Unstructured::new(&bytes);
12841307

12851308
let message = TxL1Message {
1286-
queue_index: i64::arbitrary(&mut u).unwrap().unsigned_abs(),
1309+
queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY - 1,
12871310
..Arbitrary::arbitrary(&mut u).unwrap()
12881311
};
12891312
let block_number = u64::arbitrary(&mut u).unwrap();
@@ -1309,15 +1332,18 @@ mod test {
13091332

13101333
// insert the previous L1 message in database.
13111334
chain_orchestrator.handle_l1_notification(L1Notification::L1Message {
1312-
message: TxL1Message { queue_index: 1062109, ..Default::default() },
1335+
message: TxL1Message {
1336+
queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY,
1337+
..Default::default()
1338+
},
13131339
block_number: 1475588,
13141340
block_timestamp: 1745305199,
13151341
});
13161342
let _ = chain_orchestrator.next().await.unwrap().unwrap();
13171343

13181344
// <https://sepolia.scrollscan.com/tx/0xd80cd61ac5d8665919da19128cc8c16d3647e1e2e278b931769e986d01c6b910>
13191345
let message = TxL1Message {
1320-
queue_index: 1062110,
1346+
queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY + 1,
13211347
gas_limit: 168000,
13221348
to: address!("Ba50f5340FB9F3Bd074bD638c9BE13eCB36E603d"),
13231349
value: U256::ZERO,
@@ -1336,7 +1362,7 @@ mod test {
13361362
db.get_l1_message_by_index(message.queue_index).await.unwrap().unwrap();
13371363

13381364
assert_eq!(
1339-
b256!("5e48ae1092c7f912849b9935f4e66870d2034b24fb2016f506e6754900000000"),
1365+
b256!("b2331b9010aac89f012d648fccc1f0a9aa5ef7b7b2afe21be297dd1a00000000"),
13401366
l1_message_result.queue_hash.unwrap()
13411367
);
13421368
}
@@ -1380,19 +1406,19 @@ mod test {
13801406
queue_hash: None,
13811407
l1_block_number: 1,
13821408
l2_block_number: None,
1383-
..Arbitrary::arbitrary(&mut u).unwrap()
1409+
transaction: TxL1Message { queue_index: 1, ..Arbitrary::arbitrary(&mut u).unwrap() },
13841410
};
13851411
let l1_message_block_20 = L1MessageEnvelope {
13861412
queue_hash: None,
13871413
l1_block_number: 20,
13881414
l2_block_number: None,
1389-
..Arbitrary::arbitrary(&mut u).unwrap()
1415+
transaction: TxL1Message { queue_index: 2, ..Arbitrary::arbitrary(&mut u).unwrap() },
13901416
};
13911417
let l1_message_block_30 = L1MessageEnvelope {
13921418
queue_hash: None,
13931419
l1_block_number: 30,
13941420
l2_block_number: None,
1395-
..Arbitrary::arbitrary(&mut u).unwrap()
1421+
transaction: TxL1Message { queue_index: 3, ..Arbitrary::arbitrary(&mut u).unwrap() },
13961422
};
13971423

13981424
// Index L1 messages

crates/derivation-pipeline/benches/pipeline.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async fn setup_pipeline(
6767
// construct the pipeline.
6868
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
6969
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
70-
DerivationPipeline::new(mock_l1_provider, db)
70+
DerivationPipeline::new(mock_l1_provider, db, u64::MAX)
7171
}
7272

7373
fn benchmark_pipeline_derivation(c: &mut Criterion) {

crates/derivation-pipeline/src/lib.rs

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ pub struct DerivationPipeline<P> {
5656
database: Arc<Database>,
5757
/// A L1 provider.
5858
l1_provider: P,
59+
/// The L1 message queue index at which the V2 L1 message queue was enabled.
60+
l1_v2_message_queue_start_index: u64,
5961
/// The queue of batches to handle.
6062
batch_queue: VecDeque<WithFinalizedBlockNumber<Arc<BatchInfo>>>,
6163
/// The queue of polled attributes.
@@ -90,10 +92,15 @@ where
9092
P: L1Provider + Clone + Send + Sync + 'static,
9193
{
9294
/// Returns a new instance of the [`DerivationPipeline`].
93-
pub fn new(l1_provider: P, database: Arc<Database>) -> Self {
95+
pub fn new(
96+
l1_provider: P,
97+
database: Arc<Database>,
98+
l1_v2_message_queue_start_index: u64,
99+
) -> Self {
94100
Self {
95101
database,
96102
l1_provider,
103+
l1_v2_message_queue_start_index,
97104
batch_queue: Default::default(),
98105
pipeline_future: None,
99106
attributes_queue: Default::default(),
@@ -119,6 +126,7 @@ where
119126
let database = self.database.clone();
120127
let metrics = self.metrics.clone();
121128
let provider = self.l1_provider.clone();
129+
let l1_v2_message_queue_start_index = self.l1_v2_message_queue_start_index;
122130

123131
if let Some(info) = self.batch_queue.pop_front() {
124132
let block_number = info.number;
@@ -136,8 +144,9 @@ where
136144
.ok_or((info.clone(), DerivationPipelineError::UnknownBatch(index)))?;
137145

138146
// derive the attributes and attach the corresponding batch info.
139-
let attrs =
140-
derive(batch, provider, database).await.map_err(|err| (info.clone(), err))?;
147+
let attrs = derive(batch, provider, database, l1_v2_message_queue_start_index)
148+
.await
149+
.map_err(|err| (info.clone(), err))?;
141150

142151
// update metrics.
143152
metrics.derived_blocks.increment(attrs.len() as u64);
@@ -245,6 +254,7 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
245254
batch: BatchCommitData,
246255
l1_provider: L1P,
247256
l2_provider: L2P,
257+
l1_v2_message_queue_start_index: u64,
248258
) -> Result<Vec<ScrollPayloadAttributes>, DerivationPipelineError> {
249259
// fetch the blob then decode the input batch.
250260
let blob = if let Some(hash) = batch.blob_versioned_hash {
@@ -260,10 +270,18 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
260270
if let Some(index) = data.queue_index_start() {
261271
l1_provider.set_queue_index_cursor(index);
262272
} else if let Some(hash) = data.prev_l1_message_queue_hash() {
263-
l1_provider.set_hash_cursor(*hash).await;
264-
// we skip the first l1 message, as we are interested in the one starting after
265-
// prev_l1_message_queue_hash.
266-
let _ = l1_provider.next_l1_message().await.map_err(Into::into)?;
273+
// If the message queue hash is zero then we should use the V2 L1 message queue start index.
274+
// We must apply this branch logic because we do not have a L1 message associated with a
275+
// queue hash of ZERO (we only compute a queue hash for the first L1 message of the V2
276+
// contract).
277+
if hash == &B256::ZERO {
278+
l1_provider.set_queue_index_cursor(l1_v2_message_queue_start_index);
279+
} else {
280+
l1_provider.set_hash_cursor(*hash).await;
281+
// we skip the first l1 message, as we are interested in the one starting after
282+
// prev_l1_message_queue_hash.
283+
let _ = l1_provider.next_l1_message().await.map_err(Into::into)?;
284+
}
267285
} else {
268286
return Err(DerivationPipelineError::MissingL1MessageQueueCursor)
269287
}
@@ -411,6 +429,7 @@ mod tests {
411429
)),
412430
database: db,
413431
l1_provider: mock_l1_provider,
432+
l1_v2_message_queue_start_index: u64::MAX,
414433
batch_queue: [
415434
WithFinalizedBlockNumber::new(
416435
0,
@@ -470,7 +489,7 @@ mod tests {
470489
// construct the pipeline.
471490
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
472491
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
473-
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone());
492+
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone(), u64::MAX);
474493

475494
// as long as we don't call `push_batch`, pipeline should not return attributes.
476495
pipeline.push_batch(BatchInfo { index: 12, hash: Default::default() }, 0);
@@ -537,7 +556,7 @@ mod tests {
537556
// construct the pipeline.
538557
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
539558
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
540-
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db);
559+
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db, u64::MAX);
541560

542561
// as long as we don't call `push_batch`, pipeline should not return attributes.
543562
pipeline.push_batch(BatchInfo { index: 12, hash: Default::default() }, 0);
@@ -596,7 +615,7 @@ mod tests {
596615
let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
597616
let l2_provider = MockL2Provider;
598617

599-
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
618+
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
600619
let attribute =
601620
attributes.iter().find(|a| a.payload_attributes.timestamp == 1696935384).unwrap();
602621

@@ -695,7 +714,7 @@ mod tests {
695714
let l2_provider = MockL2Provider;
696715

697716
// derive attributes and extract l1 messages.
698-
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
717+
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
699718
let derived_l1_messages: Vec<_> = attributes
700719
.into_iter()
701720
.filter_map(|a| a.transactions)
@@ -749,7 +768,7 @@ mod tests {
749768
let l2_provider = MockL2Provider;
750769

751770
// derive attributes and extract l1 messages.
752-
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
771+
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
753772
let derived_l1_messages: Vec<_> = attributes
754773
.into_iter()
755774
.filter_map(|a| a.transactions)
@@ -863,7 +882,7 @@ mod tests {
863882
};
864883
let l2_provider = MockL2Provider;
865884

866-
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
885+
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
867886

868887
let attribute = attributes.last().unwrap();
869888
let expected = ScrollPayloadAttributes {
@@ -918,6 +937,7 @@ mod tests {
918937
)),
919938
database: db,
920939
l1_provider: mock_l1_provider,
940+
l1_v2_message_queue_start_index: u64::MAX,
921941
batch_queue: batches.collect(),
922942
attributes_queue: attributes,
923943
waker: None,

0 commit comments

Comments
 (0)