diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index 3aa666c8e..03507d963 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -44,5 +44,5 @@ version = { workspace = true } [dev-dependencies] libp2p-swarm-test = "0.5.0" message_receiver = { workspace = true } -tokio = { workspace = true, features = ["rt", "macros", "time"] } +tokio = { workspace = true, features = ["rt", "macros", "time", "test-util"] } tracing-subscriber = { workspace = true } diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 692f37763..fa168a6fb 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -129,7 +129,12 @@ impl AnchorBehaviour { discovery }; - let peer_manager = PeerManager::new(network_config); + let peer_manager = { + let slots_per_epoch = E::slots_per_epoch(); + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + let one_epoch_duration = slot_duration * slots_per_epoch as u32; + PeerManager::new(network_config, one_epoch_duration) + }; let handshake = handshake::create_behaviour(local_keypair); diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index f7c07abfb..dd047ee94 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -201,8 +201,13 @@ impl Network { self.handle_handshake_result(result); } } - AnchorBehaviourEvent::PeerManager(peer_manager::Event::ConnectActions(actions)) => { - self.handle_connect_actions(actions); + AnchorBehaviourEvent::PeerManager(peer_manager::Event::Heartbeat(heartbeat)) => { + if let Some(actions) = heartbeat.connect_actions { + self.handle_connect_actions(actions); + } + if heartbeat.check_peer_scores { + self.check_and_block_peers_by_score(); + } } _ => { trace!(event = ?behaviour_event, "Unhandled behaviour event"); @@ -413,6 +418,40 @@ impl Network { } } } + + /// Get the list of currently blocked peers. + pub fn blocked_peers(&self) -> &std::collections::HashSet { + self.swarm.behaviour().peer_manager.blocked_peers() + } + + /// Check gossipsub peer scores and block peers with scores below graylist threshold + pub fn check_and_block_peers_by_score(&mut self) { + use crate::scoring::peer_score_config::GRAYLIST_THRESHOLD; + + let gossipsub = &self.swarm.behaviour().gossipsub; + + // Get all peers with poor scores that should be blocked + let peers_to_block: Vec = self + .swarm + .connected_peers() + .filter_map(|peer_id| { + if let Some(score) = gossipsub.peer_score(peer_id) { + if score < GRAYLIST_THRESHOLD { + Some(*peer_id) + } else { + None + } + } else { + None + } + }) + .collect(); + + // Block the peers (connections will be closed automatically) + for peer_id in peers_to_block { + self.swarm.behaviour_mut().peer_manager.block_peer(peer_id); + } + } } fn build_swarm( diff --git a/anchor/network/src/peer_manager/blocking.rs b/anchor/network/src/peer_manager/blocking.rs new file mode 100644 index 000000000..816f617d2 --- /dev/null +++ b/anchor/network/src/peer_manager/blocking.rs @@ -0,0 +1,401 @@ +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; + +use discv5::libp2p_identity::PeerId; +use libp2p::{ + Multiaddr, allow_block_list, + core::{Endpoint, transport::PortUse}, + swarm::{ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour}, +}; +use tracing::debug; + +use crate::scoring::peer_score_config::RETAIN_SCORE_EPOCH_MULTIPLIER; + +/// Manages peer blocking functionality +pub struct BlockingManager { + /// Block list behaviour for actual connection denial + block_list: allow_block_list::Behaviour, + /// Tracking when peers were blocked for automatic unblocking + blocked_peers_timestamps: HashMap, + /// One epoch duration for calculating retain_score timeout + one_epoch_duration: Duration, +} + +impl BlockingManager { + pub fn new(one_epoch_duration: Duration) -> Self { + Self { + block_list: allow_block_list::Behaviour::::default(), + blocked_peers_timestamps: HashMap::new(), + one_epoch_duration, + } + } + + /// Block a peer and track timestamp for automatic unblocking + pub fn block_peer(&mut self, peer_id: PeerId) -> bool { + if self.block_list.block_peer(peer_id) { + self.blocked_peers_timestamps + .insert(peer_id, tokio::time::Instant::now()); + debug!(?peer_id, "Blocked peer"); + true + } else { + false + } + } + + /// Unblock a peer and remove from tracking + pub fn unblock_peer(&mut self, peer_id: PeerId) -> bool { + let was_removed = self.block_list.unblock_peer(peer_id); + if was_removed { + self.blocked_peers_timestamps.remove(&peer_id); + debug!(?peer_id, "Unblocked peer after retain_score duration"); + } + was_removed + } + + /// Get list of currently blocked peers + pub fn blocked_peers(&self) -> &HashSet { + self.block_list.blocked_peers() + } + + /// Check and unblock peers that have been blocked long enough + pub fn check_and_unblock_expired_peers(&mut self) { + let retain_score_duration = self.one_epoch_duration * RETAIN_SCORE_EPOCH_MULTIPLIER; + let now = tokio::time::Instant::now(); + + let peers_to_unblock: Vec = self + .blocked_peers_timestamps + .iter() + .filter_map(|(&peer_id, &blocked_at)| { + if now.duration_since(blocked_at) >= retain_score_duration { + Some(peer_id) + } else { + None + } + }) + .collect(); + + for peer_id in peers_to_unblock { + self.unblock_peer(peer_id); + } + } + + pub fn blocked_peers_count(&self) -> usize { + self.blocked_peers_timestamps.len() + } + + // Delegation methods for connection handling + pub fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.block_list + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) + } + + pub fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.block_list + .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr) + .map(|_| ()) // Discard the handler, we just want to know if connection is allowed + } + + pub fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + self.block_list.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + ) + } + + pub fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + port_use: PortUse, + ) -> Result<(), ConnectionDenied> { + self.block_list + .handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) + .map(|_| ()) // Discard the handler, we just want to know if connection is allowed + } + + pub fn on_swarm_event(&mut self, event: FromSwarm) { + self.block_list.on_swarm_event(event); + } + + pub fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> + { + // Forward CloseConnection events from allow_block_list to close connections for blocked + // peers + self.block_list.poll(cx) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use libp2p::identity::Keypair; + + use super::*; + + /// Test helper to create a test BlockingManager + fn create_test_blocking_manager() -> BlockingManager { + let one_epoch_duration = Duration::from_secs(384); // 32 slots * 12 seconds + BlockingManager::new(one_epoch_duration) + } + + /// Test helper to create a test peer ID + fn create_test_peer_id() -> PeerId { + let keypair = Keypair::generate_ed25519(); + keypair.public().to_peer_id() + } + + #[tokio::test(start_paused = true)] + async fn test_peer_blocking() { + let mut blocking_manager = create_test_blocking_manager(); + let peer_id = create_test_peer_id(); + + // Initially, peer should not be blocked + assert!(!blocking_manager.blocked_peers().contains(&peer_id)); + assert!(blocking_manager.blocked_peers_timestamps.is_empty()); + + // Block the peer (always tracks timestamp now) + let was_blocked = blocking_manager.block_peer(peer_id); + assert!(was_blocked); + + // Verify peer is now blocked + assert!(blocking_manager.blocked_peers().contains(&peer_id)); + assert!( + blocking_manager + .blocked_peers_timestamps + .contains_key(&peer_id) + ); + + // Verify the block time was recorded (should be at the current paused time) + let block_time = blocking_manager + .blocked_peers_timestamps + .get(&peer_id) + .unwrap(); + let expected_time = tokio::time::Instant::now(); + assert_eq!(*block_time, expected_time); + } + + #[tokio::test(start_paused = true)] + async fn test_peer_unblocking_after_timeout() { + let mut blocking_manager = create_test_blocking_manager(); + let peer_id = create_test_peer_id(); + + // Block the peer + blocking_manager.block_peer(peer_id); + assert!(blocking_manager.blocked_peers().contains(&peer_id)); + + // Advance time beyond the retain_score period + let retain_score_duration = + blocking_manager.one_epoch_duration * RETAIN_SCORE_EPOCH_MULTIPLIER; + tokio::time::advance(retain_score_duration + Duration::from_secs(1)).await; + + // Check and unblock expired peers + blocking_manager.check_and_unblock_expired_peers(); + + // Verify peer is now unblocked + assert!(!blocking_manager.blocked_peers().contains(&peer_id)); + assert!( + !blocking_manager + .blocked_peers_timestamps + .contains_key(&peer_id) + ); + } + + #[tokio::test(start_paused = true)] + async fn test_peer_not_unblocked_before_timeout() { + let mut blocking_manager = create_test_blocking_manager(); + let peer_id = create_test_peer_id(); + + // Block the peer + blocking_manager.block_peer(peer_id); + assert!(blocking_manager.blocked_peers().contains(&peer_id)); + + // Advance time but not enough to trigger unblocking + let retain_score_duration = + blocking_manager.one_epoch_duration * RETAIN_SCORE_EPOCH_MULTIPLIER; + tokio::time::advance(retain_score_duration - Duration::from_secs(10)).await; + + // Check and unblock expired peers + blocking_manager.check_and_unblock_expired_peers(); + + // Verify peer is still blocked + assert!(blocking_manager.blocked_peers().contains(&peer_id)); + assert!( + blocking_manager + .blocked_peers_timestamps + .contains_key(&peer_id) + ); + } + + #[tokio::test(start_paused = true)] + async fn test_multiple_peers_blocking_and_unblocking() { + let mut blocking_manager = create_test_blocking_manager(); + let peer_id_1 = create_test_peer_id(); + let peer_id_2 = create_test_peer_id(); + let peer_id_3 = create_test_peer_id(); + + // Block peer_1 first + blocking_manager.block_peer(peer_id_1); + + // Advance time a bit + tokio::time::advance(Duration::from_secs(100)).await; + + // Block peer_2 and peer_3 + blocking_manager.block_peer(peer_id_2); + blocking_manager.block_peer(peer_id_3); + + // Verify all are blocked + assert_eq!(blocking_manager.blocked_peers().len(), 3); + assert_eq!(blocking_manager.blocked_peers_timestamps.len(), 3); + + // Advance time enough to unblock only peer_1 (it was blocked earlier) + let retain_score_duration = + blocking_manager.one_epoch_duration * RETAIN_SCORE_EPOCH_MULTIPLIER; + tokio::time::advance(retain_score_duration - Duration::from_secs(50)).await; + + // Check and unblock expired peers + blocking_manager.check_and_unblock_expired_peers(); + + // Only peer_1 should be unblocked + assert!(!blocking_manager.blocked_peers().contains(&peer_id_1)); + assert!(blocking_manager.blocked_peers().contains(&peer_id_2)); + assert!(blocking_manager.blocked_peers().contains(&peer_id_3)); + assert_eq!(blocking_manager.blocked_peers().len(), 2); + assert_eq!(blocking_manager.blocked_peers_timestamps.len(), 2); + } + + #[tokio::test(start_paused = true)] + async fn test_manual_unblock_peer() { + let mut blocking_manager = create_test_blocking_manager(); + let peer_id = create_test_peer_id(); + + // Block the peer + blocking_manager.block_peer(peer_id); + assert!(blocking_manager.blocked_peers().contains(&peer_id)); + + // Manually unblock the peer + let was_unblocked = blocking_manager.unblock_peer(peer_id); + assert!(was_unblocked); + + // Verify peer is now unblocked + assert!(!blocking_manager.blocked_peers().contains(&peer_id)); + assert!( + !blocking_manager + .blocked_peers_timestamps + .contains_key(&peer_id) + ); + + // Trying to unblock again should return false + let was_unblocked_again = blocking_manager.unblock_peer(peer_id); + assert!(!was_unblocked_again); + } + + #[tokio::test] + async fn test_block_and_unblock() { + let mut blocking_manager = create_test_blocking_manager(); + let peer_id = create_test_peer_id(); + + // Test block_peer method (now always tracks timestamps) + let was_blocked = blocking_manager.block_peer(peer_id); + assert!(was_blocked); + assert!(blocking_manager.blocked_peers().contains(&peer_id)); + + // Now block_peer always adds to timestamps tracking + assert!( + blocking_manager + .blocked_peers_timestamps + .contains_key(&peer_id) + ); + + // Test unblock + let was_unblocked = blocking_manager.unblock_peer(peer_id); + assert!(was_unblocked); + assert!(!blocking_manager.blocked_peers().contains(&peer_id)); + } + + #[tokio::test] + async fn test_blocked_peers_count() { + let mut blocking_manager = create_test_blocking_manager(); + let peer_id_1 = create_test_peer_id(); + let peer_id_2 = create_test_peer_id(); + + // Initially no blocked peers + assert_eq!(blocking_manager.blocked_peers_count(), 0); + + // Block one peer (now always adds to timestamp tracking) + blocking_manager.block_peer(peer_id_1); + assert_eq!(blocking_manager.blocked_peers_count(), 1); + + // Block another peer (also adds to timestamp tracking) + blocking_manager.block_peer(peer_id_2); + assert_eq!(blocking_manager.blocked_peers_count(), 2); + + // Unblock one peer + blocking_manager.unblock_peer(peer_id_1); + assert_eq!(blocking_manager.blocked_peers_count(), 1); + } + + #[tokio::test(start_paused = true)] + async fn test_no_double_blocking() { + let mut blocking_manager = create_test_blocking_manager(); + let peer_id = create_test_peer_id(); + + // Block the peer for the first time + blocking_manager.block_peer(peer_id); + assert!(blocking_manager.blocked_peers().contains(&peer_id)); + assert_eq!(blocking_manager.blocked_peers_timestamps.len(), 1); + + let first_block_time = *blocking_manager + .blocked_peers_timestamps + .get(&peer_id) + .unwrap(); + + // Advance time slightly + tokio::time::advance(Duration::from_secs(10)).await; + + // Try to block the same peer again + blocking_manager.block_peer(peer_id); + + // Should still be blocked but time shouldn't change (no double blocking) + assert!(blocking_manager.blocked_peers().contains(&peer_id)); + assert_eq!(blocking_manager.blocked_peers_timestamps.len(), 1); + + let second_block_time = *blocking_manager + .blocked_peers_timestamps + .get(&peer_id) + .unwrap(); + assert_eq!(first_block_time, second_block_time); // Time should not have changed + } +} diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 50ec895f0..179371669 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -77,7 +77,13 @@ impl ConnectionManager { peer_id: &PeerId, peer_store: &MemoryStore, needed_subnets: &HashSet, + blocked_peers: &HashSet, ) -> bool { + // Don't dial blocked peers + if blocked_peers.contains(peer_id) { + return false; + } + self.connected.len() < self.target_peers || self.qualifies_for_priority(peer_id, peer_store, needed_subnets) } diff --git a/anchor/network/src/peer_manager/discovery.rs b/anchor/network/src/peer_manager/discovery.rs index 3c521a233..5b7326883 100644 --- a/anchor/network/src/peer_manager/discovery.rs +++ b/anchor/network/src/peer_manager/discovery.rs @@ -33,6 +33,7 @@ impl PeerDiscovery { peer_store: &mut MemoryStore, connection_manager: &ConnectionManager, needed_subnets: &HashSet, + blocked_peers: &HashSet, ) -> Option { let id = enr.peer_id(); @@ -43,7 +44,8 @@ impl PeerDiscovery { peer_store.insert_custom_data(&id, enr.clone()); // Check if we should dial this peer - let should_dial = connection_manager.should_dial_peer(&id, peer_store, needed_subnets); + let should_dial = + connection_manager.should_dial_peer(&id, peer_store, needed_subnets, blocked_peers); if should_dial { Some(Self::peer_to_dial_opts(&id, peer_store)) @@ -58,10 +60,16 @@ impl PeerDiscovery { needed_subnets: &mut HashSet, peer_store: &MemoryStore, connection_manager: &ConnectionManager, + blocked_peers: &HashSet, ) -> ConnectActions { needed_subnets.insert(subnet_id); - Self::determine_actions_for_subnets(&[subnet_id], peer_store, connection_manager) + Self::determine_actions_for_subnets( + &[subnet_id], + peer_store, + connection_manager, + blocked_peers, + ) } /// Determine what actions to take for the given subnets @@ -69,6 +77,7 @@ impl PeerDiscovery { subnets: &[SubnetId], peer_store: &MemoryStore, connection_manager: &ConnectionManager, + blocked_peers: &HashSet, ) -> ConnectActions { let mut actions = ConnectActions::none(); let peer_counts = connection_manager.count_peers_for_subnets(subnets, peer_store); @@ -82,6 +91,11 @@ impl PeerDiscovery { .collect::>(); for (peer, record) in Self::candidate_peers(peer_store, &connection_manager.connected) { + // Skip blocked peers + if blocked_peers.contains(peer) { + continue; + } + let Some(enr) = record.get_custom_data() else { continue; }; @@ -120,11 +134,13 @@ impl PeerDiscovery { needed_subnets: &HashSet, peer_store: &MemoryStore, connection_manager: &ConnectionManager, + blocked_peers: &HashSet, ) -> Option { let actions = Self::determine_actions_for_subnets( &needed_subnets.iter().copied().collect::>(), peer_store, connection_manager, + blocked_peers, ); if !actions.is_empty() { diff --git a/anchor/network/src/peer_manager/heartbeat.rs b/anchor/network/src/peer_manager/heartbeat.rs index 584825777..404487925 100644 --- a/anchor/network/src/peer_manager/heartbeat.rs +++ b/anchor/network/src/peer_manager/heartbeat.rs @@ -1,16 +1,19 @@ -use std::{collections::HashSet, time::Duration}; +use std::time::Duration; -use peer_store::memory_store::MemoryStore; -use subnet_service::SubnetId; use tokio::time::{MissedTickBehavior, interval}; -use tracing::info; -use super::{connection::ConnectionManager, discovery::PeerDiscovery, types::ConnectActions}; -use crate::Enr; +use super::types::ConnectActions; /// Interval between heartbeat events in seconds const HEARTBEAT_INTERVAL: u64 = 30; +/// Heartbeat event containing both connection actions and peer score check signal +#[derive(Debug)] +pub struct Event { + pub connect_actions: Option, + pub check_peer_scores: bool, +} + /// Manages periodic heartbeat events and status reporting pub struct HeartbeatManager { heartbeat: tokio::time::Interval, @@ -31,19 +34,4 @@ impl HeartbeatManager { ) -> std::task::Poll { self.heartbeat.poll_tick(cx) } - - /// Log network status and check for needed peer actions - pub fn heartbeat( - needed_subnets: &HashSet, - peer_store: &MemoryStore, - connection_manager: &ConnectionManager, - ) -> Option { - info!( - subnets = needed_subnets.len(), - peers = connection_manager.connected.len(), - "Network status" - ); - - PeerDiscovery::check_subnet_peers(needed_subnets, peer_store, connection_manager) - } } diff --git a/anchor/network/src/peer_manager/mod.rs b/anchor/network/src/peer_manager/mod.rs index dbde1a987..fc1894e80 100644 --- a/anchor/network/src/peer_manager/mod.rs +++ b/anchor/network/src/peer_manager/mod.rs @@ -1,6 +1,7 @@ use std::{ collections::HashSet, task::{Context, Poll}, + time::Duration, }; use discv5::libp2p_identity::PeerId; @@ -15,14 +16,17 @@ use libp2p::{ }; use peer_store::memory_store::{self, MemoryStore}; use subnet_service::SubnetId; +use tracing::info; use crate::{Config, Enr}; +pub mod blocking; pub mod connection; pub mod discovery; pub mod heartbeat; pub mod types; +use blocking::BlockingManager; use connection::ConnectionManager; use discovery::PeerDiscovery; use heartbeat::HeartbeatManager; @@ -33,20 +37,23 @@ pub struct PeerManager { peer_store: peer_store::Behaviour>, connection_manager: ConnectionManager, heartbeat_manager: HeartbeatManager, + blocking_manager: BlockingManager, needed_subnets: HashSet, } impl PeerManager { - pub fn new(config: &Config) -> Self { + pub fn new(config: &Config, one_epoch_duration: Duration) -> Self { let peer_store = peer_store::Behaviour::new(MemoryStore::new(memory_store::Config::default())); let connection_manager = ConnectionManager::new(config); let heartbeat_manager = HeartbeatManager::new(); + let blocking_manager = BlockingManager::new(one_epoch_duration); Self { peer_store, connection_manager, heartbeat_manager, + blocking_manager, needed_subnets: HashSet::new(), } } @@ -58,6 +65,7 @@ impl PeerManager { self.peer_store.store_mut(), &self.connection_manager, &self.needed_subnets, + self.blocking_manager.blocked_peers(), ) } @@ -68,17 +76,45 @@ impl PeerManager { &mut self.needed_subnets, self.peer_store.store(), &self.connection_manager, + self.blocking_manager.blocked_peers(), ) } /// Perform heartbeat and return actions if needed - pub fn heartbeat(&self) -> Option { - HeartbeatManager::heartbeat( + pub fn heartbeat(&mut self) -> Option { + info!( + subnets = self.needed_subnets.len(), + peers = self.connection_manager.connected.len(), + blocked_peers = self.blocking_manager.blocked_peers_count(), + "Network status" + ); + + // Check and unblock peers that have been blocked long enough + self.blocking_manager.check_and_unblock_expired_peers(); + + // Check if any subnets need more peers and return dial/discovery actions + PeerDiscovery::check_subnet_peers( &self.needed_subnets, self.peer_store.store(), &self.connection_manager, + self.blocking_manager.blocked_peers(), ) } + + /// Block a peer and track timestamp for automatic unblocking + pub fn block_peer(&mut self, peer_id: PeerId) -> bool { + self.blocking_manager.block_peer(peer_id) + } + + /// Unblock a peer, allowing it to connect again + pub fn unblock_peer(&mut self, peer_id: PeerId) -> bool { + self.blocking_manager.unblock_peer(peer_id) + } + + /// Get the list of currently blocked peers + pub fn blocked_peers(&self) -> &std::collections::HashSet { + self.blocking_manager.blocked_peers() + } } impl NetworkBehaviour for PeerManager { @@ -91,6 +127,13 @@ impl NetworkBehaviour for PeerManager { local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result<(), ConnectionDenied> { + // Check block list first - delegate to blocking manager + self.blocking_manager.handle_pending_inbound_connection( + connection_id, + local_addr, + remote_addr, + )?; + // Handle peer store first to remember the peer self.peer_store.handle_pending_inbound_connection( connection_id, @@ -113,6 +156,10 @@ impl NetworkBehaviour for PeerManager { local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { + // Check block list first - delegate to blocking manager + self.blocking_manager + .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr)?; + // Handle peer store first self.peer_store.handle_established_inbound_connection( connection_id, @@ -142,6 +189,14 @@ impl NetworkBehaviour for PeerManager { addresses: &[Multiaddr], effective_role: Endpoint, ) -> Result, ConnectionDenied> { + // Check block list first - delegate to blocking manager + self.blocking_manager.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + )?; + // Handle connection limits first self.connection_manager.handle_pending_outbound_connection( connection_id, @@ -167,6 +222,16 @@ impl NetworkBehaviour for PeerManager { role_override: Endpoint, port_use: PortUse, ) -> Result, ConnectionDenied> { + // Check block list first - delegate to blocking manager + self.blocking_manager + .handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + )?; + // Handle peer store first self.peer_store.handle_established_outbound_connection( connection_id, @@ -208,6 +273,7 @@ impl NetworkBehaviour for PeerManager { .update_metrics_if_changed(changed_connected); // Delegate to sub-components + self.blocking_manager.on_swarm_event(event); self.connection_manager.on_swarm_event(event); self.peer_store.on_swarm_event(event); } @@ -225,6 +291,14 @@ impl NetworkBehaviour for PeerManager { &mut self, cx: &mut Context<'_>, ) -> Poll>> { + // Check blocking manager events first and forward them + if let Poll::Ready(e) = self.blocking_manager.poll(cx) { + return Poll::Ready( + e.map_out(|never| match never {}) + .map_in(|never| match never {}), + ); + } + // Check connection limits if let Poll::Ready(e) = self.connection_manager.connection_limits.poll(cx) { return Poll::Ready(e.map_out(|never| match never {})); @@ -237,9 +311,11 @@ impl NetworkBehaviour for PeerManager { // Check heartbeat timer if self.heartbeat_manager.poll_tick(cx).is_ready() { - if let Some(actions) = self.heartbeat() { - return Poll::Ready(ToSwarm::GenerateEvent(Event::ConnectActions(actions))); - } + let connect_actions = self.heartbeat(); + return Poll::Ready(ToSwarm::GenerateEvent(Event::Heartbeat(heartbeat::Event { + connect_actions, + check_peer_scores: true, + }))); } Poll::Pending diff --git a/anchor/network/src/peer_manager/types.rs b/anchor/network/src/peer_manager/types.rs index 114af0675..e879024b1 100644 --- a/anchor/network/src/peer_manager/types.rs +++ b/anchor/network/src/peer_manager/types.rs @@ -26,5 +26,5 @@ impl ConnectActions { #[derive(Debug)] pub enum Event { PeerStore(peer_store::Event), - ConnectActions(ConnectActions), + Heartbeat(crate::peer_manager::heartbeat::Event), }