Skip to content

Commit ca5b919

Browse files
committed
Improve relay selection to prioritize active reservations
1 parent b46dae7 commit ca5b919

File tree

4 files changed

+103
-5
lines changed

4 files changed

+103
-5
lines changed

libp2p/relay/circuit_v2/dcutr.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,9 @@ async def _have_direct_connection(self, peer_id: ID) -> bool:
524524

525525
# Handle both single connection and list of connections
526526
connections: list[INetConn] = (
527-
[conn_or_conns] if not isinstance(conn_or_conns, list) else conn_or_conns
527+
list(conn_or_conns)
528+
if not isinstance(conn_or_conns, list)
529+
else conn_or_conns
528530
)
529531

530532
# Check if any connection is direct (not relayed)

libp2p/relay/circuit_v2/transport.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def __init__(
9292
stream_timeout=config.timeouts.discovery_stream_timeout,
9393
peer_protocol_timeout=config.timeouts.peer_protocol_timeout,
9494
)
95+
self.relay_counter = 0 # for round robin load balancing
9596

9697
async def dial(
9798
self,
@@ -221,9 +222,25 @@ async def _select_relay(self, peer_info: PeerInfo) -> ID | None:
221222
# Get a relay from the list of discovered relays
222223
relays = self.discovery.get_relays()
223224
if relays:
224-
# TODO: Implement more sophisticated relay selection
225-
# For now, just return the first available relay
226-
return relays[0]
225+
# Prioritize relays with active reservations
226+
relays_with_reservations = []
227+
other_relays = []
228+
229+
for relay_id in relays:
230+
relay_info = self.discovery.get_relay_info(relay_id)
231+
if relay_info and relay_info.has_reservation:
232+
relays_with_reservations.append(relay_id)
233+
else:
234+
other_relays.append(relay_id)
235+
236+
# Return first available relay with reservation, or fallback to others
237+
self.relay_counter += 1
238+
if relays_with_reservations:
239+
return relays_with_reservations[
240+
(self.relay_counter - 1) % len(relays_with_reservations)
241+
]
242+
elif other_relays:
243+
return other_relays[(self.relay_counter - 1) % len(other_relays)]
227244

228245
# Wait and try discovery
229246
await trio.sleep(1)

tests/conftest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pytest
22

3+
34
@pytest.fixture
45
def security_protocol():
5-
return None
6+
return None

tests/core/relay/test_circuit_v2_transport.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
StreamEOF,
1212
StreamReset,
1313
)
14+
from libp2p.peer.peerinfo import PeerInfo
1415
from libp2p.relay.circuit_v2.config import (
1516
RelayConfig,
1617
)
@@ -344,3 +345,80 @@ async def test_circuit_v2_transport_relay_limits():
344345

345346
# Test successful - transports were initialized with the correct limits
346347
logger.info("Transport limit test successful")
348+
349+
350+
@pytest.mark.trio
351+
async def test_circuit_v2_transport_relay_selection():
352+
"""Test relay round robin load balancing and reservation priority"""
353+
async with HostFactory.create_batch_and_listen(5) as hosts:
354+
client1_host, relay_host1, relay_host2, relay_host3, target_host = hosts
355+
356+
# Setup relay with strict limits
357+
limits = RelayLimits(
358+
duration=DEFAULT_RELAY_LIMITS.duration,
359+
data=DEFAULT_RELAY_LIMITS.data,
360+
max_circuit_conns=DEFAULT_RELAY_LIMITS.max_circuit_conns,
361+
max_reservations=DEFAULT_RELAY_LIMITS.max_reservations,
362+
)
363+
364+
# Register test handler on target
365+
test_protocol = "/test/echo/1.0.0"
366+
target_host.set_stream_handler(TProtocol(test_protocol), echo_stream_handler)
367+
target_host_info = PeerInfo(target_host.get_id(), target_host.get_addrs())
368+
369+
client_config = RelayConfig()
370+
371+
# Client setup
372+
client1_protocol = CircuitV2Protocol(client1_host, limits, allow_hop=False)
373+
client1_discovery = RelayDiscovery(
374+
host=client1_host,
375+
auto_reserve=False,
376+
discovery_interval=client_config.discovery_interval,
377+
max_relays=client_config.max_relays,
378+
)
379+
380+
client1_transport = CircuitV2Transport(
381+
client1_host, client1_protocol, client_config
382+
)
383+
client1_transport.discovery = client1_discovery
384+
# Add relay to discovery
385+
relay_id1 = relay_host1.get_id()
386+
relay_id2 = relay_host2.get_id()
387+
relay_id3 = relay_host3.get_id()
388+
389+
# Connect all peers
390+
try:
391+
with trio.fail_after(CONNECT_TIMEOUT):
392+
# Connect clients to relay
393+
await connect(client1_host, relay_host1)
394+
await connect(client1_host, relay_host2)
395+
await connect(client1_host, relay_host3)
396+
397+
logger.info("All connections established")
398+
except Exception as e:
399+
logger.error("Failed to connect peers: %s", str(e))
400+
raise
401+
402+
await client1_discovery._add_relay(relay_id1)
403+
await client1_discovery._add_relay(relay_id2)
404+
await client1_discovery._add_relay(relay_id3)
405+
406+
selected_relay = await client1_transport._select_relay(target_host_info)
407+
# Without reservation preference
408+
# Round robin, so 1st time must be relay1
409+
assert selected_relay is not None and selected_relay is relay_id1
410+
411+
selected_relay = await client1_transport._select_relay(target_host_info)
412+
# Round robin, so 2nd time must be relay2
413+
assert selected_relay is not None and selected_relay is relay_id2
414+
415+
# Mock reservation with relay1 to prioritize over relay2
416+
relay_info3 = client1_discovery.get_relay_info(relay_id3)
417+
if relay_info3:
418+
relay_info3.has_reservation = True
419+
420+
selected_relay = await client1_transport._select_relay(target_host_info)
421+
# With reservation preference, relay2 must be chosen for target_peer.
422+
assert selected_relay is not None and selected_relay is relay_host3.get_id()
423+
424+
logger.info("Relay selection successful")

0 commit comments

Comments
 (0)