Peer Scoring + Gossipsub Issues in a 22-Node Prod Network (Message Not Propagating) #6098
Hey folks 👋 I'm running a network of 22 libp2p nodes in production, and I’m running into a few issues I can’t quite debug.

🧠 Context
🐛 Problems I'm Facing

1. Constant Incoming Connection Failures

I'm consistently seeing this on both bootstrap and non-bootstrap nodes:
It seems to originate from libp2p-quic, but the logs give no concrete reason or debug context. Any idea what causes this?

2. Gossipsub Messages Not Reaching All Nodes

Even though a message is successfully gossiped and received by the bootstrap node, it doesn't seem to reach some nodes, despite them being well connected (18+ peers). Here’s what’s strange:
Is it possible that the gossipsub implementation filters out low-score peers during propagation? If so, where can I find reliable documentation on this behavior?

Peer scores output:
I also see this log on my node:
3. Is One Bootstrap Node Enough?

One of my teammates suggested that this could be a scale issue: things worked fine in our 6-node dev cluster, but in production with 22 nodes we need more bootstrap nodes. I'm not convinced, though:
Thoughts?

🙏 What I’m Looking For
Thanks in advance! I’m pretty new to libp2p, and any help is super appreciated. Happy to provide more logs/configs if helpful.

My code:

// network.rs
use crate::behaviour::MyBehaviour;
use libp2p::gossipsub::IdentTopic;
use libp2p::identity::Keypair;
use libp2p::multiaddr::Protocol;
use libp2p::{noise, tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder};
use messages::NETWORK_TOPIC;
use std::error::Error;
use std::time::Duration;
use tracing::info;
/// Initializes a libp2p swarm with the specified configuration.
///
/// Arguments:
///
/// * `keypair`: An optional `Keypair` used as the node's identity within the libp2p swarm. If no
/// `Keypair` is provided, a new identity is generated.
/// * `bootstrap_addresses`: An optional list of `(PeerId, Multiaddr)` pairs for the bootstrap nodes.
/// If provided, each address is added to Kademlia and dialed, and a Kademlia bootstrap is started so
/// the node can discover other peers.
/// * `multi_addr`: The node's external QUIC multiaddr. Its UDP port is used to listen on
/// `/ip4/0.0.0.0/udp/<port>/quic-v1`, and the full address is registered as an external address.
///
/// Returns:
///
/// A `Result` containing the configured `Swarm<MyBehaviour>`, or a `Box<dyn Error>` on failure.
#[tracing::instrument(skip(keypair))]
pub async fn init_swarm(
    keypair: Option<Keypair>,
    bootstrap_addresses: Option<Vec<(PeerId, Multiaddr)>>,
    multi_addr: Multiaddr,
) -> Result<Swarm<MyBehaviour>, Box<dyn Error>> {
    // If a keypair is provided, use it for identity; otherwise generate a new one
    let builder = match keypair {
        Some(keypair) => SwarmBuilder::with_existing_identity(keypair),
        None => SwarmBuilder::with_new_identity(),
    };
    // Create a libp2p swarm with configuration
    let mut swarm = builder
        .with_tokio()
        .with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)?
        // Tune the QUIC config that the builder creates from the swarm's own identity.
        // This replaces `keypair.unwrap()`, which panicked when no keypair was passed in.
        .with_quic_config(|mut config| {
            // `max_idle_timeout` is in milliseconds, so this is 300 ms
            config.max_idle_timeout = 300;
            config.keep_alive_interval = Duration::from_millis(100);
            config
        })
        //.with_quic()
        .with_relay_client(noise::Config::new, yamux::Config::default)?
        .with_behaviour(|keypair, relay_behaviour| {
            if bootstrap_addresses.is_none() {
                info!("Bootstrap Peer ID :{}", keypair.public().to_peer_id());
            }
            MyBehaviour::new(keypair.clone(), relay_behaviour).unwrap()
        })?
        .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
        .build();
    // Listen on the UDP port taken from the external multiaddr (error instead of panic if missing)
    let port = multi_addr
        .iter()
        .find_map(|p| if let Protocol::Udp(p) = p { Some(p) } else { None })
        .ok_or("UDP port not found in multiaddr")?;
    let listen_address = format!("/ip4/0.0.0.0/udp/{}/quic-v1", port);
    info!("Listen Address :{:?}", listen_address);
    // Add bootstrap nodes if provided
    if let Some(ref bootstrap_addresses) = bootstrap_addresses {
        for (peer_id, multi_addr) in bootstrap_addresses {
            swarm.behaviour_mut().kademlia.add_address(peer_id, multi_addr.clone());
            swarm.dial(multi_addr.clone())?;
        }
        swarm.behaviour_mut().kademlia.bootstrap()?;
    }
    // Subscribe to the topic (can be done after swarm creation)
    swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(NETWORK_TOPIC))?;
    swarm.listen_on(listen_address.parse()?)?;
    info!("External Address: {:?}", multi_addr);
    swarm.add_external_address(multi_addr);
    Ok(swarm)
}
#[tracing::instrument(skip(swarm))]
pub async fn subscribe(swarm: &mut Swarm<MyBehaviour>, topic: IdentTopic) -> Result<(), Box<dyn Error>> {
    swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
    Ok(())
}
#[tracing::instrument(skip(swarm))]
pub async fn unsubscribe(swarm: &mut Swarm<MyBehaviour>, topic: IdentTopic) -> Result<(), Box<dyn Error>> {
    swarm.behaviour_mut().gossipsub.unsubscribe(&topic)?;
    Ok(())
}

// behaviour.rs
use libp2p::gossipsub::{IdentTopic, PeerScoreParams, PeerScoreThresholds, TopicScoreParams};
use libp2p::identify;
use libp2p::identity::Keypair;
use libp2p::kad::store::MemoryStore;
use libp2p::kad::Mode;
use libp2p::request_response::Config;
use libp2p::swarm::StreamProtocol;
use libp2p::{gossipsub, kad, swarm::NetworkBehaviour};
use libp2p::{
relay,
request_response::{cbor, ProtocolSupport},
};
use messages::message::Message;
use messages::types::NodeResponse;
use messages::{GOSSIP_SUB_PROTOCOL_NAME, IDENTIFY_PROTOCOL_NAME, KAD_PROTOCOL_NAME, NETWORK_TOPIC, REQUEST_RESPONSE_PROTOCOL_NAME};
use std::time::Duration;
use tokio::io;
/// Time in seconds for caching duplicate messages in Gossipsub.
const DUPLICATE_CACHE_TIME: u64 = 10;
/// Interval in seconds for Gossipsub heartbeat.
const HEART_BEAT_INTERVAL: u64 = 5;
/// Maximum number of messages that can be sent per RPC.
const MAX_MESSAGES_PER_RPC: usize = 10000;
/// `MyBehaviour` combines the network behaviours for Gossipsub, Kademlia, Relay, Request-Response, Ping, and Identify.
///
/// # Fields
/// - `gossipsub`: Manages Gossipsub pub/sub messaging behavior.
/// - `kademlia`: Handles Kademlia DHT behavior for peer-to-peer node discovery.
/// - `relay_client`: Handles client-side relay behavior for relayed communications.
/// - `request_response_behaviour`: Manages request/response protocol using CBOR encoding between nodes.
/// - `ping`: Periodically pings connected peers and measures round-trip time.
/// - `identify`: Exchanges identity information (public key, listen addresses, supported protocols) with connected peers.
#[derive(NetworkBehaviour)]
pub struct MyBehaviour {
    pub gossipsub: gossipsub::Behaviour,
    pub kademlia: kad::Behaviour<MemoryStore>,
    relay_client: relay::client::Behaviour,
    pub request_response_behaviour: cbor::Behaviour<Message, NodeResponse>,
    pub ping: libp2p::ping::Behaviour,
    pub identify: identify::Behaviour,
}
impl MyBehaviour {
    /// Creates a new instance of `MyBehaviour` with custom configurations for Gossipsub and Kademlia.
    ///
    /// # Arguments
    /// - `key`: The node's `Keypair` used for generating peer identity and signing messages.
    /// - `relay_behaviour`: Relay behavior for handling relayed connections.
    ///
    /// # Returns
    /// A result containing `MyBehaviour` if successful, or an error wrapped in a `Box<dyn std::error::Error>`.
    ///
    /// # Errors
    /// This function returns an error if the Gossipsub configuration or the peer-scoring parameters are invalid.
    pub fn new(key: Keypair, relay_behaviour: relay::client::Behaviour) -> Result<Self, Box<dyn std::error::Error>> {
        // Generate the peer ID from the provided Keypair
        let peer_id = key.public().to_peer_id();
        // Define a custom message ID function for Gossipsub messages using a SHA-256 hash
        let message_id_fn = |message: &gossipsub::Message| {
            let s = mishti_crypto::hash256(&message.data);
            gossipsub::MessageId::from(s)
        };
        // Set up Gossipsub configuration with heartbeat interval and message validation
        let gossipsub_config = gossipsub::ConfigBuilder::default()
            .heartbeat_interval(Duration::from_secs(HEART_BEAT_INTERVAL))
            .validation_mode(gossipsub::ValidationMode::Strict)
            .duplicate_cache_time(Duration::from_secs(DUPLICATE_CACHE_TIME))
            .message_id_fn(message_id_fn)
            .max_messages_per_rpc(Some(MAX_MESSAGES_PER_RPC))
            .protocol_id_prefix(GOSSIP_SUB_PROTOCOL_NAME)
            .build()
            .map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?;
        // Set up peer scoring: default parameters, plus default topic parameters for NETWORK_TOPIC
        let mut peer_score_params = PeerScoreParams::default();
        peer_score_params.topics.insert(IdentTopic::new(NETWORK_TOPIC).hash(), TopicScoreParams::default());
        let peer_score_thresholds = PeerScoreThresholds::default();
        // Create Gossipsub behavior with message authenticity signed using the provided key
        let mut gossipsub = gossipsub::Behaviour::new(gossipsub::MessageAuthenticity::Signed(key.clone()), gossipsub_config)?;
        // Return invalid score parameters as an error instead of panicking
        gossipsub.with_peer_score(peer_score_params, peer_score_thresholds)?;
        // Set up Kademlia configuration with a query timeout of 30 seconds
        let mut kad_config = kad::Config::new(StreamProtocol::new(KAD_PROTOCOL_NAME));
        kad_config.set_query_timeout(Duration::from_secs(30));
        kad_config.set_replication_factor(std::num::NonZero::new(4).unwrap());
        kad_config.set_record_filtering(kad::StoreInserts::FilterBoth);
        kad_config.disjoint_query_paths(true);
        // Create an in-memory Kademlia DHT store
        let store = kad::store::MemoryStore::new(peer_id);
        let mut kademlia = kad::Behaviour::with_config(peer_id, store, kad_config);
        kademlia.set_mode(Some(Mode::Server));
        // Initialize and return the behavior with Gossipsub, Kademlia, Relay, Request-Response, Ping, and Identify
        Ok(Self {
            gossipsub,
            kademlia,
            relay_client: relay_behaviour,
            request_response_behaviour: cbor::Behaviour::new(
                [(StreamProtocol::new(REQUEST_RESPONSE_PROTOCOL_NAME), ProtocolSupport::Full)],
                Config::default(),
            ),
            ping: libp2p::ping::Behaviour::new(
                libp2p::ping::Config::new()
                    .with_interval(Duration::from_secs(300))
                    .with_timeout(Duration::from_secs(20)),
            ),
            identify: identify::Behaviour::new(identify::Config::new(IDENTIFY_PROTOCOL_NAME.to_string(), key.public())),
        })
    }
}
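On question 2 (whether low-score peers are filtered out): the behaviour above passes `PeerScoreThresholds::default()` to `with_peer_score`, and in gossipsub v1.1 those thresholds do gate what a node does with a peer. The following is a minimal, std-only model of that gating, not the rust-libp2p code. The semantics follow my reading of the gossipsub v1.1 spec (negative-score peers leave the mesh; below the gossip, publish, and graylist thresholds, IHAVE/IWANT gossip, own publishes, and all RPCs respectively are suppressed). The numbers -10 / -50 / -80 are what I believe `PeerScoreThresholds::default()` uses; treat them as an assumption and check the docs for your libp2p version.

```rust
/// Score thresholds, modelled on gossipsub v1.1's gossipThreshold,
/// publishThreshold and graylistThreshold.
#[derive(Debug, Clone, Copy)]
struct Thresholds {
    gossip: f64,
    publish: f64,
    graylist: f64,
}

/// What the local node is still willing to do with a peer at a given score.
#[derive(Debug, PartialEq, Eq)]
struct Treatment {
    /// Negative-score peers are pruned from (and not grafted into) the mesh.
    mesh_eligible: bool,
    /// Below the gossip threshold, no IHAVE/IWANT gossip is exchanged.
    gossip: bool,
    /// Below the publish threshold, our own published messages are not
    /// sent to this peer (flood publish / fanout).
    own_publishes: bool,
    /// Below the graylist threshold, the peer's RPCs are ignored entirely.
    rpcs_processed: bool,
}

/// Classifies a peer score against the thresholds ("below" means strictly less).
fn treatment(score: f64, t: Thresholds) -> Treatment {
    Treatment {
        mesh_eligible: score >= 0.0,
        gossip: score >= t.gossip,
        own_publishes: score >= t.publish,
        rpcs_processed: score >= t.graylist,
    }
}

/// ASSUMPTION: what I believe `PeerScoreThresholds::default()` uses.
const ASSUMED_DEFAULTS: Thresholds = Thresholds { gossip: -10.0, publish: -50.0, graylist: -80.0 };

fn main() {
    for score in [5.0, -5.0, -20.0, -60.0, -100.0] {
        println!("{score:>7.1}: {:?}", treatment(score, ASSUMED_DEFAULTS));
    }
}
```

In this model a peer at -5 is already out of the mesh (so it no longer gets messages eagerly pushed) while still receiving gossip, which would match "well connected, yet some messages never arrive" if those peers' scores are slightly negative.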
Off the top of my head, I think the closing connections are likely the root cause of the other errors as well. Gossipsub penalizes peers that don't reliably deliver messages, and if connections close frequently, messages are more likely to fail and peers end up with a bad score. Pointing out again my comment on #6089: you have a very low QUIC idle connection timeout. With many open connections, you might not be able to send packets on all of them fast enough to keep every connection alive. Does the error persist if you increase the QUIC timeouts or reset them to their defaults?
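A rough, std-only model of the idle-timeout point above: a QUIC connection is closed once nothing has been received from the peer for the idle timeout (per RFC 9000, the effective value is the minimum of both endpoints' advertised `max_idle_timeout`, and any successfully processed packet resets the timer). The stall pattern here is hypothetical, standing in for a busy node that falls behind on one connection; it only shows how little slack a 300 ms budget leaves compared with a multi-second one.

```rust
/// Returns true if no gap between consecutive packet arrivals (in ms)
/// exceeds the idle timeout, i.e. the connection would not be closed as idle.
fn survives(arrivals_ms: &[u64], idle_timeout_ms: u64) -> bool {
    arrivals_ms.windows(2).all(|w| w[1] - w[0] <= idle_timeout_ms)
}

/// HYPOTHETICAL arrival times for keep-alives sent every `interval_ms`,
/// where every packet from index `stall_at` onward is delayed by `stall_ms`
/// (e.g. an event loop busy servicing many other connections).
fn arrivals(interval_ms: u64, count: u64, stall_at: u64, stall_ms: u64) -> Vec<u64> {
    (0..count)
        .map(|i| {
            let delay = if i >= stall_at { stall_ms } else { 0 };
            i * interval_ms + delay
        })
        .collect()
}

fn main() {
    // 100 ms keep-alive interval as in the question's code, plus one 350 ms stall.
    let times = arrivals(100, 20, 10, 350);
    println!("300 ms idle timeout survives:    {}", survives(&times, 300));
    println!("10_000 ms idle timeout survives: {}", survives(&times, 10_000));
}
```

With these inputs the largest gap is 450 ms, so the 300 ms timeout closes the connection while a 10-second one does not; a 100 ms keep-alive interval does not help once the sender itself is stalled.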
Is there a reason you changed so many config options in gossipsub and Kademlia? For example, you changed the gossipsub heartbeat interval from 1s to 5s.
As a first step, I would recommend just using the default config for all protocols. The defaults are set to reasonable values that should satisfy most use cases. Changing them without deeper insight into the protocol can lead to unexpected behavior, so I would only do that if it's really needed.
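To make "reset to defaults first" concrete, here is a small std-only checklist of the knobs the question's code overrides. The configured values are copied from the code above; the `assumed_default` column is what I understand the rust-libp2p defaults to be (only the 1s heartbeat is confirmed in this thread), so treat those strings as assumptions and verify them against the docs for your libp2p version. The `Setting` type and `question_settings` helper are hypothetical, for illustration only.

```rust
/// HYPOTHETICAL checklist entry: a tuning knob, the value set in the
/// question's code, and the library default as I understand it.
struct Setting {
    name: &'static str,
    configured: &'static str,
    assumed_default: &'static str, // ASSUMPTION: verify per libp2p version
}

/// Settings from the question's code (values copied from it).
fn question_settings() -> Vec<Setting> {
    let s = |name, configured, assumed_default| Setting { name, configured, assumed_default };
    vec![
        s("gossipsub.heartbeat_interval", "5s", "1s"),
        s("gossipsub.duplicate_cache_time", "10s", "60s"),
        s("gossipsub.max_messages_per_rpc", "Some(10000)", "None"),
        s("gossipsub.validation_mode", "Strict", "Strict"),
        s("kad.query_timeout", "30s", "60s"),
        s("kad.replication_factor", "4", "20"),
        s("quic.max_idle_timeout", "300ms", "10000ms"),
        s("quic.keep_alive_interval", "100ms", "5s"),
        s("ping.interval", "300s", "15s"),
    ]
}

/// Names of the settings whose configured value differs from the assumed default.
fn overridden(settings: &[Setting]) -> Vec<&'static str> {
    settings
        .iter()
        .filter(|s| s.configured != s.assumed_default)
        .map(|s| s.name)
        .collect()
}

fn main() {
    for name in overridden(&question_settings()) {
        println!("reset to default first: {name}");
    }
}
```

Resetting everything on that list, then reintroducing changes one at a time, makes it much easier to tell which override (if any) is behind the dropped connections or missing messages.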