Skip to content

Commit 4041ba7

Browse files
jonastheisgreged93
andauthored
feat(e2e tests): invalid block penalizes peer (#228)
* Add NetworkHandle to RollupManagerCommand * finish can_penalize_peer_for_invalid_block test * fix test after merge * fix linter * fix linter * remove unrelated formatting changes * fix after merge * address review comments * Fixes after merge --------- Co-authored-by: greg <[email protected]>
1 parent f8680a0 commit 4041ba7

File tree

8 files changed

+140
-19
lines changed

8 files changed

+140
-19
lines changed

crates/engine/src/future/result.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::*;
22

33
/// A type that represents the result of the engine driver future.
4+
#[derive(Debug)]
45
pub(crate) enum EngineDriverFutureResult {
56
BlockImport(
67
Result<

crates/manager/src/manager/command.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
use super::{RollupManagerEvent, RollupManagerStatus};
22

3+
use reth_network_api::FullNetwork;
4+
use reth_scroll_node::ScrollNetworkPrimitives;
35
use reth_tokio_util::EventStream;
6+
use scroll_network::ScrollNetworkHandle;
47
use tokio::sync::oneshot;
58

69
/// The commands that can be sent to the rollup manager.
710
#[derive(Debug)]
8-
pub enum RollupManagerCommand {
11+
pub enum RollupManagerCommand<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> {
912
/// Command to build a new block.
1013
BuildBlock,
1114
/// Returns an event stream for rollup manager events.
1215
EventListener(oneshot::Sender<EventStream<RollupManagerEvent>>),
1316
/// Report the current status of the manager via the oneshot channel.
1417
Status(oneshot::Sender<RollupManagerStatus>),
18+
/// Returns the network handle.
19+
NetworkHandle(oneshot::Sender<ScrollNetworkHandle<N>>),
1520
}

crates/manager/src/manager/handle.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
11
use super::{RollupManagerCommand, RollupManagerEvent};
2+
use reth_network_api::FullNetwork;
3+
use reth_scroll_node::ScrollNetworkPrimitives;
24
use reth_tokio_util::EventStream;
5+
use scroll_network::ScrollNetworkHandle;
36
use tokio::sync::{mpsc, oneshot};
47

58
/// The handle used to send commands to the rollup manager.
69
#[derive(Debug, Clone)]
7-
pub struct RollupManagerHandle {
10+
pub struct RollupManagerHandle<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> {
811
/// The channel used to send commands to the rollup manager.
9-
to_manager_tx: mpsc::Sender<RollupManagerCommand>,
12+
to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>,
1013
}
1114

12-
impl RollupManagerHandle {
15+
impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> RollupManagerHandle<N> {
1316
/// Create a new rollup manager handle.
14-
pub const fn new(to_manager_tx: mpsc::Sender<RollupManagerCommand>) -> Self {
17+
pub const fn new(to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>) -> Self {
1518
Self { to_manager_tx }
1619
}
1720

1821
/// Sends a command to the rollup manager.
19-
pub async fn send_command(&self, command: RollupManagerCommand) {
22+
pub async fn send_command(&self, command: RollupManagerCommand<N>) {
2023
let _ = self.to_manager_tx.send(command).await;
2124
}
2225

@@ -25,6 +28,15 @@ impl RollupManagerHandle {
2528
self.send_command(RollupManagerCommand::BuildBlock).await;
2629
}
2730

31+
/// Sends a command to the rollup manager to get the network handle.
32+
pub async fn get_network_handle(
33+
&self,
34+
) -> Result<ScrollNetworkHandle<N>, oneshot::error::RecvError> {
35+
let (tx, rx) = oneshot::channel();
36+
self.send_command(RollupManagerCommand::NetworkHandle(tx)).await;
37+
rx.await
38+
}
39+
2840
/// Sends a command to the rollup manager to fetch an event listener for the rollup node
2941
/// manager.
3042
pub async fn get_event_listener(

crates/manager/src/manager/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub struct RollupNodeManager<
7878
CS,
7979
> {
8080
/// The handle receiver used to receive commands.
81-
handle_rx: Receiver<RollupManagerCommand>,
81+
handle_rx: Receiver<RollupManagerCommand<N>>,
8282
/// The chain spec used by the rollup node.
8383
chain_spec: Arc<CS>,
8484
/// The network manager that manages the scroll p2p network.
@@ -164,7 +164,7 @@ where
164164
sequencer: Option<Sequencer<L1MP>>,
165165
signer: Option<SignerHandle>,
166166
block_time: Option<u64>,
167-
) -> (Self, RollupManagerHandle) {
167+
) -> (Self, RollupManagerHandle<N>) {
168168
let (handle_tx, handle_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
169169
let indexer = Indexer::new(database.clone(), chain_spec.clone());
170170
let derivation_pipeline = DerivationPipeline::new(l1_provider, database);
@@ -450,6 +450,11 @@ where
450450
RollupManagerCommand::Status(tx) => {
451451
tx.send(this.status()).expect("Failed to send status to handle");
452452
}
453+
RollupManagerCommand::NetworkHandle(tx) => {
454+
let network_handle = this.network.handle();
455+
tx.send(network_handle.clone())
456+
.expect("Failed to send network handle to handle");
457+
}
453458
}
454459
}
455460

crates/node/src/add_ons/handle.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,31 @@
1+
use reth_network_api::FullNetwork;
12
use reth_node_api::FullNodeComponents;
23
use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider};
34
use reth_rpc_eth_api::EthApiTypes;
5+
use reth_scroll_node::ScrollNetworkPrimitives;
46
use rollup_node_manager::RollupManagerHandle;
57
#[cfg(feature = "test-utils")]
68
use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};
79

810
/// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
911
#[derive(Debug, Clone)]
10-
pub struct ScrollAddOnsHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
12+
pub struct ScrollAddOnsHandle<
13+
Node: FullNodeComponents<Network: FullNetwork<Primitives = ScrollNetworkPrimitives>>,
14+
EthApi: EthApiTypes,
15+
> {
1116
/// The handle used to send commands to the rollup manager.
12-
pub rollup_manager_handle: RollupManagerHandle,
17+
pub rollup_manager_handle: RollupManagerHandle<Node::Network>,
1318
/// The handle used to send commands to the RPC server.
1419
pub rpc_handle: RpcHandle<Node, EthApi>,
1520
/// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`.
1621
#[cfg(feature = "test-utils")]
1722
pub l1_watcher_tx: Option<Sender<Arc<L1Notification>>>,
1823
}
1924

20-
impl<Node: FullNodeComponents, EthApi: EthApiTypes> RpcHandleProvider<Node, EthApi>
21-
for ScrollAddOnsHandle<Node, EthApi>
25+
impl<
26+
Node: FullNodeComponents<Network: FullNetwork<Primitives = ScrollNetworkPrimitives>>,
27+
EthApi: EthApiTypes,
28+
> RpcHandleProvider<Node, EthApi> for ScrollAddOnsHandle<Node, EthApi>
2229
{
2330
fn rpc_handle(&self) -> &RpcHandle<Node, EthApi> {
2431
&self.rpc_handle

crates/node/src/add_ons/rollup.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl RollupManagerAddOn {
5050
self,
5151
ctx: AddOnsContext<'_, N>,
5252
rpc: RpcHandle<N, EthApi>,
53-
) -> eyre::Result<(RollupManagerHandle, Option<Sender<Arc<L1Notification>>>)>
53+
) -> eyre::Result<(RollupManagerHandle<N::Network>, Option<Sender<Arc<L1Notification>>>)>
5454
where
5555
<<N as FullNodeTypes>::Types as NodeTypes>::ChainSpec:
5656
ChainConfig<Config = ScrollChainConfig> + ScrollHardforks + IsDevChain,

crates/node/src/args.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl ScrollRollupNodeConfig {
124124
impl L1MessageProvider,
125125
impl ScrollHardforks + EthChainSpec<Header: BlockHeader> + IsDevChain + Clone + 'static,
126126
>,
127-
RollupManagerHandle,
127+
RollupManagerHandle<N>,
128128
Option<Sender<Arc<L1Notification>>>,
129129
)>
130130
where

crates/node/tests/e2e.rs

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ use alloy_signer::Signer;
66
use alloy_signer_local::PrivateKeySigner;
77
use futures::StreamExt;
88
use reth_chainspec::EthChainSpec;
9-
use reth_network::{NetworkConfigBuilder, PeersInfo};
9+
use reth_network::{NetworkConfigBuilder, Peers, PeersInfo};
1010
use reth_network_api::block::EthWireProvider;
1111
use reth_rpc_api::EthApiServer;
1212
use reth_scroll_chainspec::SCROLL_DEV;
1313
use reth_scroll_node::ScrollNetworkPrimitives;
14+
use reth_scroll_primitives::ScrollBlock;
1415
use reth_tokio_util::EventStream;
1516
use rollup_node::{
1617
constants::SCROLL_GAS_LIMIT,
@@ -22,16 +23,19 @@ use rollup_node::{
2223
GasPriceOracleArgs, L1ProviderArgs, NetworkArgs as ScrollNetworkArgs, RollupNodeContext,
2324
ScrollRollupNodeConfig, SequencerArgs,
2425
};
25-
use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent, RollupManagerHandle};
26+
use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent};
2627
use rollup_node_primitives::{sig_encode_hash, BatchCommitData, ConsensusUpdate};
2728
use rollup_node_providers::BlobSource;
2829
use rollup_node_sequencer::L1MessageInclusionMode;
2930
use rollup_node_watcher::L1Notification;
3031
use scroll_alloy_consensus::TxL1Message;
3132
use scroll_network::{NewBlockWithPeer, SCROLL_MAINNET};
3233
use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler};
33-
use std::{path::PathBuf, sync::Arc};
34-
use tokio::sync::{oneshot, Mutex};
34+
use std::{path::PathBuf, sync::Arc, time::Duration};
35+
use tokio::{
36+
sync::{oneshot, Mutex},
37+
time,
38+
};
3539
use tracing::trace;
3640

3741
#[tokio::test]
@@ -63,7 +67,7 @@ async fn can_bridge_l1_messages() -> eyre::Result<()> {
6367
let (mut nodes, _tasks, _wallet) = setup_engine(node_args, 1, chain_spec, false, false).await?;
6468
let node = nodes.pop().unwrap();
6569

66-
let rnm_handle: RollupManagerHandle = node.inner.add_ons_handle.rollup_manager_handle.clone();
70+
let rnm_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
6771
let mut rnm_events = rnm_handle.get_event_listener().await?;
6872
let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();
6973

@@ -167,6 +171,93 @@ async fn can_sequence_and_gossip_blocks() {
167171
.await;
168172
}
169173

174+
#[tokio::test]
175+
async fn can_penalize_peer_for_invalid_block() {
176+
reth_tracing::init_test_tracing();
177+
178+
// create 2 nodes
179+
let chain_spec = (*SCROLL_DEV).clone();
180+
let rollup_manager_args = ScrollRollupNodeConfig {
181+
test: true,
182+
network_args: ScrollNetworkArgs {
183+
enable_eth_scroll_wire_bridge: true,
184+
enable_scroll_wire: true,
185+
sequencer_url: None,
186+
},
187+
database_args: DatabaseArgs { path: Some(PathBuf::from("sqlite::memory:")) },
188+
l1_provider_args: L1ProviderArgs::default(),
189+
engine_driver_args: EngineDriverArgs::default(),
190+
sequencer_args: SequencerArgs {
191+
sequencer_enabled: true,
192+
block_time: 0,
193+
l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
194+
payload_building_duration: 1000,
195+
..SequencerArgs::default()
196+
},
197+
beacon_provider_args: BeaconProviderArgs {
198+
blob_source: BlobSource::Mock,
199+
..Default::default()
200+
},
201+
signer_args: Default::default(),
202+
gas_price_oracle_args: GasPriceOracleArgs::default(),
203+
consensus_args: ConsensusArgs::noop(),
204+
};
205+
206+
let (nodes, _tasks, _) =
207+
setup_engine(rollup_manager_args, 2, chain_spec, false, false).await.unwrap();
208+
209+
let node0_rmn_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone();
210+
let node0_network_handle = node0_rmn_handle.get_network_handle().await.unwrap();
211+
let node0_id = node0_network_handle.inner().peer_id();
212+
213+
let node1_rnm_handle = nodes[1].inner.add_ons_handle.rollup_manager_handle.clone();
214+
let node1_network_handle = node1_rnm_handle.get_network_handle().await.unwrap();
215+
216+
// get initial reputation of node0 from pov of node1
217+
let initial_reputation =
218+
node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap();
219+
assert_eq!(initial_reputation, 0);
220+
221+
// create invalid block
222+
let block = ScrollBlock::default();
223+
224+
// send invalid block from node0 to node1. We don't care about the signature here since we use a
225+
// NoopConsensus in the test.
226+
node0_network_handle.announce_block(block, Signature::new(U256::from(1), U256::from(1), false));
227+
228+
eventually(
229+
Duration::from_secs(5),
230+
Duration::from_millis(10),
231+
"Peer0 reputation should be lower after sending invalid block",
232+
|| async {
233+
// check that the node0 is penalized on node1
234+
let slashed_reputation =
235+
node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap();
236+
slashed_reputation < initial_reputation
237+
},
238+
)
239+
.await;
240+
}
241+
242+
/// Helper function to wait until a predicate is true or a timeout occurs.
243+
pub async fn eventually<F, Fut>(timeout: Duration, tick: Duration, message: &str, mut predicate: F)
244+
where
245+
F: FnMut() -> Fut,
246+
Fut: std::future::Future<Output = bool>,
247+
{
248+
let mut interval = time::interval(tick);
249+
let start = time::Instant::now();
250+
loop {
251+
if predicate().await {
252+
return;
253+
}
254+
255+
assert!(start.elapsed() <= timeout, "Timeout while waiting for condition: {message}");
256+
257+
interval.tick().await;
258+
}
259+
}
260+
170261
#[allow(clippy::large_stack_frames)]
171262
#[tokio::test]
172263
async fn can_sequence_and_gossip_transactions() {

0 commit comments

Comments
 (0)