Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libp2p/relay/circuit_v2/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ def enable_stop(self) -> bool: # pragma: no cover – helper
def enable_client(self) -> bool: # pragma: no cover – helper
return bool(self.roles & RelayRole.CLIENT)

@property
def enable_dht_discovery(self) -> bool: # pragma: no cover - helper
return False

def __post_init__(self) -> None:
"""Initialize default values."""
if self.limits is None:
Expand Down
51 changes: 32 additions & 19 deletions libp2p/relay/circuit_v2/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ def __init__(
self.read_timeout = read_timeout
self.write_timeout = write_timeout
self.close_timeout = close_timeout
self.resource_manager = RelayResourceManager(self.limits)
self.resource_manager = RelayResourceManager(
self.limits,
self.host.get_peerstore()
)
self._active_relays: dict[ID, tuple[INetStream, INetStream | None]] = {}
self.event_started = trio.Event()

Expand Down Expand Up @@ -526,30 +529,40 @@ async def _handle_reserve(self, stream: INetStream, msg: Any) -> None:
peer_id = ID(msg.peer)
logger.debug("Handling reservation request from peer %s", peer_id)

# Check if we can accept more reservations
if not self.resource_manager.can_accept_reservation(peer_id):
logger.debug("Reservation limit exceeded for peer %s", peer_id)
# Send status message with STATUS type
status = create_status(
code=StatusCode.RESOURCE_LIMIT_EXCEEDED,
message="Reservation limit exceeded",
)
# Check if peer already has a reservation
if self.resource_manager.has_reservation(peer_id):
logger.debug("Peer %s already has a reservation — refreshing", peer_id)
ttl = self.resource_manager.refresh_reservation(peer_id)
status_code = StatusCode.OK
status_msg_text = "Reservation refreshed"
else:
# Check if we can accept more reservations
if not self.resource_manager.can_accept_reservation(peer_id):
logger.debug("Reservation limit exceeded for peer %s", peer_id)
# Send status message with STATUS type
status = create_status(
code=StatusCode.RESOURCE_LIMIT_EXCEEDED,
message="Reservation limit exceeded",
)

status_msg = HopMessage(
type=HopMessage.STATUS,
status=status.to_pb(),
)
await stream.write(status_msg.SerializeToString())
return
status_msg = HopMessage(
type=HopMessage.STATUS,
status=status.to_pb(),
)
await stream.write(status_msg.SerializeToString())
return

# Accept reservation
logger.debug("Accepting reservation from peer %s", peer_id)
ttl = self.resource_manager.reserve(peer_id)
# Accept reservation
logger.debug("Accepting new reservation from peer %s", peer_id)
ttl = self.resource_manager.reserve(peer_id)
status_code = StatusCode.OK
status_msg_text = "Reservation accepted"

# Send reservation success response
with trio.fail_after(self.write_timeout):
status = create_status(
code=StatusCode.OK, message="Reservation accepted"
code=status_code,
message=status_msg_text
)

response = HopMessage(
Expand Down
63 changes: 54 additions & 9 deletions libp2p/relay/circuit_v2/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import time

from libp2p.abc import IPeerStore
from libp2p.peer.id import (
ID,
)
Expand All @@ -23,7 +24,6 @@
RANDOM_BYTES_LENGTH = 16 # 128 bits of randomness
TIMESTAMP_MULTIPLIER = 1000000 # To convert seconds to microseconds


# Reservation status enum
class ReservationStatus(Enum):
"""Lifecycle status of a relay reservation."""
Expand Down Expand Up @@ -61,7 +61,7 @@ def __init__(self, peer_id: ID, limits: RelayLimits):
self.peer_id = peer_id
self.limits = limits
self.created_at = time.time()
self.expires_at = self.created_at + limits.duration
self.expires_at = int(self.created_at + limits.duration)
self.data_used = 0
self.active_connections = 0
self.voucher = self._generate_voucher()
Expand Down Expand Up @@ -137,18 +137,21 @@ class RelayResourceManager:
- Managing connection quotas
"""

def __init__(self, limits: RelayLimits):
def __init__(self, limits: RelayLimits, peer_store: IPeerStore):
"""
Initialize the resource manager.

Parameters
----------
limits : RelayLimits
The resource limits to enforce
peer_store : IPeerStore
Peer store for retrieving public keys and peer metadata

"""
self.limits = limits
self._reservations: dict[ID, Reservation] = {}
self.peer_store = peer_store

def can_accept_reservation(self, peer_id: ID) -> bool:
"""
Expand Down Expand Up @@ -212,13 +215,27 @@ def verify_reservation(self, peer_id: ID, proto_res: PbReservation) -> bool:
True if the reservation is valid

"""
# TODO: Implement voucher and signature verification
# Fetch the reservation
reservation = self._reservations.get(peer_id)
return (
reservation is not None
and not reservation.is_expired()
and reservation.expires_at == proto_res.expire
)

# Reject if reservation is missing, expired, or mismatched
if (
reservation is None
or reservation.is_expired()
or reservation.voucher != proto_res.voucher
or reservation.expires_at != proto_res.expire
):
return False

# verify signature
try:
public_key = self.peer_store.pubkey(peer_id)
if public_key is None:
return False

return public_key.verify(proto_res.voucher, proto_res.signature)
except Exception:
return False

def can_accept_connection(self, peer_id: ID) -> bool:
"""
Expand Down Expand Up @@ -274,3 +291,31 @@ def reserve(self, peer_id: ID) -> int:
# Create new reservation
self.create_reservation(peer_id)
return self.limits.duration

def has_reservation(self, peer_id: ID) -> bool:
"""
Check if a reservation already exists for a peer

Parameters
----------
peer_id : ID
The peer ID to check for

Returns
-------
bool
True if reservation exists, False otherwise

"""
existing = self._reservations.get(peer_id)
if existing and not existing.is_expired():
return True
return False

def refresh_reservation(self, peer_id: ID) -> int:
if self.has_reservation(peer_id):
self.create_reservation(peer_id)
return self.limits.duration

return 0

Loading
Loading