Skip to content

Commit 520523b

Browse files
authored
feat(kad): Limit number of active outbound streams (#3287)
Limit number of active outbound streams to not exceed configured number of streams. Resolves #3236.
1 parent 687fba8 commit 520523b

File tree

2 files changed

+47
-36
lines changed

2 files changed

+47
-36
lines changed

protocols/kad/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66

77
- Remove lifetime from `RecordStore` and use GATs instead. See [PR 3239].
88

9+
- Limit number of active outbound streams to 32. See [PR 3287].
10+
911
- Bump MSRV to 1.65.0.
1012

1113
[PR 3239]: https://github.com/libp2p/rust-libp2p/pull/3239
14+
[PR 3287]: https://github.com/libp2p/rust-libp2p/pull/3287
1215

1316
# 0.42.1
1417

protocols/kad/src/handler.rs

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ use libp2p_swarm::{
3636
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
3737
};
3838
use log::trace;
39+
use std::collections::VecDeque;
3940
use std::task::Waker;
4041
use std::{
4142
error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration,
4243
};
4344

44-
const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32;
45+
const MAX_NUM_SUBSTREAMS: usize = 32;
4546

4647
/// A prototype from which [`KademliaHandler`]s can be constructed.
4748
pub struct KademliaHandlerProto<T> {
@@ -93,6 +94,14 @@ pub struct KademliaHandler<TUserData> {
9394
/// List of active outbound substreams with the state they are in.
9495
outbound_substreams: SelectAll<OutboundSubstreamState<TUserData>>,
9596

97+
/// Number of outbound streams being upgraded right now.
98+
num_requested_outbound_streams: usize,
99+
100+
/// List of outbound substreams that are waiting to become active next.
101+
/// Contains the request we want to send, and the user data if we expect an answer.
102+
requested_streams:
103+
VecDeque<SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>>,
104+
96105
/// List of active inbound substreams with the state they are in.
97106
inbound_substreams: SelectAll<InboundSubstreamState<TUserData>>,
98107

@@ -139,9 +148,6 @@ pub struct KademliaHandlerConfig {
139148

140149
/// State of an active outbound substream.
141150
enum OutboundSubstreamState<TUserData> {
142-
/// We haven't started opening the outgoing substream yet.
143-
/// Contains the request we want to send, and the user data if we expect an answer.
144-
PendingOpen(SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>),
145151
/// Waiting to send a message to the remote.
146152
PendingSend(
147153
KadOutStreamSink<NegotiatedSubstream>,
@@ -524,6 +530,8 @@ where
524530
next_connec_unique_id: UniqueConnecId(0),
525531
inbound_substreams: Default::default(),
526532
outbound_substreams: Default::default(),
533+
num_requested_outbound_streams: 0,
534+
requested_streams: Default::default(),
527535
keep_alive,
528536
protocol_status: ProtocolStatus::Unconfirmed,
529537
}
@@ -543,6 +551,7 @@ where
543551
.push(OutboundSubstreamState::PendingSend(
544552
protocol, msg, user_data,
545553
));
554+
self.num_requested_outbound_streams -= 1;
546555
if let ProtocolStatus::Unconfirmed = self.protocol_status {
547556
// Upon the first successfully negotiated substream, we know that the
548557
// remote is configured with the same protocol name and we want
@@ -572,7 +581,7 @@ where
572581
self.protocol_status = ProtocolStatus::Confirmed;
573582
}
574583

575-
if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS {
584+
if self.inbound_substreams.len() == MAX_NUM_SUBSTREAMS {
576585
if let Some(s) = self.inbound_substreams.iter_mut().find(|s| {
577586
matches!(
578587
s,
@@ -624,6 +633,7 @@ where
624633
self.outbound_substreams
625634
.push(OutboundSubstreamState::ReportError(error.into(), user_data));
626635
}
636+
self.num_requested_outbound_streams -= 1;
627637
}
628638
}
629639

@@ -667,23 +677,21 @@ where
667677
}
668678
KademliaHandlerIn::FindNodeReq { key, user_data } => {
669679
let msg = KadRequestMsg::FindNode { key };
670-
self.outbound_substreams
671-
.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
672-
self.config.protocol_config.clone(),
673-
(msg, Some(user_data)),
674-
)));
680+
self.requested_streams.push_back(SubstreamProtocol::new(
681+
self.config.protocol_config.clone(),
682+
(msg, Some(user_data)),
683+
));
675684
}
676685
KademliaHandlerIn::FindNodeRes {
677686
closer_peers,
678687
request_id,
679688
} => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }),
680689
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
681690
let msg = KadRequestMsg::GetProviders { key };
682-
self.outbound_substreams
683-
.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
684-
self.config.protocol_config.clone(),
685-
(msg, Some(user_data)),
686-
)));
691+
self.requested_streams.push_back(SubstreamProtocol::new(
692+
self.config.protocol_config.clone(),
693+
(msg, Some(user_data)),
694+
));
687695
}
688696
KademliaHandlerIn::GetProvidersRes {
689697
closer_peers,
@@ -698,27 +706,24 @@ where
698706
),
699707
KademliaHandlerIn::AddProvider { key, provider } => {
700708
let msg = KadRequestMsg::AddProvider { key, provider };
701-
self.outbound_substreams
702-
.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
703-
self.config.protocol_config.clone(),
704-
(msg, None),
705-
)));
709+
self.requested_streams.push_back(SubstreamProtocol::new(
710+
self.config.protocol_config.clone(),
711+
(msg, None),
712+
));
706713
}
707714
KademliaHandlerIn::GetRecord { key, user_data } => {
708715
let msg = KadRequestMsg::GetValue { key };
709-
self.outbound_substreams
710-
.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
711-
self.config.protocol_config.clone(),
712-
(msg, Some(user_data)),
713-
)));
716+
self.requested_streams.push_back(SubstreamProtocol::new(
717+
self.config.protocol_config.clone(),
718+
(msg, Some(user_data)),
719+
));
714720
}
715721
KademliaHandlerIn::PutRecord { record, user_data } => {
716722
let msg = KadRequestMsg::PutValue { record };
717-
self.outbound_substreams
718-
.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
719-
self.config.protocol_config.clone(),
720-
(msg, Some(user_data)),
721-
)));
723+
self.requested_streams.push_back(SubstreamProtocol::new(
724+
self.config.protocol_config.clone(),
725+
(msg, Some(user_data)),
726+
));
722727
}
723728
KademliaHandlerIn::GetRecordRes {
724729
record,
@@ -775,6 +780,15 @@ where
775780
return Poll::Ready(event);
776781
}
777782

783+
let num_in_progress_outbound_substreams =
784+
self.outbound_substreams.len() + self.num_requested_outbound_streams;
785+
if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS {
786+
if let Some(protocol) = self.requested_streams.pop_front() {
787+
self.num_requested_outbound_streams += 1;
788+
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol });
789+
}
790+
}
791+
778792
if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
779793
// We destroyed all substreams in this function.
780794
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
@@ -853,12 +867,6 @@ where
853867

854868
loop {
855869
match std::mem::replace(this, OutboundSubstreamState::Poisoned) {
856-
OutboundSubstreamState::PendingOpen(protocol) => {
857-
*this = OutboundSubstreamState::Done;
858-
return Poll::Ready(Some(ConnectionHandlerEvent::OutboundSubstreamRequest {
859-
protocol,
860-
}));
861-
}
862870
OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => {
863871
match substream.poll_ready_unpin(cx) {
864872
Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) {

0 commit comments

Comments
 (0)