From bde19cacd81df08bd5ac8d8396fe62ac0af2ef8c Mon Sep 17 00:00:00 2001 From: Mystical <125946525+mystical-prog@users.noreply.github.com> Date: Sun, 29 Jun 2025 12:20:24 +0530 Subject: [PATCH 1/4] added flood publishing --- libp2p/pubsub/gossipsub.py | 81 ++++++++++++++++++++++---------------- libp2p/tools/constants.py | 1 + tests/utils/factories.py | 3 ++ 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 839d67198..0d701e00e 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -100,6 +100,8 @@ class GossipSub(IPubsubRouter, Service): prune_back_off: int unsubscribe_back_off: int + flood_publish: bool + def __init__( self, protocols: Sequence[TProtocol], @@ -118,6 +120,7 @@ def __init__( px_peers_count: int = 16, prune_back_off: int = 60, unsubscribe_back_off: int = 10, + flood_publish: bool = False, ) -> None: self.protocols = list(protocols) self.pubsub = None @@ -158,6 +161,8 @@ def __init__( self.prune_back_off = prune_back_off self.unsubscribe_back_off = unsubscribe_back_off + self.flood_publish = flood_publish + async def run(self) -> None: self.manager.run_daemon_task(self.heartbeat) if len(self.direct_peers) > 0: @@ -294,42 +299,50 @@ def _get_peers_to_send( if topic not in self.pubsub.peer_topics: continue - # direct peers - _direct_peers: set[ID] = {_peer for _peer in self.direct_peers} - send_to.update(_direct_peers) - - # floodsub peers - floodsub_peers: set[ID] = { - peer_id - for peer_id in self.pubsub.peer_topics[topic] - if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID - } - send_to.update(floodsub_peers) - - # gossipsub peers - gossipsub_peers: set[ID] = set() - if topic in self.mesh: - gossipsub_peers = self.mesh[topic] + if self.flood_publish and msg_forwarder == self.pubsub.my_id: + for peer in self.pubsub.peer_topics[topic]: + # TODO: add score threshold check when peer scoring is implemented + # if direct peer then skip score check + send_to.add(peer) else: - # When we publish to a topic that we have not subscribe to, we randomly - # pick `self.degree` number of peers who have subscribed to the topic - # and add them as our `fanout` peers. - topic_in_fanout: bool = topic in self.fanout - fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set() - fanout_size = len(fanout_peers) - if not topic_in_fanout or ( - topic_in_fanout and fanout_size < self.degree - ): - if topic in self.pubsub.peer_topics: - # Combine fanout peers with selected peers - fanout_peers.update( - self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree - fanout_size, fanout_peers + # direct peers + direct_peers: set[ID] = {_peer for _peer in self.direct_peers} + send_to.update(direct_peers) + + # floodsub peers + floodsub_peers: set[ID] = { + peer_id + for peer_id in self.pubsub.peer_topics[topic] + if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID + } + send_to.update(floodsub_peers) + + # gossipsub peers + gossipsub_peers: set[ID] = set() + if topic in self.mesh: + gossipsub_peers = self.mesh[topic] + else: + # When we publish to a topic that we have not subscribe to, we + # randomly pick `self.degree` number of peers who have subscribed + # to the topic and add them as our `fanout` peers. + topic_in_fanout: bool = topic in self.fanout + fanout_peers: set[ID] = ( + self.fanout[topic] if topic_in_fanout else set() + ) + fanout_size = len(fanout_peers) + if not topic_in_fanout or ( + topic_in_fanout and fanout_size < self.degree + ): + if topic in self.pubsub.peer_topics: + # Combine fanout peers with selected peers + fanout_peers.update( + self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree - fanout_size, fanout_peers + ) ) - ) - self.fanout[topic] = fanout_peers - gossipsub_peers = fanout_peers - send_to.update(gossipsub_peers) + self.fanout[topic] = fanout_peers + gossipsub_peers = fanout_peers + send_to.update(gossipsub_peers) # Excludes `msg_forwarder` and `origin` yield from send_to.difference([msg_forwarder, origin]) diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py index f7d367e70..4c495696b 100644 --- a/libp2p/tools/constants.py +++ b/libp2p/tools/constants.py @@ -45,6 +45,7 @@ class GossipsubParams(NamedTuple): px_peers_count: int = 16 prune_back_off: int = 60 unsubscribe_back_off: int = 10 + flood_publish: bool = False GOSSIPSUB_PARAMS = GossipsubParams() diff --git a/tests/utils/factories.py b/tests/utils/factories.py index 75639e369..b4419e462 100644 --- a/tests/utils/factories.py +++ b/tests/utils/factories.py @@ -576,6 +576,7 @@ async def create_batch_with_gossipsub( px_peers_count: int = GOSSIPSUB_PARAMS.px_peers_count, prune_back_off: int = GOSSIPSUB_PARAMS.prune_back_off, unsubscribe_back_off: int = GOSSIPSUB_PARAMS.unsubscribe_back_off, + flood_publish: bool = GOSSIPSUB_PARAMS.flood_publish, security_protocol: TProtocol | None = None, muxer_opt: TMuxerOptions | None = None, msg_id_constructor: None @@ -600,6 +601,7 @@ async def create_batch_with_gossipsub( px_peers_count=px_peers_count, prune_back_off=prune_back_off, unsubscribe_back_off=unsubscribe_back_off, + flood_publish=flood_publish, ) else: gossipsubs = GossipsubFactory.create_batch( @@ -618,6 +620,7 @@ async def create_batch_with_gossipsub( px_peers_count=px_peers_count, prune_back_off=prune_back_off, unsubscribe_back_off=unsubscribe_back_off, + flood_publish=flood_publish, ) async with cls._create_batch_with_router( From 75a3749af924adf57347665c0341cf2e06533f70 Mon Sep 17 00:00:00 2001 From: Khwahish29 Date: Mon, 30 Jun 2025 12:28:24 +0530 Subject: [PATCH 2/4] added tests for flood publising --- newsfragments/713.feature.rst | 1 + tests/core/pubsub/test_gossipsub.py | 43 +++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 newsfragments/713.feature.rst diff --git a/newsfragments/713.feature.rst b/newsfragments/713.feature.rst new file mode 100644 index 000000000..601911688 --- /dev/null +++ b/newsfragments/713.feature.rst @@ -0,0 +1 @@ +Added flood publishing. \ No newline at end of file diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 03276a781..ed8aff013 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -590,3 +590,46 @@ async def test_sparse_connect(): f"received the message. Ideally all nodes should receive it, but at " f"minimum {min_required} required for sparse network scalability." ) + + +@pytest.mark.trio +async def test_flood_publish(): + async with PubsubFactory.create_batch_with_gossipsub( + 6, + degree=2, + degree_low=1, + degree_high=3, + flood_publish=False, + ) as pubsubs_gsub: + routers: list[GossipSub] = [] + for pubsub in pubsubs_gsub: + assert isinstance(pubsub.router, GossipSub) + routers.append(pubsub.router) + hosts = [ps.host for ps in pubsubs_gsub] + + topic = "flood_test_topic" + queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub] + + # connect host 0 to all other hosts + await one_to_all_connect(hosts, 0) + + # wait for connections to be established + await trio.sleep(1) + + # publish a message from the first host + msg_content = b"flood_msg" + await pubsubs_gsub[0].publish(topic, msg_content) + + # wait for messages to propagate + await trio.sleep(0.5) + + print(routers[0].mesh[topic]) + if routers[0].pubsub: + print(routers[0].pubsub.peer_topics) + + # verify all nodes received the message + for queue in queues: + msg = await queue.get() + assert msg.data == msg_content, ( + f"node did not receive expected message: {msg.data}" + ) From 47809042e6eda12f3235d1c123af753510e304d2 Mon Sep 17 00:00:00 2001 From: Khwahish29 Date: Mon, 30 Jun 2025 12:31:36 +0530 Subject: [PATCH 3/4] fix lint --- newsfragments/713.feature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newsfragments/713.feature.rst b/newsfragments/713.feature.rst index 601911688..6c0bb3bc0 100644 --- a/newsfragments/713.feature.rst +++ b/newsfragments/713.feature.rst @@ -1 +1 @@ -Added flood publishing. \ No newline at end of file +Added flood publishing. From ed673401aadf9669e38ddcd85681f03a443cc30b Mon Sep 17 00:00:00 2001 From: Khwahish29 Date: Tue, 8 Jul 2025 14:31:51 +0530 Subject: [PATCH 4/4] resolved merge conflicts --- tests/core/pubsub/test_gossipsub.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 6e369c359..35014cd25 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -634,6 +634,8 @@ async def test_flood_publish(): assert msg.data == msg_content, ( f"node did not receive expected message: {msg.data}" ) + + async def test_connect_some_with_fewer_hosts_than_degree(): """Test connect_some when there are fewer hosts than degree.""" # Create 3 hosts with degree=5 @@ -793,4 +795,4 @@ async def test_single_host(): connected_peers = len(pubsubs_fsub[0].peers) assert connected_peers == 0, ( f"Single host has {connected_peers} connections, expected 0" - ) \ No newline at end of file + )