Skip to content

Commit 143a60f

Browse files
authored
Extract Indexer as a separate task (#266)
* refactor the indexer as a separate tokio task to mitigate contention issues * add and remove .rs files * clean up
1 parent 526cc3c commit 143a60f

File tree

7 files changed

+251
-231
lines changed

7 files changed

+251
-231
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/indexer/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ metrics.workspace = true
3131
metrics-derive.workspace = true
3232
strum = "0.27.1"
3333
thiserror.workspace = true
34-
tokio.workspace = true
34+
tokio = { workspace = true, features = ["full"] }
35+
tokio-stream.workspace = true
36+
tracing.workspace = true
3537

3638
[dev-dependencies]
3739
alloy-primitives = { workspace = true, features = ["arbitrary"] }

crates/indexer/src/action.rs

Lines changed: 0 additions & 53 deletions
This file was deleted.

crates/indexer/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ pub enum IndexerError {
99
/// An error occurred while trying to fetch the L2 block from the database.
1010
#[error("L2 block not found - block number: {0}")]
1111
L2BlockNotFound(u64),
12+
/// The Indexer command channel was closed.
13+
#[error("Indexer command channel closed")]
14+
CommandChannelClosed,
1215
}

crates/indexer/src/handle.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use super::{IndexerError, IndexerEvent};
2+
use futures::{stream::StreamExt, Stream};
3+
use rollup_node_primitives::{BatchInfo, L2BlockInfoWithL1Messages};
4+
use rollup_node_watcher::L1Notification;
5+
use tokio::sync::mpsc::UnboundedSender;
6+
use tokio_stream::wrappers::UnboundedReceiverStream;
7+
8+
/// The commands that can be sent to the indexer.
9+
#[derive(Debug)]
10+
pub enum IndexerCommand {
11+
/// A command to handle an L1 notification.
12+
L1Notification(L1Notification),
13+
/// A command to handle a block.
14+
Block {
15+
/// The L2 block with L1 messages.
16+
block: L2BlockInfoWithL1Messages,
17+
/// Optional batch information.
18+
batch: Option<BatchInfo>,
19+
},
20+
}
21+
22+
/// The handle for the indexer, allowing to send commands and receive events.
23+
#[derive(Debug)]
24+
pub struct IndexerHandle {
25+
command_tx: UnboundedSender<IndexerCommand>,
26+
event_rx: UnboundedReceiverStream<Result<IndexerEvent, IndexerError>>,
27+
}
28+
29+
impl IndexerHandle {
30+
/// Creates a new [`IndexerHandle`] instance.
31+
pub const fn new(
32+
request_tx: UnboundedSender<IndexerCommand>,
33+
event_rx: UnboundedReceiverStream<Result<IndexerEvent, IndexerError>>,
34+
) -> Self {
35+
Self { command_tx: request_tx, event_rx }
36+
}
37+
38+
/// Sends a command to the indexer.
39+
pub fn handle_block(
40+
&self,
41+
block: L2BlockInfoWithL1Messages,
42+
batch: Option<BatchInfo>,
43+
) -> Result<(), IndexerError> {
44+
self.command_tx
45+
.send(IndexerCommand::Block { block, batch })
46+
.map_err(|_| IndexerError::CommandChannelClosed)
47+
}
48+
49+
/// Sends a L1 notification to the indexer.
50+
pub fn handle_l1_notification(&self, notification: L1Notification) -> Result<(), IndexerError> {
51+
self.command_tx
52+
.send(IndexerCommand::L1Notification(notification))
53+
.map_err(|_| IndexerError::CommandChannelClosed)
54+
}
55+
}
56+
57+
impl Stream for IndexerHandle {
58+
type Item = Result<IndexerEvent, IndexerError>;
59+
60+
fn poll_next(
61+
self: std::pin::Pin<&mut Self>,
62+
cx: &mut std::task::Context<'_>,
63+
) -> std::task::Poll<Option<Self::Item>> {
64+
self.get_mut().event_rx.poll_next_unpin(cx)
65+
}
66+
}

0 commit comments

Comments
 (0)