Skip to content

Add libp2p-connection-tracker for simplified peer connection state management #6046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions misc/peer-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
lru = "0.12.3"
libp2p-identity = { workspace = true, optional = true }
tracing = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Expand Down
91 changes: 91 additions & 0 deletions misc/peer-store/src/connection_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use libp2p_core::{ConnectedPoint, PeerId};
use libp2p_swarm::ConnectionId;
use std::collections::{HashMap, HashSet};

/// Events emitted by the connection tracker behaviour.
#[derive(Debug, Clone)]
pub enum Event {
/// A peer connected (first connection established).
PeerConnected {
peer_id: PeerId,
connection_id: ConnectionId,
endpoint: ConnectedPoint,
},

/// A peer disconnected (last connection closed).
PeerDisconnected {
peer_id: PeerId,
connection_id: ConnectionId,
},
Comment on lines +9 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need these events. We already have the swarm-events for established and closed connections, that include the info how many other connections to that peer exist.

}

/// Simple storage for connected peers.
#[derive(Debug, Default)]
pub struct ConnectionStore {
/// Currently connected peers with their connection IDs
connected: HashMap<PeerId, HashSet<ConnectionId>>,
}
Comment on lines +22 to +27
Copy link
Member

@elenaf9 elenaf9 Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be a separate module, or even own structure, given that all it does is just adding and removing peers from a hashmap.
Why not just embed the logic directly in Behavior?


impl ConnectionStore {
/// Create a new connection store.
pub fn new() -> Self {
Self {
connected: HashMap::new(),
}
}

/// Add a new connection for a peer.
/// Returns `true` if this is the first connection to the peer.
pub fn connection_established(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
) -> bool {
let connections = self.connected.entry(*peer_id).or_default();
let is_first_connection = connections.is_empty();
connections.insert(*connection_id);
is_first_connection
}

/// Remove a connection for a peer.
/// Returns `true` if this was the last connection to the peer.
pub fn connection_closed(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
remaining_established: &usize,
) -> bool {
if let Some(connections) = self.connected.get_mut(peer_id) {
connections.remove(connection_id);

if *remaining_established == 0 {
self.connected.remove(peer_id);
return true;
}
}
false
}

/// Check if a peer is currently connected.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
self.connected.contains_key(peer_id)
}

/// Get all connected peer IDs.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connected.keys()
}

/// Get the number of connected peers.
pub fn connected_count(&self) -> usize {
self.connected.len()
}

/// Get the number of connections to a specific peer.
pub fn connection_count(&self, peer_id: &PeerId) -> usize {
self.connected
.get(peer_id)
.map(|connections| connections.len())
.unwrap_or(0)
}
}
1 change: 1 addition & 0 deletions misc/peer-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! or provide information to PeerStore.

mod behaviour;
pub mod connection_store;
pub mod memory_store;
mod store;

Expand Down
85 changes: 74 additions & 11 deletions misc/peer-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
//! let behaviour = Behaviour::new(store);
//! ```

use super::Store;
use crate::connection_store::ConnectionStore;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::{behaviour::ConnectionEstablished, ConnectionClosed, DialError, FromSwarm};
use lru::LruCache;
use std::{
collections::{HashMap, VecDeque},
num::NonZeroUsize,
task::{Poll, Waker},
};

use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::{behaviour::ConnectionEstablished, DialError, FromSwarm};
use lru::LruCache;

use super::Store;
use tracing::{debug, trace};

/// Event emitted from the [`MemoryStore`] to the [`Swarm`](libp2p_swarm::Swarm).
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -59,6 +59,8 @@ pub struct MemoryStore<T = ()> {
config: Config,
/// Waker for store events.
waker: Option<Waker>,
/// Connection store to track connected peers.
connection_store: ConnectionStore,
}

impl<T> MemoryStore<T> {
Expand All @@ -69,6 +71,7 @@ impl<T> MemoryStore<T> {
records: HashMap::new(),
pending_events: VecDeque::default(),
waker: None,
connection_store: ConnectionStore::new(),
}
}

Expand Down Expand Up @@ -187,6 +190,26 @@ impl<T> MemoryStore<T> {
waker.wake(); // wake up because of update
}
}

/// Check if a peer is currently connected.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
self.connection_store.is_connected(peer_id)
}

/// Get all currently connected peer IDs.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connection_store.connected_peers()
}

/// Get the number of currently connected peers.
pub fn connected_count(&self) -> usize {
self.connection_store.connected_count()
}

/// Get the number of connections to a specific peer.
pub fn connection_count(&self, peer_id: &PeerId) -> usize {
self.connection_store.connection_count(peer_id)
}
}

impl<T> Store for MemoryStore<T> {
Expand All @@ -199,16 +222,56 @@ impl<T> Store for MemoryStore<T> {
}
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
failed_addresses,
endpoint,
..
}) if endpoint.is_dialer() => {
if self.config.remove_addr_on_dial_error {
for failed_addr in *failed_addresses {
self.remove_address_inner(peer_id, failed_addr, false);
}) => {
if endpoint.is_dialer() {
if self.config.remove_addr_on_dial_error {
for failed_addr in *failed_addresses {
self.remove_address_inner(peer_id, failed_addr, false);
}
}
self.add_address_inner(peer_id, endpoint.get_remote_address(), false);
}

trace!(%peer_id, ?connection_id, "Connection established");

let is_first_connection = self
.connection_store
.connection_established(peer_id, connection_id);
Comment on lines +241 to +243
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just check here if ConnectionEstablished { other_established, .. } is 0.


if is_first_connection {
debug!(?peer_id, "Peer connected");
// self.pending_events.push_back(connection_store::Event::PeerConnected {
// peer_id: *peer_id,
// connection_id: *connection_id,
// endpoint: endpoint.clone(),
// });
}
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
remaining_established,
..
}) => {
trace!(%peer_id, ?connection_id, remaining_established, "Connection closed");

let is_last_connection = self.connection_store.connection_closed(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, just check if remaining_established == 0.

peer_id,
connection_id,
remaining_established,
);

if is_last_connection {
debug!(%peer_id, "Peer disconnected");
// self.pending_events.push_back(connection_store::Event::PeerDisconnected {
// peer_id: *peer_id,
// connection_id: *connection_id,
// });
}
self.add_address_inner(peer_id, endpoint.get_remote_address(), false);
}
FromSwarm::DialFailure(info) => {
if !self.config.remove_addr_on_dial_error {
Expand Down
Loading