Skip to content

fix(kad): enforce a timeout for inbound substreams #6009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ libp2p-floodsub = { version = "0.47.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.50.0", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.47.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.12" }
libp2p-kad = { version = "0.49.0", path = "protocols/kad" }
libp2p-kad = { version = "0.49.1", path = "protocols/kad" }
libp2p-mdns = { version = "0.48.0", path = "protocols/mdns" }
libp2p-memory-connection-limits = { version = "0.5.0", path = "misc/memory-connection-limits" }
libp2p-metrics = { version = "0.17.0", path = "misc/metrics" }
Expand Down Expand Up @@ -121,7 +121,8 @@ libp2p-yamux = { version = "0.47.0", path = "muxers/yamux" }
asynchronous-codec = { version = "0.7.0" }
env_logger = "0.11"
futures = "0.3.30"
futures-bounded = { version = "0.2.4" }
# TODO: replace with version = "0.3.1" once released upstream
futures-bounded = { git = "https://github.com/thomaseizinger/rust-futures-bounded", rev = "012803d343b5c604e65d3c238a8cd7a145616447", features = ["futures-timer"] }
futures-rustls = { version = "0.26.0", default-features = false }
getrandom = "0.2"
if-watch = "3.2.1"
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/v2/client/handler/dial_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use futures::channel::oneshot;
use futures_bounded::StreamSet;
use futures_bounded::{Delay, StreamSet};
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{ConnectionEvent, FullyNegotiatedInbound, ListenUpgradeError},
Expand All @@ -22,7 +22,7 @@ pub struct Handler {
impl Handler {
pub(crate) fn new() -> Self {
Self {
inbound: StreamSet::new(Duration::from_secs(5), 2),
inbound: StreamSet::new(|| Delay::futures_timer(Duration::from_secs(5)), 2),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/v2/client/handler/dial_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use futures::{channel::oneshot, AsyncWrite};
use futures_bounded::FuturesMap;
use futures_bounded::{Delay, FuturesMap};
use libp2p_core::{
upgrade::{DeniedUpgrade, ReadyUpgrade},
Multiaddr,
Expand Down Expand Up @@ -91,7 +91,7 @@ impl Handler {
pub(crate) fn new() -> Self {
Self {
queued_events: VecDeque::new(),
outbound: FuturesMap::new(Duration::from_secs(10), 10),
outbound: FuturesMap::new(|| Delay::futures_timer(Duration::from_secs(10)), 10),
queued_streams: VecDeque::default(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/v2/server/handler/dial_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use futures::{AsyncRead, AsyncWrite};
use futures_bounded::FuturesSet;
use futures_bounded::{Delay, FuturesSet};
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedOutbound},
Expand All @@ -33,7 +33,7 @@ impl Handler {
Self {
pending_nonce: Some(cmd),
requested_substream_nonce: None,
outbound: FuturesSet::new(Duration::from_secs(10), 5),
outbound: FuturesSet::new(|| Delay::futures_timer(Duration::from_secs(10)), 5),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/v2/server/handler/dial_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::{
channel::{mpsc, oneshot},
AsyncRead, AsyncWrite, SinkExt, StreamExt,
};
use futures_bounded::FuturesSet;
use futures_bounded::{Delay, FuturesSet};
use libp2p_core::{
upgrade::{DeniedUpgrade, ReadyUpgrade},
Multiaddr,
Expand Down Expand Up @@ -64,7 +64,7 @@ where
observed_multiaddr,
dial_back_cmd_sender,
dial_back_cmd_receiver,
inbound: FuturesSet::new(Duration::from_secs(10), 10),
inbound: FuturesSet::new(|| Delay::futures_timer(Duration::from_secs(10)), 10),
rng,
}
}
Expand Down
11 changes: 9 additions & 2 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{

use either::Either;
use futures::future;
use futures_bounded::Delay;
use libp2p_core::{
multiaddr::Multiaddr,
upgrade::{DeniedUpgrade, ReadyUpgrade},
Expand Down Expand Up @@ -87,8 +88,14 @@ impl Handler {
Self {
endpoint,
queued_events: Default::default(),
inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
inbound_stream: futures_bounded::FuturesSet::new(
|| Delay::futures_timer(Duration::from_secs(10)),
1,
),
outbound_stream: futures_bounded::FuturesSet::new(
|| Delay::futures_timer(Duration::from_secs(10)),
1,
),
holepunch_candidates,
attempts: 0,
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Handler {
remote_peer_id,
events: SmallVec::new(),
active_streams: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
trigger_next_identify: Delay::new(Duration::ZERO),
Expand Down
5 changes: 5 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.49.1

- Enforce an inbound substream timeout in the kad substream handler.
See [PR 6009](https://github.com/libp2p/rust-libp2p/pull/6009).

## 0.49.0

- Remove no longer constructed GetRecordError::QuorumFailed.
Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-kad"
edition.workspace = true
rust-version = { workspace = true }
description = "Kademlia protocol for libp2p"
version = "0.49.0"
version = "0.49.1"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
93 changes: 55 additions & 38 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use std::{
};

use either::Either;
use futures::{channel::oneshot, prelude::*, stream::SelectAll};
use futures::{channel::oneshot, prelude::*};
use futures_bounded::{Delay, StreamSet};
use libp2p_core::{upgrade, ConnectedPoint};
use libp2p_identity::PeerId;
use libp2p_swarm::{
Expand Down Expand Up @@ -77,7 +78,8 @@ pub struct Handler {
pending_messages: VecDeque<(KadRequestMsg, QueryId)>,

/// List of active inbound substreams with the state they are in.
inbound_substreams: SelectAll<InboundSubstreamState>,
/// The streams are typed `InboundSubstreamState`, but the set uses the item type.
inbound_substreams: StreamSet<ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>>,

/// The connected endpoint of the connection that the handler
/// is associated with.
Expand Down Expand Up @@ -119,8 +121,6 @@ enum InboundSubstreamState {
PendingFlush(UniqueConnecId, KadInStreamSink<Stream>),
/// The substream is being closed.
Closing(KadInStreamSink<Stream>),
/// The substream was cancelled in favor of a new one.
Cancelled,

Poisoned {
phantom: PhantomData<QueryId>,
Expand Down Expand Up @@ -173,9 +173,6 @@ impl InboundSubstreamState {
| InboundSubstreamState::Closing(substream) => {
*self = InboundSubstreamState::Closing(substream);
}
InboundSubstreamState::Cancelled => {
*self = InboundSubstreamState::Cancelled;
}
InboundSubstreamState::Poisoned { .. } => unreachable!(),
}
}
Expand Down Expand Up @@ -461,9 +458,12 @@ impl Handler {
endpoint,
remote_peer_id,
next_connec_unique_id: UniqueConnecId(0),
inbound_substreams: Default::default(),
inbound_substreams: StreamSet::new(
move || Delay::futures_timer(substreams_timeout),
MAX_NUM_STREAMS,
),
outbound_substreams: futures_bounded::FuturesTupleSet::new(
substreams_timeout,
move || Delay::futures_timer(substreams_timeout),
MAX_NUM_STREAMS,
),
pending_streams: Default::default(),
Expand Down Expand Up @@ -518,38 +518,45 @@ impl Handler {
});
}

if self.inbound_substreams.len() == MAX_NUM_STREAMS {
if let Some(s) = self.inbound_substreams.iter_mut().find(|s| {
matches!(
s,
// An inbound substream waiting to be reused.
InboundSubstreamState::WaitingMessage { first: false, .. }
)
}) {
*s = InboundSubstreamState::Cancelled;
let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
let new_substream = InboundSubstreamState::WaitingMessage {
first: true,
connection_id: connec_unique_id,
substream: protocol,
};

if self.inbound_substreams.len() >= MAX_NUM_STREAMS {
if let Some(s) = self
.inbound_substreams
.iter_mut_of_type::<InboundSubstreamState>()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This weird construct is required because the underlying API (in SelectAll) doesn't allow typed iteration.

.find(|s| {
matches!(
**s,
// An inbound substream waiting to be reused.
InboundSubstreamState::WaitingMessage { first: false, .. }
)
})
{
*s.get_mut() = new_substream;
Copy link
Author

@teor2345 teor2345 Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because StreamSet only allows try_push(), we can't go over the stream limit, even temporarily. So the new stream has to replace the old one immediately. This also means we don't need the Canceled state.

This code is more robust, because old code could have subtle bugs. For example, if we added two streams over the limit in a row (so we had 33 streams), the equality check (== 32) would fail, and allow us to add even more streams over the 32 limit.

tracing::debug!(
peer=?self.remote_peer_id,
"New inbound substream to peer exceeds inbound substream limit. \
Removed older substream waiting to be reused."
Replacing older substream that was waiting to be reused."
)
} else {
tracing::warn!(
peer=?self.remote_peer_id,
"New inbound substream to peer exceeds inbound substream limit. \
No older substream waiting to be reused. Dropping new substream."
);
return;
}
} else {
self.inbound_substreams
.try_push(new_substream)
.map_err(|_| ())
.expect("Just checked that stream set is not full; qed");
}

let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
self.inbound_substreams
.push(InboundSubstreamState::WaitingMessage {
first: true,
connection_id: connec_unique_id,
substream: protocol,
});
}

/// Takes the given [`KadRequestMsg`] and composes it into an outbound request-response protocol
Expand Down Expand Up @@ -616,15 +623,15 @@ impl ConnectionHandler for Handler {
HandlerIn::Reset(request_id) => {
if let Some(state) = self
.inbound_substreams
.iter_mut()
.find(|state| match state {
.iter_mut_of_type::<InboundSubstreamState>()
.find(|state| match **state {
InboundSubstreamState::WaitingBehaviour(conn_id, _, _) => {
conn_id == &request_id.connec_unique_id
conn_id == request_id.connec_unique_id
}
_ => false,
})
{
state.close();
state.get_mut().close();
}
}
HandlerIn::FindNodeReq { key, query_id } => {
Expand Down Expand Up @@ -763,8 +770,16 @@ impl ConnectionHandler for Handler {
Poll::Pending => {}
}

if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) {
return Poll::Ready(event);
if let Poll::Ready(Some(event_result)) = self.inbound_substreams.poll_next_unpin(cx) {
match event_result {
Ok(event) => return Poll::Ready(event),
Err(_stream_set_timeout) => {
tracing::trace!(
"Inbound substream timed out waiting for peer, send, or close"
);
continue;
}
}
}
Comment on lines -766 to 783
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure what to do with an inbound substream timeout.
We could:

  • explicitly close the substream
  • return some synthetic inbound timeout event
  • something else???


if self.outbound_substreams.len() < MAX_NUM_STREAMS {
Expand Down Expand Up @@ -848,8 +863,11 @@ fn compute_new_protocol_status(

impl Handler {
fn answer_pending_request(&mut self, request_id: RequestId, mut msg: KadResponseMsg) {
for state in self.inbound_substreams.iter_mut() {
match state.try_answer_with(request_id, msg) {
for state in self
.inbound_substreams
.iter_mut_of_type::<InboundSubstreamState>()
{
match state.get_mut().try_answer_with(request_id, msg) {
Ok(()) => return,
Err(m) => {
msg = m;
Expand Down Expand Up @@ -1006,7 +1024,6 @@ impl futures::Stream for InboundSubstreamState {
}
},
InboundSubstreamState::Poisoned { .. } => unreachable!(),
InboundSubstreamState::Cancelled => return Poll::Ready(None),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion protocols/perf/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
};

use futures::FutureExt;
use futures_bounded::Delay;
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{
Expand All @@ -49,7 +50,7 @@ impl Handler {
pub fn new() -> Self {
Self {
inbound: futures_bounded::FuturesSet::new(
crate::RUN_TIMEOUT,
|| Delay::futures_timer(crate::RUN_TIMEOUT),
crate::MAX_PARALLEL_RUNS_PER_CONNECTION,
),
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,11 @@ impl Handler {
pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
Handler {
inbound_workers: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
outbound_workers: futures_bounded::FuturesMap::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
endpoint,
Expand Down
8 changes: 4 additions & 4 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,19 @@ impl Handler {
queued_events: Default::default(),
pending_streams: Default::default(),
inflight_reserve_requests: futures_bounded::FuturesTupleSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new(
DENYING_CIRCUIT_TIMEOUT,
|| futures_bounded::Delay::futures_timer(DENYING_CIRCUIT_TIMEOUT),
MAX_NUMBER_DENYING_CIRCUIT,
),
reservation: Reservation::None,
Expand Down
Loading