diff --git a/Cargo.lock b/Cargo.lock index 22c82b5f793..bfa3d4c2bc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3088,6 +3088,7 @@ dependencies = [ "lru", "serde_json", "tokio", + "tracing", ] [[package]] diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml index 5f6bf598cbf..a3936d1a9ae 100644 --- a/misc/peer-store/Cargo.toml +++ b/misc/peer-store/Cargo.toml @@ -13,6 +13,7 @@ libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } lru = "0.12.3" libp2p-identity = { workspace = true, optional = true } +tracing = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/misc/peer-store/src/connection_store.rs b/misc/peer-store/src/connection_store.rs new file mode 100644 index 00000000000..d2699f7f8f4 --- /dev/null +++ b/misc/peer-store/src/connection_store.rs @@ -0,0 +1,91 @@ +use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_swarm::ConnectionId; +use std::collections::{HashMap, HashSet}; + +/// Events emitted by the connection tracker behaviour. +#[derive(Debug, Clone)] +pub enum Event { + /// A peer connected (first connection established). + PeerConnected { + peer_id: PeerId, + connection_id: ConnectionId, + endpoint: ConnectedPoint, + }, + + /// A peer disconnected (last connection closed). + PeerDisconnected { + peer_id: PeerId, + connection_id: ConnectionId, + }, +} + +/// Simple storage for connected peers. +#[derive(Debug, Default)] +pub struct ConnectionStore { + /// Currently connected peers with their connection IDs + connected: HashMap>, +} + +impl ConnectionStore { + /// Create a new connection store. + pub fn new() -> Self { + Self { + connected: HashMap::new(), + } + } + + /// Add a new connection for a peer. + /// Returns `true` if this is the first connection to the peer. + pub fn connection_established( + &mut self, + peer_id: &PeerId, + connection_id: &ConnectionId, + ) -> bool { + let connections = self.connected.entry(*peer_id).or_default(); + let is_first_connection = connections.is_empty(); + connections.insert(*connection_id); + is_first_connection + } + + /// Remove a connection for a peer. + /// Returns `true` if this was the last connection to the peer. + pub fn connection_closed( + &mut self, + peer_id: &PeerId, + connection_id: &ConnectionId, + remaining_established: &usize, + ) -> bool { + if let Some(connections) = self.connected.get_mut(peer_id) { + connections.remove(connection_id); + + if *remaining_established == 0 { + self.connected.remove(peer_id); + return true; + } + } + false + } + + /// Check if a peer is currently connected. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + self.connected.contains_key(peer_id) + } + + /// Get all connected peer IDs. + pub fn connected_peers(&self) -> impl Iterator { + self.connected.keys() + } + + /// Get the number of connected peers. + pub fn connected_count(&self) -> usize { + self.connected.len() + } + + /// Get the number of connections to a specific peer. + pub fn connection_count(&self, peer_id: &PeerId) -> usize { + self.connected + .get(peer_id) + .map(|connections| connections.len()) + .unwrap_or(0) + } +} diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index 31f7eb6497c..2b3ae3c2837 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -18,6 +18,7 @@ //! or provide information to PeerStore. mod behaviour; +pub mod connection_store; pub mod memory_store; mod store; diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 8fbe31c6292..d2f260fbc50 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -8,17 +8,17 @@ //! let behaviour = Behaviour::new(store); //! ``` +use super::Store; +use crate::connection_store::ConnectionStore; +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::{behaviour::ConnectionEstablished, ConnectionClosed, DialError, FromSwarm}; +use lru::LruCache; use std::{ collections::{HashMap, VecDeque}, num::NonZeroUsize, task::{Poll, Waker}, }; - -use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::{behaviour::ConnectionEstablished, DialError, FromSwarm}; -use lru::LruCache; - -use super::Store; +use tracing::{debug, trace}; /// Event emitted from the [`MemoryStore`] to the [`Swarm`](libp2p_swarm::Swarm). #[derive(Debug, Clone)] @@ -59,6 +59,8 @@ pub struct MemoryStore { config: Config, /// Waker for store events. waker: Option, + /// Connection store to track connected peers. + connection_store: ConnectionStore, } impl MemoryStore { @@ -69,6 +71,7 @@ impl MemoryStore { records: HashMap::new(), pending_events: VecDeque::default(), waker: None, + connection_store: ConnectionStore::new(), } } @@ -187,6 +190,26 @@ impl MemoryStore { waker.wake(); // wake up because of update } } + + /// Check if a peer is currently connected. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + self.connection_store.is_connected(peer_id) + } + + /// Get all currently connected peer IDs. + pub fn connected_peers(&self) -> impl Iterator { + self.connection_store.connected_peers() + } + + /// Get the number of currently connected peers. + pub fn connected_count(&self) -> usize { + self.connection_store.connected_count() + } + + /// Get the number of connections to a specific peer. + pub fn connection_count(&self, peer_id: &PeerId) -> usize { + self.connection_store.connection_count(peer_id) + } } impl Store for MemoryStore { @@ -199,16 +222,56 @@ impl Store for MemoryStore { } FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, + connection_id, failed_addresses, endpoint, .. - }) if endpoint.is_dialer() => { - if self.config.remove_addr_on_dial_error { - for failed_addr in *failed_addresses { - self.remove_address_inner(peer_id, failed_addr, false); + }) => { + if endpoint.is_dialer() { + if self.config.remove_addr_on_dial_error { + for failed_addr in *failed_addresses { + self.remove_address_inner(peer_id, failed_addr, false); + } } + self.add_address_inner(peer_id, endpoint.get_remote_address(), false); + } + + trace!(%peer_id, ?connection_id, "Connection established"); + + let is_first_connection = self + .connection_store + .connection_established(peer_id, connection_id); + + if is_first_connection { + debug!(?peer_id, "Peer connected"); + // self.pending_events.push_back(connection_store::Event::PeerConnected { + // peer_id: *peer_id, + // connection_id: *connection_id, + // endpoint: endpoint.clone(), + // }); + } + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + remaining_established, + .. + }) => { + trace!(%peer_id, ?connection_id, remaining_established, "Connection closed"); + + let is_last_connection = self.connection_store.connection_closed( + peer_id, + connection_id, + remaining_established, + ); + + if is_last_connection { + debug!(%peer_id, "Peer disconnected"); + // self.pending_events.push_back(connection_store::Event::PeerDisconnected { + // peer_id: *peer_id, + // connection_id: *connection_id, + // }); } - self.add_address_inner(peer_id, endpoint.get_remote_address(), false); } FromSwarm::DialFailure(info) => { if !self.config.remove_addr_on_dial_error {