Skip to content

Commit 58d211b

Browse files
committed
fix sending partial messages to peers that do not support them
1 parent 2220ae4 commit 58d211b

File tree

3 files changed

+129
-167
lines changed

3 files changed

+129
-167
lines changed

protocols/gossipsub/src/behaviour.rs

Lines changed: 121 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,7 @@ where
658658

659659
let topic_hash = raw_message.topic.clone();
660660

661-
let recipient_peers = self.get_publish_peers(&topic_hash, true);
661+
let recipient_peers = self.get_publish_peers(&topic_hash, |_, _| true);
662662

663663
// If the message isn't a duplicate and we have sent it to some peers add it to the
664664
// duplicate cache and memcache.
@@ -715,129 +715,101 @@ where
715715
Ok(msg_id)
716716
}
717717

718-
// Get Peers from the mesh or fanout to publish a message to.
719-
// If `exclude_partial_only` set, filter out peers who only want partial messages for the topic.
718+
// Get peers from the mesh or fanout to publish a message to,
719+
// further filtered by the provided `f` callback.
720720
fn get_publish_peers(
721721
&mut self,
722722
topic_hash: &TopicHash,
723-
exclude_partial_only: bool,
723+
f: impl Fn(&PeerId, &PeerDetails) -> bool,
724724
) -> HashSet<PeerId> {
725-
let mesh_n = self.config.mesh_n_for_topic(topic_hash);
726-
727725
let peers_on_topic = self
728726
.connected_peers
729727
.iter()
730-
.filter(|(_, peer)| {
731-
#[cfg(feature = "partial_messages")]
732-
{
733-
if exclude_partial_only && peer.partial_only_topics.contains(topic_hash) {
734-
return false;
735-
}
736-
}
737-
let _ = peer;
738-
true
739-
})
740-
.map(|(peer_id, _)| peer_id)
741-
.peekable();
742-
743-
let mut recipient_peers = HashSet::new();
744-
if self.config.flood_publish() {
745-
// Forward to all peers above score and all explicit peers
746-
recipient_peers.extend(peers_on_topic.filter(|p| {
747-
self.explicit_peers.contains(*p)
728+
.filter(|(_, peer)| peer.topics.contains(topic_hash))
729+
.filter(|(peer_id, _)| {
730+
self.explicit_peers.contains(*peer_id)
748731
|| !self
749732
.peer_score
750-
.below_threshold(p, |ts| ts.publish_threshold)
733+
.below_threshold(peer_id, |ts| ts.publish_threshold)
751734
.0
752-
}));
753-
} else {
754-
match self.mesh.get(topic_hash) {
755-
// Mesh peers
756-
Some(mesh_peers) => {
757-
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
758-
// peers (if possible).
759-
let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
760-
761-
if needed_extra_peers > 0 {
762-
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
763-
// and publish to them.
764-
765-
// Get a random set of peers that are appropriate to send messages too.
766-
let peer_list = get_random_peers(
767-
&self.connected_peers,
768-
topic_hash,
769-
needed_extra_peers,
770-
exclude_partial_only,
771-
|peer| {
772-
!mesh_peers.contains(peer)
773-
&& !self.explicit_peers.contains(peer)
774-
&& !self
775-
.peer_score
776-
.below_threshold(peer, |ts| ts.publish_threshold)
777-
.0
778-
},
779-
);
780-
recipient_peers.extend(peer_list);
781-
}
735+
});
782736

783-
recipient_peers.extend(mesh_peers);
784-
}
785-
// Gossipsub peers
786-
None => {
787-
tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
788-
// `fanout_peers` is always non-empty if it's `Some`.
789-
let fanout_peers = self
790-
.fanout
791-
.get(topic_hash)
792-
.filter(|peers| !peers.is_empty());
793-
// If we have fanout peers add them to the map.
794-
if let Some(peers) = fanout_peers {
795-
for peer in peers {
796-
recipient_peers.insert(*peer);
797-
}
798-
} else {
799-
// We have no fanout peers, select mesh_n of them and add them to the fanout
800-
let new_peers = get_random_peers(
801-
&self.connected_peers,
802-
topic_hash,
803-
mesh_n,
804-
exclude_partial_only,
805-
|p| {
806-
!self.explicit_peers.contains(p)
807-
&& !self
808-
.peer_score
809-
.below_threshold(p, |ts| ts.publish_threshold)
810-
.0
811-
},
812-
);
813-
// Add the new peers to the fanout and recipient peers
814-
self.fanout.insert(topic_hash.clone(), new_peers.clone());
815-
for peer in new_peers {
816-
tracing::debug!(%peer, "Peer added to fanout");
817-
recipient_peers.insert(peer);
818-
}
819-
}
820-
// We are publishing to fanout peers - update the time we published
821-
self.fanout_last_pub
822-
.insert(topic_hash.clone(), Instant::now());
737+
// Forward to all peers above score and all explicit peers
738+
if self.config.flood_publish() {
739+
return peers_on_topic
740+
.filter(|(peer_id, peer_details)| f(peer_id, peer_details))
741+
.map(|(peer_id, _)| *peer_id)
742+
.collect();
743+
}
744+
745+
let mesh_n = self.config.mesh_n_for_topic(topic_hash);
746+
let mut recipient_peers = HashSet::new();
747+
// Explicit peers that are part of the topic and Floodsub peers.
748+
recipient_peers.extend(
749+
peers_on_topic
750+
.clone()
751+
.filter(|(peer_id, peer)| {
752+
self.explicit_peers.contains(peer_id) || peer.kind == PeerKind::Floodsub
753+
})
754+
.map(|(peer_id, _)| *peer_id),
755+
);
756+
757+
match self.mesh.get(topic_hash) {
758+
// Mesh peers
759+
Some(mesh_peers) => {
760+
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
761+
// peers (if possible).
762+
let mesh_peers = peers_on_topic
763+
.clone()
764+
.filter_map(|(peer_id, _)| mesh_peers.get(peer_id))
765+
.copied()
766+
.collect::<Vec<PeerId>>();
767+
768+
let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
769+
if needed_extra_peers > 0 {
770+
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
771+
// and publish to them.
772+
773+
// Get a random set of peers that are appropriate to send messages to.
774+
let peer_list =
775+
get_random_peers(peers_on_topic, topic_hash, needed_extra_peers, |_, _| {
776+
true
777+
});
778+
recipient_peers.extend(peer_list);
823779
}
824-
}
825780

826-
// Explicit peers that are part of the topic
827-
recipient_peers
828-
.extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
781+
recipient_peers.extend(mesh_peers);
782+
}
783+
// Gossipsub peers
784+
None => {
785+
tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
786+
let fanout_peers = peers_on_topic
787+
.clone()
788+
.filter_map(|(peer_id, _)| {
789+
self.fanout
790+
.get(topic_hash)
791+
.and_then(|fanout| fanout.get(peer_id))
792+
})
793+
.copied()
794+
.collect::<Vec<PeerId>>();
829795

830-
// Floodsub peers
831-
for (peer, connections) in &self.connected_peers {
832-
if connections.kind == PeerKind::Floodsub
833-
&& connections.topics.contains(topic_hash)
834-
&& !self
835-
.peer_score
836-
.below_threshold(peer, |ts| ts.publish_threshold)
837-
.0
838-
{
839-
recipient_peers.insert(*peer);
796+
// If we have fanout peers add them to the map.
797+
if !fanout_peers.is_empty() {
798+
recipient_peers.extend(fanout_peers);
799+
} else {
800+
// We have no fanout peers, select mesh_n of them and add them to the fanout
801+
let new_peers =
802+
get_random_peers(peers_on_topic, topic_hash, mesh_n, |_, _| true);
803+
// Add the new peers to the fanout and recipient peers
804+
self.fanout.insert(topic_hash.clone(), new_peers.clone());
805+
for peer in new_peers {
806+
tracing::debug!(%peer, "Peer added to fanout");
807+
recipient_peers.insert(peer);
808+
}
840809
}
810+
// We are publishing to fanout peers - update the time we published
811+
self.fanout_last_pub
812+
.insert(topic_hash.clone(), Instant::now());
841813
}
842814
}
843815

@@ -854,7 +826,9 @@ where
854826

855827
let group_id = partial_message.group_id().as_ref().to_vec();
856828

857-
let recipient_peers = self.get_publish_peers(&topic_hash, false);
829+
let recipient_peers = self.get_publish_peers(&topic_hash, |_, peer| {
830+
peer.partial_only_topics.contains(&topic_hash)
831+
});
858832
let metadata = partial_message.parts_metadata().as_ref().to_vec();
859833
for peer_id in recipient_peers.iter() {
860834
// TODO: this can be optimized, we are going to get the peer again on `send_message`
@@ -1151,12 +1125,11 @@ where
11511125
&self.connected_peers,
11521126
topic_hash,
11531127
mesh_n - added_peers.len(),
1154-
true,
1155-
|peer| {
1156-
!added_peers.contains(peer)
1157-
&& !self.explicit_peers.contains(peer)
1158-
&& !self.peer_score.below_threshold(peer, |_| 0.0).0
1159-
&& !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1128+
|peer_id, _| {
1129+
!added_peers.contains(peer_id)
1130+
&& !self.explicit_peers.contains(peer_id)
1131+
&& !self.peer_score.below_threshold(peer_id, |_| 0.0).0
1132+
&& !self.backoffs.is_backoff_with_slack(topic_hash, peer_id)
11601133
},
11611134
);
11621135

@@ -1246,8 +1219,9 @@ where
12461219
&self.connected_peers,
12471220
topic_hash,
12481221
self.config.prune_peers(),
1249-
true,
1250-
|p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0,
1222+
|peer_id, _| {
1223+
peer_id != peer && !self.peer_score.below_threshold(peer_id, |_| 0.0).0
1224+
},
12511225
)
12521226
.into_iter()
12531227
.map(|p| PeerInfo { peer_id: Some(p) })
@@ -2419,12 +2393,11 @@ where
24192393
&self.connected_peers,
24202394
topic_hash,
24212395
desired_peers,
2422-
true,
2423-
|peer| {
2424-
!peers.contains(peer)
2425-
&& !explicit_peers.contains(peer)
2426-
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
2427-
&& scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2396+
|peer_id, _| {
2397+
!peers.contains(peer_id)
2398+
&& !explicit_peers.contains(peer_id)
2399+
&& !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2400+
&& scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
24282401
},
24292402
);
24302403
for peer in &peer_list {
@@ -2527,8 +2500,7 @@ where
25272500
&self.connected_peers,
25282501
topic_hash,
25292502
needed,
2530-
false,
2531-
|peer_id| {
2503+
|peer_id, _| {
25322504
!peers.contains(peer_id)
25332505
&& !explicit_peers.contains(peer_id)
25342506
&& !backoffs.is_backoff_with_slack(topic_hash, peer_id)
@@ -2604,8 +2576,7 @@ where
26042576
&self.connected_peers,
26052577
topic_hash,
26062578
self.config.opportunistic_graft_peers(),
2607-
false,
2608-
|peer_id| {
2579+
|peer_id, _| {
26092580
!peers.contains(peer_id)
26102581
&& !explicit_peers.contains(peer_id)
26112582
&& !backoffs.is_backoff_with_slack(topic_hash, peer_id)
@@ -2701,8 +2672,7 @@ where
27012672
&self.connected_peers,
27022673
topic_hash,
27032674
needed_peers,
2704-
false,
2705-
|peer_id| {
2675+
|peer_id, _| {
27062676
!peers.contains(peer_id)
27072677
&& !explicit_peers.contains(peer_id)
27082678
&& !self
@@ -2816,15 +2786,19 @@ where
28162786
)
28172787
};
28182788
// get gossip_lazy random peers
2819-
let to_msg_peers =
2820-
get_random_peers_dynamic(&self.connected_peers, topic_hash, false, n_map, |peer| {
2821-
!peers.contains(peer)
2822-
&& !self.explicit_peers.contains(peer)
2789+
let to_msg_peers = get_random_peers_dynamic(
2790+
self.connected_peers.iter(),
2791+
topic_hash,
2792+
n_map,
2793+
|peer_id, _| {
2794+
!peers.contains(peer_id)
2795+
&& !self.explicit_peers.contains(peer_id)
28232796
&& !self
28242797
.peer_score
2825-
.below_threshold(peer, |ts| ts.gossip_threshold)
2798+
.below_threshold(peer_id, |ts| ts.gossip_threshold)
28262799
.0
2827-
});
2800+
},
2801+
);
28282802

28292803
tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
28302804

@@ -3787,28 +3761,17 @@ fn peer_removed_from_mesh(
37873761
/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
37883762
/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
37893763
/// that gets as input the number of filtered peers.
3790-
#[allow(unused, reason = "partial is used with partial_messages feature")]
3791-
fn get_random_peers_dynamic(
3792-
connected_peers: &HashMap<PeerId, PeerDetails>,
3764+
fn get_random_peers_dynamic<'a>(
3765+
peers: impl IntoIterator<Item = (&'a PeerId, &'a PeerDetails)>,
37933766
topic_hash: &TopicHash,
3794-
// If we want to exclude partial only peers.
3795-
exclude_partial: bool,
37963767
// maps the number of total peers to the number of selected peers
37973768
n_map: impl Fn(usize) -> usize,
3798-
mut f: impl FnMut(&PeerId) -> bool,
3769+
f: impl Fn(&PeerId, &PeerDetails) -> bool,
37993770
) -> BTreeSet<PeerId> {
3800-
let mut gossip_peers = connected_peers
3801-
.iter()
3802-
.filter_map(|(peer_id, peer)| {
3803-
#[cfg(feature = "partial_messages")]
3804-
{
3805-
if exclude_partial && peer.partial_only_topics.contains(topic_hash) {
3806-
return None;
3807-
}
3808-
}
3809-
Some((peer_id, peer))
3810-
})
3811-
.filter(|(peer_id, _)| f(peer_id))
3771+
let mut gossip_peers = peers
3772+
.into_iter()
3773+
.filter(|(_, p)| p.topics.contains(topic_hash))
3774+
.filter(|(peer_id, peer_details)| f(peer_id, peer_details))
38123775
.filter(|(_, p)| p.kind.is_gossipsub())
38133776
.map(|(peer_id, _)| *peer_id)
38143777
.collect::<Vec<PeerId>>();
@@ -3831,15 +3794,13 @@ fn get_random_peers_dynamic(
38313794

38323795
/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
38333796
/// filtered by the function `f`.
3834-
#[allow(unused, reason = "partial is used with partial_messages feature")]
3835-
fn get_random_peers(
3836-
connected_peers: &HashMap<PeerId, PeerDetails>,
3797+
fn get_random_peers<'a>(
3798+
peers: impl IntoIterator<Item = (&'a PeerId, &'a PeerDetails)>,
38373799
topic_hash: &TopicHash,
38383800
n: usize,
3839-
exclude_partial: bool,
3840-
f: impl FnMut(&PeerId) -> bool,
3801+
f: impl Fn(&PeerId, &PeerDetails) -> bool,
38413802
) -> BTreeSet<PeerId> {
3842-
get_random_peers_dynamic(connected_peers, topic_hash, exclude_partial, |_| n, f)
3803+
get_random_peers_dynamic(peers, topic_hash, |_| n, f)
38433804
}
38443805

38453806
/// Validates the combination of signing, privacy and message validation to ensure the

0 commit comments

Comments (0)