From 76695269a45d8cf15416456be304200955a3c3c9 Mon Sep 17 00:00:00 2001 From: Winter-Soren Date: Sat, 21 Jun 2025 18:43:03 +0530 Subject: [PATCH 1/7] fix: revamp circuit relay example and moved example into examples directory --- examples/circuit_relay/__init__.py | 5 + examples/circuit_relay/relay_example.py | 429 ++++++++++++++++++++++++ libp2p/relay/circuit_v2/transport.py | 6 + 3 files changed, 440 insertions(+) create mode 100644 examples/circuit_relay/__init__.py create mode 100644 examples/circuit_relay/relay_example.py diff --git a/examples/circuit_relay/__init__.py b/examples/circuit_relay/__init__.py new file mode 100644 index 000000000..48f0a5f4a --- /dev/null +++ b/examples/circuit_relay/__init__.py @@ -0,0 +1,5 @@ +""" +Circuit Relay v2 example module. + +This package demonstrates the usage of Circuit Relay v2 protocol in libp2p. +""" \ No newline at end of file diff --git a/examples/circuit_relay/relay_example.py b/examples/circuit_relay/relay_example.py new file mode 100644 index 000000000..1b8e2670d --- /dev/null +++ b/examples/circuit_relay/relay_example.py @@ -0,0 +1,429 @@ +""" +Circuit Relay v2 Example. + +This example demonstrates using the Circuit Relay v2 protocol by setting up: +1. A relay node that facilitates connections +2. A destination node that accepts incoming connections +3. A source node that connects to the destination through the relay + +Usage: + # First terminal - start the relay: + python relay_example.py --role relay --port 8000 + + # Second terminal - start the destination: + python relay_example.py --role destination --port 8001 --relay-addr RELAY_PEER_ID + + # Third terminal - start the source: + python relay_example.py --role source --relay-addr RELAY_PEER_ID --dest-id DESTINATION_PEER_ID +""" + +import argparse +import logging +import sys +from typing import Any + +import multiaddr +import trio + +from libp2p import new_host +from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.network.stream.net_stream import INetStream +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr +from libp2p.relay.circuit_v2.config import RelayConfig +from libp2p.relay.circuit_v2.discovery import RelayDiscovery +from libp2p.relay.circuit_v2.protocol import CircuitV2Protocol, PROTOCOL_ID as RELAY_PROTOCOL_ID +from libp2p.relay.circuit_v2.resources import RelayLimits +from libp2p.relay.circuit_v2.transport import CircuitV2Transport +from libp2p.tools.async_service import background_trio_service + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(name)s | %(levelname)s | %(message)s", +) +logger = logging.getLogger("circuit-relay-example") + +# Application protocol for our example +EXAMPLE_PROTOCOL_ID = TProtocol("/circuit-relay-example/1.0.0") +MAX_READ_LEN = 2**16 # 64KB + + +async def handle_example_protocol(stream: INetStream) -> None: + """Handle incoming messages on our example protocol.""" + remote_peer_id = stream.get_protocol().remote_peer_id + logger.info(f"New stream from peer: {remote_peer_id}") + + try: + # Read the incoming message + msg = await stream.read(MAX_READ_LEN) + if msg: + logger.info(f"Received message: {msg.decode()}") + + # Send a response + response = f"Hello! This is {stream.get_protocol().local_peer_id}".encode() + await stream.write(response) + logger.info(f"Sent response to {remote_peer_id}") + except Exception as e: + logger.error(f"Error handling stream: {e}") + finally: + await stream.close() + + +async def setup_relay_node(port: int, seed: int | None = None) -> None: + """Set up and run a relay node.""" + logger.info("Starting relay node...") + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the relay + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=True, # Act as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use other relays if needed + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=True) + + # Start the host + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + async with host.run(listen_addrs=[listen_addr]): + # Print information about this node + peer_id = host.get_id() + logger.info(f"Relay node started with ID: {peer_id}") + + addrs = host.get_addrs() + for addr in addrs: + logger.info(f"Listening on: {addr}") + + # Register our example protocol handler + host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and register the transport + transport = CircuitV2Transport(host, protocol, relay_config) + logger.info("Circuit relay transport initialized") + + print("\nRelay node is running. Use the following address to connect:") + print(f"{addrs[0]}/p2p/{peer_id}") + print("\nPress Ctrl+C to exit\n") + + # Keep the relay running + await trio.sleep_forever() + + +async def setup_destination_node(port: int, relay_addr: str, seed: int | None = None) -> None: + """Set up and run a destination node that accepts incoming connections.""" + logger.info("Starting destination node...") + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the circuit relay client + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=False, # Not acting as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use relays for dialing + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) + + # Start the host + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + async with host.run(listen_addrs=[listen_addr]): + # Print information about this node + peer_id = host.get_id() + logger.info(f"Destination node started with ID: {peer_id}") + + addrs = host.get_addrs() + for addr in addrs: + logger.info(f"Listening on: {addr}") + + # Register our example protocol handler + host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and initialize transport + transport = CircuitV2Transport(host, protocol, relay_config) + + # Create discovery service + discovery = RelayDiscovery(host, auto_reserve=True) + transport.discovery = discovery + + # Start discovery service + async with background_trio_service(discovery): + logger.info("Relay discovery service started") + + # Connect to the relay + if relay_addr: + logger.info(f"Connecting to relay at {relay_addr}") + try: + # Handle both peer ID only or full multiaddr formats + if relay_addr.startswith("/"): + # Full multiaddr format + relay_maddr = multiaddr.Multiaddr(relay_addr) + relay_info = info_from_p2p_addr(relay_maddr) + else: + # Assume it's just a peer ID + relay_peer_id = ID.from_base58(relay_addr) + relay_info = PeerInfo(relay_peer_id, [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}")]) + logger.info(f"Using constructed address: {relay_info.addrs[0]}") + + await host.connect(relay_info) + logger.info(f"Connected to relay {relay_info.peer_id}") + except Exception as e: + logger.error(f"Failed to connect to relay: {e}") + return + + print("\nDestination node is running with peer ID:") + print(f"{peer_id}") + print("\nPress Ctrl+C to exit\n") + + # Keep the node running + await trio.sleep_forever() + + +async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = None) -> None: + """Set up and run a source node that connects to the destination through the relay.""" + logger.info("Starting source node...") + + if not relay_addr: + logger.error("Relay address is required for source mode") + return + + if not dest_id: + logger.error("Destination peer ID is required for source mode") + return + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the circuit relay client + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=False, # Not acting as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use relays for dialing + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) + + # Start the host + async with host.run(listen_addrs=[multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/0")]): # Use ephemeral port + # Print information about this node + peer_id = host.get_id() + logger.info(f"Source node started with ID: {peer_id}") + + # Get assigned address for debugging + addrs = host.get_addrs() + if addrs: + logger.info(f"Source node listening on: {addrs[0]}") + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and initialize transport + transport = CircuitV2Transport(host, protocol, relay_config) + + # Create discovery service + discovery = RelayDiscovery(host, auto_reserve=True) + transport.discovery = discovery + + # Start discovery service + async with background_trio_service(discovery): + logger.info("Relay discovery service started") + + # Connect to the relay + logger.info(f"Connecting to relay at {relay_addr}") + try: + # Handle both peer ID only or full multiaddr formats + if relay_addr.startswith("/"): + # Full multiaddr format + relay_maddr = multiaddr.Multiaddr(relay_addr) + relay_info = info_from_p2p_addr(relay_maddr) + else: + # Assume it's just a peer ID + relay_peer_id = ID.from_base58(relay_addr) + relay_info = PeerInfo(relay_peer_id, [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}")]) + logger.info(f"Using constructed address: {relay_info.addrs[0]}") + + await host.connect(relay_info) + logger.info(f"Connected to relay {relay_info.peer_id}") + + # Wait for relay discovery to find the relay + await trio.sleep(2) + + # Convert destination ID string to peer ID + dest_peer_id = ID.from_base58(dest_id) + + # Try to connect to the destination through the relay + logger.info(f"Connecting to destination {dest_peer_id} through relay") + + # Create peer info with relay + relay_peer_id = relay_info.peer_id + logger.info(f"This is the relay peer id: {relay_peer_id}") + + # Create a proper peer info with a relay address + # The destination peer should be reachable through a p2p-circuit address + # Format: /p2p-circuit/p2p/DESTINATION_PEER_ID + circuit_addr = multiaddr.Multiaddr(f"/p2p-circuit/p2p/{dest_id}") + dest_peer_info = PeerInfo(dest_peer_id, [circuit_addr]) + logger.info(f"This is the dest peer info: {dest_peer_info}") + + # Dial through the relay + try: + logger.info(f"Attempting to dial destination {dest_peer_id} through relay {relay_peer_id}") + + connection = await transport.dial_peer_info( + dest_peer_info, relay_peer_id=relay_peer_id + ) + + logger.info(f"This is the dial connection: {connection}") + + logger.info(f"Successfully connected to destination through relay!") + + # Open a stream to our example protocol + stream = await host.new_stream(dest_peer_id, [EXAMPLE_PROTOCOL_ID]) + if stream: + logger.info(f"Opened stream to destination with protocol {EXAMPLE_PROTOCOL_ID}") + + # Send a message + msg = f"Hello from {peer_id}!".encode() + await stream.write(msg) + logger.info(f"Sent message to destination") + + # Wait for response + response = await stream.read(MAX_READ_LEN) + logger.info(f"Received response: {response.decode() if response else 'No response'}") + + # Close the stream + await stream.close() + else: + logger.error("Failed to open stream to destination") + except Exception as e: + logger.error(f"Failed to dial through relay: {str(e)}") + logger.error(f"Exception type: {type(e).__name__}") + raise + + except Exception as e: + logger.error(f"Error: {e}") + + print("\nSource operation completed") + # Keep running for a bit to allow messages to be processed + await trio.sleep(5) + + +def generate_fixed_private_key(seed: int) -> bytes: + """Generate a fixed private key from a seed for reproducible peer IDs.""" + if seed is None: + return None + + import random + random.seed(seed) + return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big") + + +def main() -> None: + """Parse arguments and run the appropriate node type.""" + parser = argparse.ArgumentParser(description="Circuit Relay v2 Example") + parser.add_argument( + "--role", + type=str, + choices=["relay", "source", "destination"], + required=True, + help="Node role (relay, source, or destination)", + ) + parser.add_argument( + "--port", + type=int, + default=0, + help="Port to listen on (for relay and destination nodes)", + ) + parser.add_argument( + "--relay-addr", + type=str, + help="Multiaddress or peer ID of relay node (for destination and source nodes)", + ) + parser.add_argument( + "--dest-id", + type=str, + help="Peer ID of destination node (for source node)", + ) + parser.add_argument( + "--seed", + type=int, + help="Random seed for reproducible peer IDs", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + # Set log level + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) + logging.getLogger("libp2p").setLevel(logging.DEBUG) + + try: + if args.role == "relay": + trio.run(setup_relay_node, args.port, args.seed) + elif args.role == "destination": + if not args.relay_addr: + parser.error("--relay-addr is required for destination role") + trio.run(setup_destination_node, args.port, args.relay_addr, args.seed) + elif args.role == "source": + if not args.relay_addr or not args.dest_id: + parser.error("--relay-addr and --dest-id are required for source role") + trio.run(setup_source_node, args.relay_addr, args.dest_id, args.seed) + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index ffd310902..722af8fe4 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -152,6 +152,8 @@ async def dial_peer_info( If the connection cannot be established """ + print(f"1. inside the dial_peer_info is the relay peer id: {relay_peer_id}") + print(f"2. inside the dial_peer_info is the peer info: {peer_info}") # If no specific relay is provided, try to find one if relay_peer_id is None: relay_peer_id = await self._select_relay(peer_info) @@ -160,12 +162,14 @@ async def dial_peer_info( # Get a stream to the relay relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) + print(f"3. inside the dial_peer_info is the relay stream: {relay_stream}") if not relay_stream: raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") try: # First try to make a reservation if enabled if self.config.enable_client: + print(f"4. inside the dial_peer_info is the relay stream: {relay_stream}") success = await self._make_reservation(relay_stream, relay_peer_id) if not success: logger.warning( @@ -177,7 +181,9 @@ async def dial_peer_info( type=HopMessage.CONNECT, peer=peer_info.peer_id.to_bytes(), ) + print(f"5. inside the dial_peer_info is the hop msg: {hop_msg}") await relay_stream.write(hop_msg.SerializeToString()) + print(f"6. inside the dial_peer_info is the relay stream: {relay_stream}") # Read response resp_bytes = await relay_stream.read() From 20209756fc7c9aacd2cad60d9a37be2dd9cb77da Mon Sep 17 00:00:00 2001 From: Winter-Soren Date: Sun, 13 Jul 2025 01:46:38 +0530 Subject: [PATCH 2/7] added more loggers and added protocol handlers --- examples/circuit_relay/relay_example.py | 12 +- libp2p/relay/circuit_v2/protocol.py | 193 ++++++++---------------- libp2p/relay/circuit_v2/transport.py | 64 +++++--- libp2p/transport/tcp/tcp.py | 6 + 4 files changed, 124 insertions(+), 151 deletions(-) diff --git a/examples/circuit_relay/relay_example.py b/examples/circuit_relay/relay_example.py index 1b8e2670d..9ecbc2f36 100644 --- a/examples/circuit_relay/relay_example.py +++ b/examples/circuit_relay/relay_example.py @@ -33,7 +33,7 @@ from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr from libp2p.relay.circuit_v2.config import RelayConfig from libp2p.relay.circuit_v2.discovery import RelayDiscovery -from libp2p.relay.circuit_v2.protocol import CircuitV2Protocol, PROTOCOL_ID as RELAY_PROTOCOL_ID +from libp2p.relay.circuit_v2.protocol import CircuitV2Protocol, PROTOCOL_ID, STOP_PROTOCOL_ID from libp2p.relay.circuit_v2.resources import RelayLimits from libp2p.relay.circuit_v2.transport import CircuitV2Transport from libp2p.tools.async_service import background_trio_service @@ -109,8 +109,11 @@ async def setup_relay_node(port: int, seed: int | None = None) -> None: for addr in addrs: logger.info(f"Listening on: {addr}") - # Register our example protocol handler + # Register protocol handlers host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) + host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) + logger.debug("Protocol handlers registered") # Start the relay protocol service async with background_trio_service(protocol): @@ -166,8 +169,11 @@ async def setup_destination_node(port: int, relay_addr: str, seed: int | None = for addr in addrs: logger.info(f"Listening on: {addr}") - # Register our example protocol handler + # Register protocol handlers host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) + host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) + logger.debug("Protocol handlers registered") # Start the relay protocol service async with background_trio_service(protocol): diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index 1cf76efa8..adbbe7efd 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -70,7 +70,7 @@ STREAM_READ_TIMEOUT = 15 # seconds STREAM_WRITE_TIMEOUT = 15 # seconds STREAM_CLOSE_TIMEOUT = 10 # seconds -MAX_READ_RETRIES = 5 # Maximum number of read retries +MAX_READ_RETRIES = 2 # Reduced retries to avoid masking real issues # Extended interfaces for type checking @@ -276,6 +276,7 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: This handler processes relay requests from other peers. """ + logger.debug("=== HOP STREAM HANDLER CALLED ===") try: # Try to get peer ID first try: @@ -290,121 +291,64 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: logger.debug("Handling hop stream from %s", remote_id) - # First, handle the read timeout gracefully - try: - with trio.fail_after( - STREAM_READ_TIMEOUT * 2 - ): # Double the timeout for reading - msg_bytes = await stream.read() - if not msg_bytes: - logger.error( - "Empty read from stream from %s", - remote_id, - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = "Empty message received" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure message is sent - return - except trio.TooSlowError: - logger.error( - "Timeout reading from hop stream from %s", - remote_id, - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.CONNECTION_FAILED)) - pb_status.message = "Stream read timeout" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return - except Exception as e: - logger.error( - "Error reading from hop stream from %s: %s", - remote_id, - str(e), - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = f"Read error: {str(e)}" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return - - # Parse the message - try: - hop_msg = HopMessage() - hop_msg.ParseFromString(msg_bytes) - except Exception as e: - logger.error( - "Error parsing hop message from %s: %s", - remote_id, - str(e), - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = f"Parse error: {str(e)}" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return + # Handle multiple messages on the same stream + while True: + # Read message with timeout + try: + with trio.fail_after(STREAM_READ_TIMEOUT): + msg_bytes = await stream.read() + if not msg_bytes: + logger.debug("Stream closed by peer %s", remote_id) + return + except trio.TooSlowError: + logger.error("Timeout reading from hop stream from %s", remote_id) + return + except Exception as e: + logger.error("Error reading from hop stream from %s: %s", remote_id, str(e)) + return - # Process based on message type - if hop_msg.type == HopMessage.RESERVE: - logger.debug("Handling RESERVE message from %s", remote_id) - await self._handle_reserve(stream, hop_msg) - # For RESERVE requests, let the client close the stream - return - elif hop_msg.type == HopMessage.CONNECT: - logger.debug("Handling CONNECT message from %s", remote_id) - await self._handle_connect(stream, hop_msg) - else: - logger.error("Invalid message type %d from %s", hop_msg.type, remote_id) - # Send a nice error response using _send_status method - await self._send_status( - stream, - StatusCode.MALFORMED_MESSAGE, - f"Invalid message type: {hop_msg.type}", - ) + # Parse the message + try: + hop_msg = HopMessage() + hop_msg.ParseFromString(msg_bytes) + except Exception as e: + logger.error("Error parsing hop message from %s: %s", remote_id, str(e)) + await self._send_status( + stream, + StatusCode.MALFORMED_MESSAGE, + f"Parse error: {str(e)}", + ) + return + + # Process based on message type + if hop_msg.type == HopMessage.RESERVE: + logger.debug("Handling RESERVE message from %s", remote_id) + await self._handle_reserve(stream, hop_msg) + # Continue reading for more messages + elif hop_msg.type == HopMessage.CONNECT: + logger.debug("Handling CONNECT message from %s", remote_id) + await self._handle_connect(stream, hop_msg) + # CONNECT establishes a circuit, so we're done with this stream + return + else: + logger.error("Invalid message type %d from %s", hop_msg.type, remote_id) + await self._send_status( + stream, + StatusCode.MALFORMED_MESSAGE, + f"Invalid message type: {hop_msg.type}", + ) + return except Exception as e: - logger.error( - "Unexpected error handling hop stream from %s: %s", remote_id, str(e) - ) + logger.error("Unexpected error handling hop stream from %s: %s", remote_id, str(e)) try: - # Send a nice error response using _send_status method await self._send_status( stream, StatusCode.MALFORMED_MESSAGE, f"Internal error: {str(e)}", ) except Exception as e2: - logger.error( - "Failed to send error response to %s: %s", remote_id, str(e2) - ) + logger.error("Failed to send error response to %s: %s", remote_id, str(e2)) async def _handle_stop_stream(self, stream: INetStream) -> None: """ @@ -536,12 +480,8 @@ async def _handle_reserve(self, stream: INetStream, msg: Any) -> None: ttl, ) - # Send the response with increased timeout + # Send the response await stream.write(response.SerializeToString()) - - # Add a small wait to ensure the message is fully sent - await trio.sleep(0.1) - logger.debug("Reservation response sent successfully") except Exception as e: @@ -556,18 +496,11 @@ async def _handle_reserve(self, stream: INetStream, msg: Any) -> None: ) except Exception as send_err: logger.error("Failed to send error response: %s", str(send_err)) - finally: - # Always close the stream when done with reservation - if cast(INetStreamWithExtras, stream).is_open(): - try: - with trio.fail_after(STREAM_CLOSE_TIMEOUT): - await stream.close() - except Exception as close_err: - logger.error("Error closing stream: %s", str(close_err)) async def _handle_connect(self, stream: INetStream, msg: Any) -> None: """Handle a connect request.""" peer_id = ID(msg.peer) + logger.debug("Handling CONNECT request for peer %s", peer_id) dst_stream: INetStream | None = None # Verify reservation if provided @@ -594,12 +527,15 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: try: # Store the source stream with properly typed None self._active_relays[peer_id] = (stream, None) + logger.debug("Stored source stream for peer %s", peer_id) # Try to connect to the destination with timeout with trio.fail_after(STREAM_READ_TIMEOUT): + logger.debug("Attempting to connect to destination %s", peer_id) dst_stream = await self.host.new_stream(peer_id, [STOP_PROTOCOL_ID]) if not dst_stream: raise ConnectionError("Could not connect to destination") + logger.debug("Successfully connected to destination %s", peer_id) # Send STOP CONNECT message stop_msg = StopMessage( @@ -640,6 +576,7 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: reservation.active_connections += 1 # Send success status + logger.debug("Sending OK status to source") await self._send_status( stream, StatusCode.OK, @@ -653,6 +590,7 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: except (trio.TooSlowError, ConnectionError) as e: logger.error("Error establishing relay connection: %s", str(e)) + logger.debug("Sending CONNECTION_FAILED status to source") await self._send_status( stream, StatusCode.CONNECTION_FAILED, @@ -730,9 +668,11 @@ async def _relay_data( logger.error("Error relaying data: %s", str(e)) finally: # Clean up streams and remove from active relays - await src_stream.reset() - await dst_stream.reset() + # Only reset streams once to avoid double-reset issues if peer_id in self._active_relays: + src_stream_cleanup, dst_stream_cleanup = self._active_relays[peer_id] + await self._close_stream(src_stream_cleanup) + await self._close_stream(dst_stream_cleanup) del self._active_relays[peer_id] async def _send_status( @@ -744,7 +684,7 @@ async def _send_status( """Send a status message.""" try: logger.debug("Sending status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(STREAM_WRITE_TIMEOUT): # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( @@ -761,11 +701,7 @@ async def _send_status( logger.debug("Status message serialized (%d bytes)", len(msg_bytes)) await stream.write(msg_bytes) - logger.debug("Status message sent, waiting for processing") - - # Wait longer to ensure the message is sent - await trio.sleep(1.5) - logger.debug("Status message sending completed") + logger.debug("Status message sent successfully") except trio.TooSlowError: logger.error( "Timeout sending status message: code=%s, message=%s", code, message @@ -782,7 +718,7 @@ async def _send_stop_status( """Send a status message on a STOP stream.""" try: logger.debug("Sending stop status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(STREAM_WRITE_TIMEOUT): # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( @@ -795,6 +731,5 @@ async def _send_stop_status( status=pb_status, ) await stream.write(status_msg.SerializeToString()) - await trio.sleep(0.5) # Ensure message is sent except Exception as e: logger.error("Error sending stop status message: %s", str(e)) diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index 722af8fe4..b33f7e467 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -45,6 +45,7 @@ from .protocol import ( PROTOCOL_ID, CircuitV2Protocol, + STREAM_READ_TIMEOUT, ) from .protocol_buffer import ( StatusCode, @@ -152,8 +153,6 @@ async def dial_peer_info( If the connection cannot be established """ - print(f"1. inside the dial_peer_info is the relay peer id: {relay_peer_id}") - print(f"2. inside the dial_peer_info is the peer info: {peer_info}") # If no specific relay is provided, try to find one if relay_peer_id is None: relay_peer_id = await self._select_relay(peer_info) @@ -161,15 +160,19 @@ async def dial_peer_info( raise ConnectionError("No suitable relay found") # Get a stream to the relay - relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) - print(f"3. inside the dial_peer_info is the relay stream: {relay_stream}") - if not relay_stream: - raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") + try: + logger.debug("Opening stream to relay %s with protocol %s", relay_peer_id, PROTOCOL_ID) + relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) + if not relay_stream: + raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") + logger.debug("Successfully opened stream to relay %s", relay_peer_id) + except Exception as e: + logger.error("Failed to open stream to relay %s: %s", relay_peer_id, str(e)) + raise ConnectionError(f"Could not open stream to relay {relay_peer_id}: {str(e)}") try: # First try to make a reservation if enabled if self.config.enable_client: - print(f"4. inside the dial_peer_info is the relay stream: {relay_stream}") success = await self._make_reservation(relay_stream, relay_peer_id) if not success: logger.warning( @@ -181,14 +184,13 @@ async def dial_peer_info( type=HopMessage.CONNECT, peer=peer_info.peer_id.to_bytes(), ) - print(f"5. inside the dial_peer_info is the hop msg: {hop_msg}") await relay_stream.write(hop_msg.SerializeToString()) - print(f"6. inside the dial_peer_info is the relay stream: {relay_stream}") - # Read response - resp_bytes = await relay_stream.read() - resp = HopMessage() - resp.ParseFromString(resp_bytes) + # Read response with timeout + with trio.fail_after(STREAM_READ_TIMEOUT): + resp_bytes = await relay_stream.read() + resp = HopMessage() + resp.ParseFromString(resp_bytes) # Access status attributes directly status_code = getattr(resp.status, "code", StatusCode.OK) @@ -262,17 +264,41 @@ async def _make_reservation( type=HopMessage.RESERVE, peer=self.host.get_id().to_bytes(), ) - await stream.write(reserve_msg.SerializeToString()) - - # Read response - resp_bytes = await stream.read() - resp = HopMessage() - resp.ParseFromString(resp_bytes) + logger.debug("=== SENDING RESERVATION REQUEST ===") + logger.debug("Message type: %s", reserve_msg.type) + logger.debug("Peer ID: %s", self.host.get_id()) + logger.debug("Raw message: %s", reserve_msg) + + try: + await stream.write(reserve_msg.SerializeToString()) + logger.debug("Successfully sent reservation request") + except Exception as e: + logger.error("Failed to send reservation request: %s", str(e)) + raise + + # Read response with timeout + logger.debug("=== WAITING FOR RESERVATION RESPONSE ===") + with trio.fail_after(STREAM_READ_TIMEOUT): + try: + resp_bytes = await stream.read() + logger.debug("Received reservation response: %d bytes", len(resp_bytes)) + resp = HopMessage() + resp.ParseFromString(resp_bytes) + logger.debug("=== PARSED RESERVATION RESPONSE ===") + logger.debug("Message type: %s", resp.type) + logger.debug("Status code: %s", getattr(resp.status, "code", "unknown")) + logger.debug("Status message: %s", getattr(resp.status, "message", "unknown")) + logger.debug("Raw response: %s", resp) + except Exception as e: + logger.error("Failed to read/parse reservation response: %s", str(e)) + raise # Access status attributes directly status_code = getattr(resp.status, "code", StatusCode.OK) status_msg = getattr(resp.status, "message", "Unknown error") + logger.debug("Reservation response: code=%s, message=%s", status_code, status_msg) + if status_code != StatusCode.OK: logger.warning( "Reservation failed with relay %s: %s", diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 1598ea42a..81477c37f 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -160,15 +160,21 @@ async def dial(self, maddr: Multiaddr) -> IRawConnection: try: # trio.open_tcp_stream requires host to be str or bytes, not None. + logger.debug("=== OPENING TCP STREAM ===") + logger.debug("Host: %s", host_str) + logger.debug("Port: %d", port_int) stream = await trio.open_tcp_stream(host_str, port_int) + logger.debug("Successfully opened TCP stream") except OSError as error: # OSError is common for network issues like "Connection refused" # or "Host unreachable". + logger.error("Failed to open TCP stream: %s", error) raise OpenConnectionError( f"Failed to open TCP stream to {maddr}: {error}" ) from error except Exception as error: # Catch other potential errors from trio.open_tcp_stream and wrap them. + logger.error("Unexpected error opening TCP stream: %s", error) raise OpenConnectionError( f"An unexpected error occurred when dialing {maddr}: {error}" ) from error From 1f475ccbb6ca016c3877c2b299466e27539ea7f2 Mon Sep 17 00:00:00 2001 From: Winter-Soren Date: Sun, 13 Jul 2025 01:53:39 +0530 Subject: [PATCH 3/7] fixed pyrefly pre-commithook issue --- examples/circuit_relay/__init__.py | 2 +- examples/circuit_relay/relay_example.py | 209 ++++++++++++++---------- libp2p/relay/circuit_v2/protocol.py | 20 ++- libp2p/relay/circuit_v2/transport.py | 34 +++- 4 files changed, 166 insertions(+), 99 deletions(-) diff --git a/examples/circuit_relay/__init__.py b/examples/circuit_relay/__init__.py index 48f0a5f4a..ccab8b932 100644 --- a/examples/circuit_relay/__init__.py +++ b/examples/circuit_relay/__init__.py @@ -2,4 +2,4 @@ Circuit Relay v2 example module. This package demonstrates the usage of Circuit Relay v2 protocol in libp2p. -""" \ No newline at end of file +""" diff --git a/examples/circuit_relay/relay_example.py b/examples/circuit_relay/relay_example.py index 9ecbc2f36..888b20357 100644 --- a/examples/circuit_relay/relay_example.py +++ b/examples/circuit_relay/relay_example.py @@ -20,7 +20,6 @@ import argparse import logging import sys -from typing import Any import multiaddr import trio @@ -33,7 +32,11 @@ from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr from libp2p.relay.circuit_v2.config import RelayConfig from libp2p.relay.circuit_v2.discovery import RelayDiscovery -from libp2p.relay.circuit_v2.protocol import CircuitV2Protocol, PROTOCOL_ID, STOP_PROTOCOL_ID +from libp2p.relay.circuit_v2.protocol import ( + PROTOCOL_ID, + STOP_PROTOCOL_ID, + CircuitV2Protocol, +) from libp2p.relay.circuit_v2.resources import RelayLimits from libp2p.relay.circuit_v2.transport import CircuitV2Transport from libp2p.tools.async_service import background_trio_service @@ -52,17 +55,19 @@ async def handle_example_protocol(stream: INetStream) -> None: """Handle incoming messages on our example protocol.""" - remote_peer_id = stream.get_protocol().remote_peer_id + remote_peer_id = stream.muxed_conn.peer_id logger.info(f"New stream from peer: {remote_peer_id}") - + try: # Read the incoming message msg = await stream.read(MAX_READ_LEN) if msg: logger.info(f"Received message: {msg.decode()}") - + # Send a response - response = f"Hello! This is {stream.get_protocol().local_peer_id}".encode() + # Get the local peer ID from the secure connection + local_peer_id = stream.muxed_conn.peer_id + response = f"Hello! This is {local_peer_id}".encode() await stream.write(response) logger.info(f"Sent response to {remote_peer_id}") except Exception as e: @@ -74,11 +79,11 @@ async def handle_example_protocol(stream: INetStream) -> None: async def setup_relay_node(port: int, seed: int | None = None) -> None: """Set up and run a relay node.""" logger.info("Starting relay node...") - + # Create host with a fixed key if seed is provided key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) host = new_host(key_pair=key_pair) - + # Configure the relay limits = RelayLimits( duration=3600, # 1 hour @@ -86,59 +91,61 @@ async def setup_relay_node(port: int, seed: int | None = None) -> None: max_circuit_conns=10, max_reservations=5, ) - + relay_config = RelayConfig( enable_hop=True, # Act as a relay enable_stop=True, # Accept relayed connections enable_client=True, # Use other relays if needed limits=limits, ) - + # Initialize the protocol protocol = CircuitV2Protocol(host, limits=limits, allow_hop=True) - + # Start the host listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - + async with host.run(listen_addrs=[listen_addr]): # Print information about this node peer_id = host.get_id() logger.info(f"Relay node started with ID: {peer_id}") - + addrs = host.get_addrs() for addr in addrs: logger.info(f"Listening on: {addr}") - + # Register protocol handlers host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) logger.debug("Protocol handlers registered") - + # Start the relay protocol service async with background_trio_service(protocol): logger.info("Circuit relay protocol started") - + # Create and register the transport transport = CircuitV2Transport(host, protocol, relay_config) logger.info("Circuit relay transport initialized") - + print("\nRelay node is running. Use the following address to connect:") print(f"{addrs[0]}/p2p/{peer_id}") print("\nPress Ctrl+C to exit\n") - + # Keep the relay running await trio.sleep_forever() -async def setup_destination_node(port: int, relay_addr: str, seed: int | None = None) -> None: +async def setup_destination_node( + port: int, relay_addr: str, seed: int | None = None +) -> None: """Set up and run a destination node that accepts incoming connections.""" logger.info("Starting destination node...") - + # Create host with a fixed key if seed is provided key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) host = new_host(key_pair=key_pair) - + # Configure the circuit relay client limits = RelayLimits( duration=3600, # 1 hour @@ -146,50 +153,50 @@ async def setup_destination_node(port: int, relay_addr: str, seed: int | None = max_circuit_conns=10, max_reservations=5, ) - + relay_config = RelayConfig( enable_hop=False, # Not acting as a relay enable_stop=True, # Accept relayed connections enable_client=True, # Use relays for dialing limits=limits, ) - + # Initialize the protocol protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) - + # Start the host listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - + async with host.run(listen_addrs=[listen_addr]): # Print information about this node peer_id = host.get_id() logger.info(f"Destination node started with ID: {peer_id}") - + addrs = host.get_addrs() for addr in addrs: logger.info(f"Listening on: {addr}") - + # Register protocol handlers host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) logger.debug("Protocol handlers registered") - + # Start the relay protocol service async with background_trio_service(protocol): logger.info("Circuit relay protocol started") - + # Create and initialize transport transport = CircuitV2Transport(host, protocol, relay_config) - + # Create discovery service discovery = RelayDiscovery(host, auto_reserve=True) transport.discovery = discovery - + # Start discovery service async with background_trio_service(discovery): logger.info("Relay discovery service started") - + # Connect to the relay if relay_addr: logger.info(f"Connecting to relay at {relay_addr}") @@ -202,39 +209,50 @@ async def setup_destination_node(port: int, relay_addr: str, seed: int | None = else: # Assume it's just a peer ID relay_peer_id = ID.from_base58(relay_addr) - relay_info = PeerInfo(relay_peer_id, [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}")]) - logger.info(f"Using constructed address: {relay_info.addrs[0]}") - + relay_info = PeerInfo( + relay_peer_id, + [ + multiaddr.Multiaddr( + f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}" + ) + ], + ) + logger.info( + f"Using constructed address: {relay_info.addrs[0]}" + ) + await host.connect(relay_info) logger.info(f"Connected to relay {relay_info.peer_id}") except Exception as e: logger.error(f"Failed to connect to relay: {e}") return - + print("\nDestination node is running with peer ID:") print(f"{peer_id}") print("\nPress Ctrl+C to exit\n") - + # Keep the node running await trio.sleep_forever() -async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = None) -> None: +async def setup_source_node( + relay_addr: str, dest_id: str, seed: int | None = None +) -> None: """Set up and run a source node that connects to the destination through the relay.""" logger.info("Starting source node...") - + if not relay_addr: logger.error("Relay address is required for source mode") return - + if not dest_id: logger.error("Destination peer ID is required for source mode") return - + # Create host with a fixed key if seed is provided key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) host = new_host(key_pair=key_pair) - + # Configure the circuit relay client limits = RelayLimits( duration=3600, # 1 hour @@ -242,43 +260,45 @@ async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = No max_circuit_conns=10, max_reservations=5, ) - + relay_config = RelayConfig( enable_hop=False, # Not acting as a relay enable_stop=True, # Accept relayed connections enable_client=True, # Use relays for dialing limits=limits, ) - + # Initialize the protocol protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) - + # Start the host - async with host.run(listen_addrs=[multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/0")]): # Use ephemeral port + async with host.run( + listen_addrs=[multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0")] + ): # Use ephemeral port # Print information about this node peer_id = host.get_id() logger.info(f"Source node started with ID: {peer_id}") - + # Get assigned address for debugging addrs = host.get_addrs() if addrs: logger.info(f"Source node listening on: {addrs[0]}") - + # Start the relay protocol service async with background_trio_service(protocol): logger.info("Circuit relay protocol started") - + # Create and initialize transport transport = CircuitV2Transport(host, protocol, relay_config) - + # Create discovery service discovery = RelayDiscovery(host, auto_reserve=True) transport.discovery = discovery - + # Start discovery service async with background_trio_service(discovery): logger.info("Relay discovery service started") - + # Connect to the relay logger.info(f"Connecting to relay at {relay_addr}") try: @@ -290,58 +310,77 @@ async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = No else: # Assume it's just a peer ID relay_peer_id = ID.from_base58(relay_addr) - relay_info = PeerInfo(relay_peer_id, [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}")]) + relay_info = PeerInfo( + relay_peer_id, + [ + multiaddr.Multiaddr( + f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}" + ) + ], + ) logger.info(f"Using constructed address: {relay_info.addrs[0]}") - + await host.connect(relay_info) logger.info(f"Connected to relay {relay_info.peer_id}") - + # Wait for relay discovery to find the relay await trio.sleep(2) - + # Convert destination ID string to peer ID dest_peer_id = ID.from_base58(dest_id) - + # Try to connect to the destination through the relay - logger.info(f"Connecting to destination {dest_peer_id} through relay") - + logger.info( + f"Connecting to destination {dest_peer_id} through relay" + ) + # Create peer info with relay relay_peer_id = relay_info.peer_id logger.info(f"This is the relay peer id: {relay_peer_id}") - + # Create a proper peer info with a relay address # The destination peer should be reachable through a p2p-circuit address # Format: /p2p-circuit/p2p/DESTINATION_PEER_ID circuit_addr = multiaddr.Multiaddr(f"/p2p-circuit/p2p/{dest_id}") dest_peer_info = PeerInfo(dest_peer_id, [circuit_addr]) logger.info(f"This is the dest peer info: {dest_peer_info}") - + # Dial through the relay try: - logger.info(f"Attempting to dial destination {dest_peer_id} through relay {relay_peer_id}") - + logger.info( + f"Attempting to dial destination {dest_peer_id} through relay {relay_peer_id}" + ) + connection = await transport.dial_peer_info( dest_peer_info, relay_peer_id=relay_peer_id ) - + logger.info(f"This is the dial connection: {connection}") - - logger.info(f"Successfully connected to destination through relay!") - + + logger.info( + "Successfully connected to destination through relay!" + ) + # Open a stream to our example protocol - stream = await host.new_stream(dest_peer_id, [EXAMPLE_PROTOCOL_ID]) + stream = await host.new_stream( + dest_peer_id, [EXAMPLE_PROTOCOL_ID] + ) if stream: - logger.info(f"Opened stream to destination with protocol {EXAMPLE_PROTOCOL_ID}") - + logger.info( + f"Opened stream to destination with protocol {EXAMPLE_PROTOCOL_ID}" + ) + # Send a message msg = f"Hello from {peer_id}!".encode() await stream.write(msg) - logger.info(f"Sent message to destination") - + logger.info("Sent message to destination") + # Wait for response response = await stream.read(MAX_READ_LEN) - logger.info(f"Received response: {response.decode() if response else 'No response'}") - + logger.info( + f"Received response: {response.decode() if response else 'No response'}" + ) + # Close the stream await stream.close() else: @@ -350,21 +389,23 @@ async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = No logger.error(f"Failed to dial through relay: {str(e)}") logger.error(f"Exception type: {type(e).__name__}") raise - + except Exception as e: logger.error(f"Error: {e}") - + print("\nSource operation completed") # Keep running for a bit to allow messages to be processed await trio.sleep(5) -def generate_fixed_private_key(seed: int) -> bytes: +def generate_fixed_private_key(seed: int | None) -> bytes: """Generate a fixed private key from a seed for reproducible peer IDs.""" - if seed is None: - return None - import random + + if seed is None: + # Generate random bytes if no seed provided + return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big") + random.seed(seed) return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big") @@ -405,14 +446,14 @@ def main() -> None: action="store_true", help="Enable debug logging", ) - + args = parser.parse_args() - + # Set log level if args.debug: logging.getLogger().setLevel(logging.DEBUG) logging.getLogger("libp2p").setLevel(logging.DEBUG) - + try: if args.role == "relay": trio.run(setup_relay_node, args.port, args.seed) @@ -432,4 +473,4 @@ def main() -> None: if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index adbbe7efd..fd0cd6994 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -304,7 +304,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: logger.error("Timeout reading from hop stream from %s", remote_id) return except Exception as e: - logger.error("Error reading from hop stream from %s: %s", remote_id, str(e)) + logger.error( + "Error reading from hop stream from %s: %s", remote_id, str(e) + ) return # Parse the message @@ -312,7 +314,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: hop_msg = HopMessage() hop_msg.ParseFromString(msg_bytes) except Exception as e: - logger.error("Error parsing hop message from %s: %s", remote_id, str(e)) + logger.error( + "Error parsing hop message from %s: %s", remote_id, str(e) + ) await self._send_status( stream, StatusCode.MALFORMED_MESSAGE, @@ -331,7 +335,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: # CONNECT establishes a circuit, so we're done with this stream return else: - logger.error("Invalid message type %d from %s", hop_msg.type, remote_id) + logger.error( + "Invalid message type %d from %s", hop_msg.type, remote_id + ) await self._send_status( stream, StatusCode.MALFORMED_MESSAGE, @@ -340,7 +346,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: return except Exception as e: - logger.error("Unexpected error handling hop stream from %s: %s", remote_id, str(e)) + logger.error( + "Unexpected error handling hop stream from %s: %s", remote_id, str(e) + ) try: await self._send_status( stream, @@ -348,7 +356,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: f"Internal error: {str(e)}", ) except Exception as e2: - logger.error("Failed to send error response to %s: %s", remote_id, str(e2)) + logger.error( + "Failed to send error response to %s: %s", remote_id, str(e2) + ) async def _handle_stop_stream(self, stream: INetStream) -> None: """ diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index b33f7e467..c472ac177 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -44,8 +44,8 @@ ) from .protocol import ( PROTOCOL_ID, - CircuitV2Protocol, STREAM_READ_TIMEOUT, + CircuitV2Protocol, ) from .protocol_buffer import ( StatusCode, @@ -161,14 +161,20 @@ async def dial_peer_info( # Get a stream to the relay try: - logger.debug("Opening stream to relay %s with protocol %s", relay_peer_id, PROTOCOL_ID) + logger.debug( + "Opening stream to relay %s with protocol %s", + relay_peer_id, + PROTOCOL_ID, + ) relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) if not relay_stream: raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") logger.debug("Successfully opened stream to relay %s", relay_peer_id) except Exception as e: logger.error("Failed to open stream to relay %s: %s", relay_peer_id, str(e)) - raise ConnectionError(f"Could not open stream to relay {relay_peer_id}: {str(e)}") + raise ConnectionError( + f"Could not open stream to relay {relay_peer_id}: {str(e)}" + ) try: # First try to make a reservation if enabled @@ -268,7 +274,7 @@ async def _make_reservation( logger.debug("Message type: %s", reserve_msg.type) logger.debug("Peer ID: %s", self.host.get_id()) logger.debug("Raw message: %s", reserve_msg) - + try: await stream.write(reserve_msg.SerializeToString()) logger.debug("Successfully sent reservation request") @@ -281,23 +287,33 @@ async def _make_reservation( with trio.fail_after(STREAM_READ_TIMEOUT): try: resp_bytes = await stream.read() - logger.debug("Received reservation response: %d bytes", len(resp_bytes)) + logger.debug( + "Received reservation response: %d bytes", len(resp_bytes) + ) resp = HopMessage() resp.ParseFromString(resp_bytes) logger.debug("=== PARSED RESERVATION RESPONSE ===") logger.debug("Message type: %s", resp.type) - logger.debug("Status code: %s", getattr(resp.status, "code", "unknown")) - logger.debug("Status message: %s", getattr(resp.status, "message", "unknown")) + logger.debug( + "Status code: %s", getattr(resp.status, "code", "unknown") + ) + logger.debug( + "Status message: %s", getattr(resp.status, "message", "unknown") + ) logger.debug("Raw response: %s", resp) except Exception as e: - logger.error("Failed to read/parse reservation response: %s", str(e)) + logger.error( + "Failed to read/parse reservation response: %s", str(e) + ) raise # Access status attributes directly status_code = getattr(resp.status, "code", StatusCode.OK) status_msg = getattr(resp.status, "message", "Unknown error") - logger.debug("Reservation response: code=%s, message=%s", status_code, status_msg) + logger.debug( + "Reservation response: code=%s, message=%s", status_code, status_msg + ) if status_code != StatusCode.OK: logger.warning( From 1989ce7a83f240e69163b7752add83933e2354a1 Mon Sep 17 00:00:00 2001 From: Winter-Soren Date: Sat, 21 Jun 2025 18:43:03 +0530 Subject: [PATCH 4/7] fix: revamp circuit relay example and moved example into examples directory --- examples/circuit_relay/__init__.py | 5 + examples/circuit_relay/relay_example.py | 429 ++++++++++++++++++++++++ libp2p/relay/circuit_v2/transport.py | 6 + 3 files changed, 440 insertions(+) create mode 100644 examples/circuit_relay/__init__.py create mode 100644 examples/circuit_relay/relay_example.py diff --git a/examples/circuit_relay/__init__.py b/examples/circuit_relay/__init__.py new file mode 100644 index 000000000..48f0a5f4a --- /dev/null +++ b/examples/circuit_relay/__init__.py @@ -0,0 +1,5 @@ +""" +Circuit Relay v2 example module. + +This package demonstrates the usage of Circuit Relay v2 protocol in libp2p. +""" \ No newline at end of file diff --git a/examples/circuit_relay/relay_example.py b/examples/circuit_relay/relay_example.py new file mode 100644 index 000000000..1b8e2670d --- /dev/null +++ b/examples/circuit_relay/relay_example.py @@ -0,0 +1,429 @@ +""" +Circuit Relay v2 Example. + +This example demonstrates using the Circuit Relay v2 protocol by setting up: +1. A relay node that facilitates connections +2. A destination node that accepts incoming connections +3. A source node that connects to the destination through the relay + +Usage: + # First terminal - start the relay: + python relay_example.py --role relay --port 8000 + + # Second terminal - start the destination: + python relay_example.py --role destination --port 8001 --relay-addr RELAY_PEER_ID + + # Third terminal - start the source: + python relay_example.py --role source --relay-addr RELAY_PEER_ID --dest-id DESTINATION_PEER_ID +""" + +import argparse +import logging +import sys +from typing import Any + +import multiaddr +import trio + +from libp2p import new_host +from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.network.stream.net_stream import INetStream +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr +from libp2p.relay.circuit_v2.config import RelayConfig +from libp2p.relay.circuit_v2.discovery import RelayDiscovery +from libp2p.relay.circuit_v2.protocol import CircuitV2Protocol, PROTOCOL_ID as RELAY_PROTOCOL_ID +from libp2p.relay.circuit_v2.resources import RelayLimits +from libp2p.relay.circuit_v2.transport import CircuitV2Transport +from libp2p.tools.async_service import background_trio_service + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(name)s | %(levelname)s | %(message)s", +) +logger = logging.getLogger("circuit-relay-example") + +# Application protocol for our example +EXAMPLE_PROTOCOL_ID = TProtocol("/circuit-relay-example/1.0.0") +MAX_READ_LEN = 2**16 # 64KB + + +async def handle_example_protocol(stream: INetStream) -> None: + """Handle incoming messages on our example protocol.""" + remote_peer_id = stream.get_protocol().remote_peer_id + logger.info(f"New stream from peer: {remote_peer_id}") + + try: + # Read the incoming message + msg = await stream.read(MAX_READ_LEN) + if msg: + logger.info(f"Received message: {msg.decode()}") + + # Send a response + response = f"Hello! This is {stream.get_protocol().local_peer_id}".encode() + await stream.write(response) + logger.info(f"Sent response to {remote_peer_id}") + except Exception as e: + logger.error(f"Error handling stream: {e}") + finally: + await stream.close() + + +async def setup_relay_node(port: int, seed: int | None = None) -> None: + """Set up and run a relay node.""" + logger.info("Starting relay node...") + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the relay + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=True, # Act as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use other relays if needed + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=True) + + # Start the host + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + async with host.run(listen_addrs=[listen_addr]): + # Print information about this node + peer_id = host.get_id() + logger.info(f"Relay node started with ID: {peer_id}") + + addrs = host.get_addrs() + for addr in addrs: + logger.info(f"Listening on: {addr}") + + # Register our example protocol handler + host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and register the transport + transport = CircuitV2Transport(host, protocol, relay_config) + logger.info("Circuit relay transport initialized") + + print("\nRelay node is running. Use the following address to connect:") + print(f"{addrs[0]}/p2p/{peer_id}") + print("\nPress Ctrl+C to exit\n") + + # Keep the relay running + await trio.sleep_forever() + + +async def setup_destination_node(port: int, relay_addr: str, seed: int | None = None) -> None: + """Set up and run a destination node that accepts incoming connections.""" + logger.info("Starting destination node...") + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the circuit relay client + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=False, # Not acting as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use relays for dialing + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) + + # Start the host + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + async with host.run(listen_addrs=[listen_addr]): + # Print information about this node + peer_id = host.get_id() + logger.info(f"Destination node started with ID: {peer_id}") + + addrs = host.get_addrs() + for addr in addrs: + logger.info(f"Listening on: {addr}") + + # Register our example protocol handler + host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and initialize transport + transport = CircuitV2Transport(host, protocol, relay_config) + + # Create discovery service + discovery = RelayDiscovery(host, auto_reserve=True) + transport.discovery = discovery + + # Start discovery service + async with background_trio_service(discovery): + logger.info("Relay discovery service started") + + # Connect to the relay + if relay_addr: + logger.info(f"Connecting to relay at {relay_addr}") + try: + # Handle both peer ID only or full multiaddr formats + if relay_addr.startswith("/"): + # Full multiaddr format + relay_maddr = multiaddr.Multiaddr(relay_addr) + relay_info = info_from_p2p_addr(relay_maddr) + else: + # Assume it's just a peer ID + relay_peer_id = ID.from_base58(relay_addr) + relay_info = PeerInfo(relay_peer_id, [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}")]) + logger.info(f"Using constructed address: {relay_info.addrs[0]}") + + await host.connect(relay_info) + logger.info(f"Connected to relay {relay_info.peer_id}") + except Exception as e: + logger.error(f"Failed to connect to relay: {e}") + return + + print("\nDestination node is running with peer ID:") + print(f"{peer_id}") + print("\nPress Ctrl+C to exit\n") + + # Keep the node running + await trio.sleep_forever() + + +async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = None) -> None: + """Set up and run a source node that connects to the destination through the relay.""" + logger.info("Starting source node...") + + if not relay_addr: + logger.error("Relay address is required for source mode") + return + + if not dest_id: + logger.error("Destination peer ID is required for source mode") + return + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the circuit relay client + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=False, # Not acting as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use relays for dialing + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) + + # Start the host + async with host.run(listen_addrs=[multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/0")]): # Use ephemeral port + # Print information about this node + peer_id = host.get_id() + logger.info(f"Source node started with ID: {peer_id}") + + # Get assigned address for debugging + addrs = host.get_addrs() + if addrs: + logger.info(f"Source node listening on: {addrs[0]}") + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and initialize transport + transport = CircuitV2Transport(host, protocol, relay_config) + + # Create discovery service + discovery = RelayDiscovery(host, auto_reserve=True) + transport.discovery = discovery + + # Start discovery service + async with background_trio_service(discovery): + logger.info("Relay discovery service started") + + # Connect to the relay + logger.info(f"Connecting to relay at {relay_addr}") + try: + # Handle both peer ID only or full multiaddr formats + if relay_addr.startswith("/"): + # Full multiaddr format + relay_maddr = multiaddr.Multiaddr(relay_addr) + relay_info = info_from_p2p_addr(relay_maddr) + else: + # Assume it's just a peer ID + relay_peer_id = ID.from_base58(relay_addr) + relay_info = PeerInfo(relay_peer_id, [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}")]) + logger.info(f"Using constructed address: {relay_info.addrs[0]}") + + await host.connect(relay_info) + logger.info(f"Connected to relay {relay_info.peer_id}") + + # Wait for relay discovery to find the relay + await trio.sleep(2) + + # Convert destination ID string to peer ID + dest_peer_id = ID.from_base58(dest_id) + + # Try to connect to the destination through the relay + logger.info(f"Connecting to destination {dest_peer_id} through relay") + + # Create peer info with relay + relay_peer_id = relay_info.peer_id + logger.info(f"This is the relay peer id: {relay_peer_id}") + + # Create a proper peer info with a relay address + # The destination peer should be reachable through a p2p-circuit address + # Format: /p2p-circuit/p2p/DESTINATION_PEER_ID + circuit_addr = multiaddr.Multiaddr(f"/p2p-circuit/p2p/{dest_id}") + dest_peer_info = PeerInfo(dest_peer_id, [circuit_addr]) + logger.info(f"This is the dest peer info: {dest_peer_info}") + + # Dial through the relay + try: + logger.info(f"Attempting to dial destination {dest_peer_id} through relay {relay_peer_id}") + + connection = await transport.dial_peer_info( + dest_peer_info, relay_peer_id=relay_peer_id + ) + + logger.info(f"This is the dial connection: {connection}") + + logger.info(f"Successfully connected to destination through relay!") + + # Open a stream to our example protocol + stream = await host.new_stream(dest_peer_id, [EXAMPLE_PROTOCOL_ID]) + if stream: + logger.info(f"Opened stream to destination with protocol {EXAMPLE_PROTOCOL_ID}") + + # Send a message + msg = f"Hello from {peer_id}!".encode() + await stream.write(msg) + logger.info(f"Sent message to destination") + + # Wait for response + response = await stream.read(MAX_READ_LEN) + logger.info(f"Received response: {response.decode() if response else 'No response'}") + + # Close the stream + await stream.close() + else: + logger.error("Failed to open stream to destination") + except Exception as e: + logger.error(f"Failed to dial through relay: {str(e)}") + logger.error(f"Exception type: {type(e).__name__}") + raise + + except Exception as e: + logger.error(f"Error: {e}") + + print("\nSource operation completed") + # Keep running for a bit to allow messages to be processed + await trio.sleep(5) + + +def generate_fixed_private_key(seed: int) -> bytes: + """Generate a fixed private key from a seed for reproducible peer IDs.""" + if seed is None: + return None + + import random + random.seed(seed) + return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big") + + +def main() -> None: + """Parse arguments and run the appropriate node type.""" + parser = argparse.ArgumentParser(description="Circuit Relay v2 Example") + parser.add_argument( + "--role", + type=str, + choices=["relay", "source", "destination"], + required=True, + help="Node role (relay, source, or destination)", + ) + parser.add_argument( + "--port", + type=int, + default=0, + help="Port to listen on (for relay and destination nodes)", + ) + parser.add_argument( + "--relay-addr", + type=str, + help="Multiaddress or peer ID of relay node (for destination and source nodes)", + ) + parser.add_argument( + "--dest-id", + type=str, + help="Peer ID of destination node (for source node)", + ) + parser.add_argument( + "--seed", + type=int, + help="Random seed for reproducible peer IDs", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + # Set log level + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) + logging.getLogger("libp2p").setLevel(logging.DEBUG) + + try: + if args.role == "relay": + trio.run(setup_relay_node, args.port, args.seed) + elif args.role == "destination": + if not args.relay_addr: + parser.error("--relay-addr is required for destination role") + trio.run(setup_destination_node, args.port, args.relay_addr, args.seed) + elif args.role == "source": + if not args.relay_addr or not args.dest_id: + parser.error("--relay-addr and --dest-id are required for source role") + trio.run(setup_source_node, args.relay_addr, args.dest_id, args.seed) + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index ffd310902..722af8fe4 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -152,6 +152,8 @@ async def dial_peer_info( If the connection cannot be established """ + print(f"1. inside the dial_peer_info is the relay peer id: {relay_peer_id}") + print(f"2. inside the dial_peer_info is the peer info: {peer_info}") # If no specific relay is provided, try to find one if relay_peer_id is None: relay_peer_id = await self._select_relay(peer_info) @@ -160,12 +162,14 @@ async def dial_peer_info( # Get a stream to the relay relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) + print(f"3. inside the dial_peer_info is the relay stream: {relay_stream}") if not relay_stream: raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") try: # First try to make a reservation if enabled if self.config.enable_client: + print(f"4. inside the dial_peer_info is the relay stream: {relay_stream}") success = await self._make_reservation(relay_stream, relay_peer_id) if not success: logger.warning( @@ -177,7 +181,9 @@ async def dial_peer_info( type=HopMessage.CONNECT, peer=peer_info.peer_id.to_bytes(), ) + print(f"5. inside the dial_peer_info is the hop msg: {hop_msg}") await relay_stream.write(hop_msg.SerializeToString()) + print(f"6. inside the dial_peer_info is the relay stream: {relay_stream}") # Read response resp_bytes = await relay_stream.read() From 4f18f6c689ab068d833d08527b148c83c148893a Mon Sep 17 00:00:00 2001 From: Winter-Soren Date: Sun, 13 Jul 2025 01:46:38 +0530 Subject: [PATCH 5/7] added more loggers and added protocol handlers --- examples/circuit_relay/relay_example.py | 12 +- libp2p/relay/circuit_v2/protocol.py | 193 ++++++++---------------- libp2p/relay/circuit_v2/transport.py | 64 +++++--- libp2p/transport/tcp/tcp.py | 6 + 4 files changed, 124 insertions(+), 151 deletions(-) diff --git a/examples/circuit_relay/relay_example.py b/examples/circuit_relay/relay_example.py index 1b8e2670d..9ecbc2f36 100644 --- a/examples/circuit_relay/relay_example.py +++ b/examples/circuit_relay/relay_example.py @@ -33,7 +33,7 @@ from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr from libp2p.relay.circuit_v2.config import RelayConfig from libp2p.relay.circuit_v2.discovery import RelayDiscovery -from libp2p.relay.circuit_v2.protocol import CircuitV2Protocol, PROTOCOL_ID as RELAY_PROTOCOL_ID +from libp2p.relay.circuit_v2.protocol import CircuitV2Protocol, PROTOCOL_ID, STOP_PROTOCOL_ID from libp2p.relay.circuit_v2.resources import RelayLimits from libp2p.relay.circuit_v2.transport import CircuitV2Transport from libp2p.tools.async_service import background_trio_service @@ -109,8 +109,11 @@ async def setup_relay_node(port: int, seed: int | None = None) -> None: for addr in addrs: logger.info(f"Listening on: {addr}") - # Register our example protocol handler + # Register protocol handlers host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) + host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) + logger.debug("Protocol handlers registered") # Start the relay protocol service async with background_trio_service(protocol): @@ -166,8 +169,11 @@ async def setup_destination_node(port: int, relay_addr: str, seed: int | None = for addr in addrs: logger.info(f"Listening on: {addr}") - # Register our example protocol handler + # Register protocol handlers host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) + host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) + logger.debug("Protocol handlers registered") # Start the relay protocol service async with background_trio_service(protocol): diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index 1cf76efa8..adbbe7efd 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -70,7 +70,7 @@ STREAM_READ_TIMEOUT = 15 # seconds STREAM_WRITE_TIMEOUT = 15 # seconds STREAM_CLOSE_TIMEOUT = 10 # seconds -MAX_READ_RETRIES = 5 # Maximum number of read retries +MAX_READ_RETRIES = 2 # Reduced retries to avoid masking real issues # Extended interfaces for type checking @@ -276,6 +276,7 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: This handler processes relay requests from other peers. """ + logger.debug("=== HOP STREAM HANDLER CALLED ===") try: # Try to get peer ID first try: @@ -290,121 +291,64 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: logger.debug("Handling hop stream from %s", remote_id) - # First, handle the read timeout gracefully - try: - with trio.fail_after( - STREAM_READ_TIMEOUT * 2 - ): # Double the timeout for reading - msg_bytes = await stream.read() - if not msg_bytes: - logger.error( - "Empty read from stream from %s", - remote_id, - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = "Empty message received" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure message is sent - return - except trio.TooSlowError: - logger.error( - "Timeout reading from hop stream from %s", - remote_id, - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.CONNECTION_FAILED)) - pb_status.message = "Stream read timeout" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return - except Exception as e: - logger.error( - "Error reading from hop stream from %s: %s", - remote_id, - str(e), - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = f"Read error: {str(e)}" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return - - # Parse the message - try: - hop_msg = HopMessage() - hop_msg.ParseFromString(msg_bytes) - except Exception as e: - logger.error( - "Error parsing hop message from %s: %s", - remote_id, - str(e), - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = f"Parse error: {str(e)}" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return + # Handle multiple messages on the same stream + while True: + # Read message with timeout + try: + with trio.fail_after(STREAM_READ_TIMEOUT): + msg_bytes = await stream.read() + if not msg_bytes: + logger.debug("Stream closed by peer %s", remote_id) + return + except trio.TooSlowError: + logger.error("Timeout reading from hop stream from %s", remote_id) + return + except Exception as e: + logger.error("Error reading from hop stream from %s: %s", remote_id, str(e)) + return - # Process based on message type - if hop_msg.type == HopMessage.RESERVE: - logger.debug("Handling RESERVE message from %s", remote_id) - await self._handle_reserve(stream, hop_msg) - # For RESERVE requests, let the client close the stream - return - elif hop_msg.type == HopMessage.CONNECT: - logger.debug("Handling CONNECT message from %s", remote_id) - await self._handle_connect(stream, hop_msg) - else: - logger.error("Invalid message type %d from %s", hop_msg.type, remote_id) - # Send a nice error response using _send_status method - await self._send_status( - stream, - StatusCode.MALFORMED_MESSAGE, - f"Invalid message type: {hop_msg.type}", - ) + # Parse the message + try: + hop_msg = HopMessage() + hop_msg.ParseFromString(msg_bytes) + except Exception as e: + logger.error("Error parsing hop message from %s: %s", remote_id, str(e)) + await self._send_status( + stream, + StatusCode.MALFORMED_MESSAGE, + f"Parse error: {str(e)}", + ) + return + + # Process based on message type + if hop_msg.type == HopMessage.RESERVE: + logger.debug("Handling RESERVE message from %s", remote_id) + await self._handle_reserve(stream, hop_msg) + # Continue reading for more messages + elif hop_msg.type == HopMessage.CONNECT: + logger.debug("Handling CONNECT message from %s", remote_id) + await self._handle_connect(stream, hop_msg) + # CONNECT establishes a circuit, so we're done with this stream + return + else: + logger.error("Invalid message type %d from %s", hop_msg.type, remote_id) + await self._send_status( + stream, + StatusCode.MALFORMED_MESSAGE, + f"Invalid message type: {hop_msg.type}", + ) + return except Exception as e: - logger.error( - "Unexpected error handling hop stream from %s: %s", remote_id, str(e) - ) + logger.error("Unexpected error handling hop stream from %s: %s", remote_id, str(e)) try: - # Send a nice error response using _send_status method await self._send_status( stream, StatusCode.MALFORMED_MESSAGE, f"Internal error: {str(e)}", ) except Exception as e2: - logger.error( - "Failed to send error response to %s: %s", remote_id, str(e2) - ) + logger.error("Failed to send error response to %s: %s", remote_id, str(e2)) async def _handle_stop_stream(self, stream: INetStream) -> None: """ @@ -536,12 +480,8 @@ async def _handle_reserve(self, stream: INetStream, msg: Any) -> None: ttl, ) - # Send the response with increased timeout + # Send the response await stream.write(response.SerializeToString()) - - # Add a small wait to ensure the message is fully sent - await trio.sleep(0.1) - logger.debug("Reservation response sent successfully") except Exception as e: @@ -556,18 +496,11 @@ async def _handle_reserve(self, stream: INetStream, msg: Any) -> None: ) except Exception as send_err: logger.error("Failed to send error response: %s", str(send_err)) - finally: - # Always close the stream when done with reservation - if cast(INetStreamWithExtras, stream).is_open(): - try: - with trio.fail_after(STREAM_CLOSE_TIMEOUT): - await stream.close() - except Exception as close_err: - logger.error("Error closing stream: %s", str(close_err)) async def _handle_connect(self, stream: INetStream, msg: Any) -> None: """Handle a connect request.""" peer_id = ID(msg.peer) + logger.debug("Handling CONNECT request for peer %s", peer_id) dst_stream: INetStream | None = None # Verify reservation if provided @@ -594,12 +527,15 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: try: # Store the source stream with properly typed None self._active_relays[peer_id] = (stream, None) + logger.debug("Stored source stream for peer %s", peer_id) # Try to connect to the destination with timeout with trio.fail_after(STREAM_READ_TIMEOUT): + logger.debug("Attempting to connect to destination %s", peer_id) dst_stream = await self.host.new_stream(peer_id, [STOP_PROTOCOL_ID]) if not dst_stream: raise ConnectionError("Could not connect to destination") + logger.debug("Successfully connected to destination %s", peer_id) # Send STOP CONNECT message stop_msg = StopMessage( @@ -640,6 +576,7 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: reservation.active_connections += 1 # Send success status + logger.debug("Sending OK status to source") await self._send_status( stream, StatusCode.OK, @@ -653,6 +590,7 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: except (trio.TooSlowError, ConnectionError) as e: logger.error("Error establishing relay connection: %s", str(e)) + logger.debug("Sending CONNECTION_FAILED status to source") await self._send_status( stream, StatusCode.CONNECTION_FAILED, @@ -730,9 +668,11 @@ async def _relay_data( logger.error("Error relaying data: %s", str(e)) finally: # Clean up streams and remove from active relays - await src_stream.reset() - await dst_stream.reset() + # Only reset streams once to avoid double-reset issues if peer_id in self._active_relays: + src_stream_cleanup, dst_stream_cleanup = self._active_relays[peer_id] + await self._close_stream(src_stream_cleanup) + await self._close_stream(dst_stream_cleanup) del self._active_relays[peer_id] async def _send_status( @@ -744,7 +684,7 @@ async def _send_status( """Send a status message.""" try: logger.debug("Sending status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(STREAM_WRITE_TIMEOUT): # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( @@ -761,11 +701,7 @@ async def _send_status( logger.debug("Status message serialized (%d bytes)", len(msg_bytes)) await stream.write(msg_bytes) - logger.debug("Status message sent, waiting for processing") - - # Wait longer to ensure the message is sent - await trio.sleep(1.5) - logger.debug("Status message sending completed") + logger.debug("Status message sent successfully") except trio.TooSlowError: logger.error( "Timeout sending status message: code=%s, message=%s", code, message @@ -782,7 +718,7 @@ async def _send_stop_status( """Send a status message on a STOP stream.""" try: logger.debug("Sending stop status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(STREAM_WRITE_TIMEOUT): # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( @@ -795,6 +731,5 @@ async def _send_stop_status( status=pb_status, ) await stream.write(status_msg.SerializeToString()) - await trio.sleep(0.5) # Ensure message is sent except Exception as e: logger.error("Error sending stop status message: %s", str(e)) diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index 722af8fe4..b33f7e467 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -45,6 +45,7 @@ from .protocol import ( PROTOCOL_ID, CircuitV2Protocol, + STREAM_READ_TIMEOUT, ) from .protocol_buffer import ( StatusCode, @@ -152,8 +153,6 @@ async def dial_peer_info( If the connection cannot be established """ - print(f"1. inside the dial_peer_info is the relay peer id: {relay_peer_id}") - print(f"2. inside the dial_peer_info is the peer info: {peer_info}") # If no specific relay is provided, try to find one if relay_peer_id is None: relay_peer_id = await self._select_relay(peer_info) @@ -161,15 +160,19 @@ async def dial_peer_info( raise ConnectionError("No suitable relay found") # Get a stream to the relay - relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) - print(f"3. inside the dial_peer_info is the relay stream: {relay_stream}") - if not relay_stream: - raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") + try: + logger.debug("Opening stream to relay %s with protocol %s", relay_peer_id, PROTOCOL_ID) + relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) + if not relay_stream: + raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") + logger.debug("Successfully opened stream to relay %s", relay_peer_id) + except Exception as e: + logger.error("Failed to open stream to relay %s: %s", relay_peer_id, str(e)) + raise ConnectionError(f"Could not open stream to relay {relay_peer_id}: {str(e)}") try: # First try to make a reservation if enabled if self.config.enable_client: - print(f"4. inside the dial_peer_info is the relay stream: {relay_stream}") success = await self._make_reservation(relay_stream, relay_peer_id) if not success: logger.warning( @@ -181,14 +184,13 @@ async def dial_peer_info( type=HopMessage.CONNECT, peer=peer_info.peer_id.to_bytes(), ) - print(f"5. inside the dial_peer_info is the hop msg: {hop_msg}") await relay_stream.write(hop_msg.SerializeToString()) - print(f"6. inside the dial_peer_info is the relay stream: {relay_stream}") - # Read response - resp_bytes = await relay_stream.read() - resp = HopMessage() - resp.ParseFromString(resp_bytes) + # Read response with timeout + with trio.fail_after(STREAM_READ_TIMEOUT): + resp_bytes = await relay_stream.read() + resp = HopMessage() + resp.ParseFromString(resp_bytes) # Access status attributes directly status_code = getattr(resp.status, "code", StatusCode.OK) @@ -262,17 +264,41 @@ async def _make_reservation( type=HopMessage.RESERVE, peer=self.host.get_id().to_bytes(), ) - await stream.write(reserve_msg.SerializeToString()) - - # Read response - resp_bytes = await stream.read() - resp = HopMessage() - resp.ParseFromString(resp_bytes) + logger.debug("=== SENDING RESERVATION REQUEST ===") + logger.debug("Message type: %s", reserve_msg.type) + logger.debug("Peer ID: %s", self.host.get_id()) + logger.debug("Raw message: %s", reserve_msg) + + try: + await stream.write(reserve_msg.SerializeToString()) + logger.debug("Successfully sent reservation request") + except Exception as e: + logger.error("Failed to send reservation request: %s", str(e)) + raise + + # Read response with timeout + logger.debug("=== WAITING FOR RESERVATION RESPONSE ===") + with trio.fail_after(STREAM_READ_TIMEOUT): + try: + resp_bytes = await stream.read() + logger.debug("Received reservation response: %d bytes", len(resp_bytes)) + resp = HopMessage() + resp.ParseFromString(resp_bytes) + logger.debug("=== PARSED RESERVATION RESPONSE ===") + logger.debug("Message type: %s", resp.type) + logger.debug("Status code: %s", getattr(resp.status, "code", "unknown")) + logger.debug("Status message: %s", getattr(resp.status, "message", "unknown")) + logger.debug("Raw response: %s", resp) + except Exception as e: + logger.error("Failed to read/parse reservation response: %s", str(e)) + raise # Access status attributes directly status_code = getattr(resp.status, "code", StatusCode.OK) status_msg = getattr(resp.status, "message", "Unknown error") + logger.debug("Reservation response: code=%s, message=%s", status_code, status_msg) + if status_code != StatusCode.OK: logger.warning( "Reservation failed with relay %s: %s", diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 1598ea42a..81477c37f 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -160,15 +160,21 @@ async def dial(self, maddr: Multiaddr) -> IRawConnection: try: # trio.open_tcp_stream requires host to be str or bytes, not None. + logger.debug("=== OPENING TCP STREAM ===") + logger.debug("Host: %s", host_str) + logger.debug("Port: %d", port_int) stream = await trio.open_tcp_stream(host_str, port_int) + logger.debug("Successfully opened TCP stream") except OSError as error: # OSError is common for network issues like "Connection refused" # or "Host unreachable". + logger.error("Failed to open TCP stream: %s", error) raise OpenConnectionError( f"Failed to open TCP stream to {maddr}: {error}" ) from error except Exception as error: # Catch other potential errors from trio.open_tcp_stream and wrap them. + logger.error("Unexpected error opening TCP stream: %s", error) raise OpenConnectionError( f"An unexpected error occurred when dialing {maddr}: {error}" ) from error From 991c990c16ae80c7333936a95e6e889c9e210924 Mon Sep 17 00:00:00 2001 From: Winter-Soren Date: Sun, 13 Jul 2025 01:53:39 +0530 Subject: [PATCH 6/7] fixed pyrefly pre-commithook issue --- examples/circuit_relay/__init__.py | 2 +- examples/circuit_relay/relay_example.py | 209 ++++++++++++++---------- libp2p/relay/circuit_v2/protocol.py | 20 ++- libp2p/relay/circuit_v2/transport.py | 34 +++- 4 files changed, 166 insertions(+), 99 deletions(-) diff --git a/examples/circuit_relay/__init__.py b/examples/circuit_relay/__init__.py index 48f0a5f4a..ccab8b932 100644 --- a/examples/circuit_relay/__init__.py +++ b/examples/circuit_relay/__init__.py @@ -2,4 +2,4 @@ Circuit Relay v2 example module. This package demonstrates the usage of Circuit Relay v2 protocol in libp2p. -""" \ No newline at end of file +""" diff --git a/examples/circuit_relay/relay_example.py b/examples/circuit_relay/relay_example.py index 9ecbc2f36..888b20357 100644 --- a/examples/circuit_relay/relay_example.py +++ b/examples/circuit_relay/relay_example.py @@ -20,7 +20,6 @@ import argparse import logging import sys -from typing import Any import multiaddr import trio @@ -33,7 +32,11 @@ from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr from libp2p.relay.circuit_v2.config import RelayConfig from libp2p.relay.circuit_v2.discovery import RelayDiscovery -from libp2p.relay.circuit_v2.protocol import CircuitV2Protocol, PROTOCOL_ID, STOP_PROTOCOL_ID +from libp2p.relay.circuit_v2.protocol import ( + PROTOCOL_ID, + STOP_PROTOCOL_ID, + CircuitV2Protocol, +) from libp2p.relay.circuit_v2.resources import RelayLimits from libp2p.relay.circuit_v2.transport import CircuitV2Transport from libp2p.tools.async_service import background_trio_service @@ -52,17 +55,19 @@ async def handle_example_protocol(stream: INetStream) -> None: """Handle incoming messages on our example protocol.""" - remote_peer_id = stream.get_protocol().remote_peer_id + remote_peer_id = stream.muxed_conn.peer_id logger.info(f"New stream from peer: {remote_peer_id}") - + try: # Read the incoming message msg = await stream.read(MAX_READ_LEN) if msg: logger.info(f"Received message: {msg.decode()}") - + # Send a response - response = f"Hello! This is {stream.get_protocol().local_peer_id}".encode() + # Get the local peer ID from the secure connection + local_peer_id = stream.muxed_conn.peer_id + response = f"Hello! This is {local_peer_id}".encode() await stream.write(response) logger.info(f"Sent response to {remote_peer_id}") except Exception as e: @@ -74,11 +79,11 @@ async def handle_example_protocol(stream: INetStream) -> None: async def setup_relay_node(port: int, seed: int | None = None) -> None: """Set up and run a relay node.""" logger.info("Starting relay node...") - + # Create host with a fixed key if seed is provided key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) host = new_host(key_pair=key_pair) - + # Configure the relay limits = RelayLimits( duration=3600, # 1 hour @@ -86,59 +91,61 @@ async def setup_relay_node(port: int, seed: int | None = None) -> None: max_circuit_conns=10, max_reservations=5, ) - + relay_config = RelayConfig( enable_hop=True, # Act as a relay enable_stop=True, # Accept relayed connections enable_client=True, # Use other relays if needed limits=limits, ) - + # Initialize the protocol protocol = CircuitV2Protocol(host, limits=limits, allow_hop=True) - + # Start the host listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - + async with host.run(listen_addrs=[listen_addr]): # Print information about this node peer_id = host.get_id() logger.info(f"Relay node started with ID: {peer_id}") - + addrs = host.get_addrs() for addr in addrs: logger.info(f"Listening on: {addr}") - + # Register protocol handlers host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) logger.debug("Protocol handlers registered") - + # Start the relay protocol service async with background_trio_service(protocol): logger.info("Circuit relay protocol started") - + # Create and register the transport transport = CircuitV2Transport(host, protocol, relay_config) logger.info("Circuit relay transport initialized") - + print("\nRelay node is running. Use the following address to connect:") print(f"{addrs[0]}/p2p/{peer_id}") print("\nPress Ctrl+C to exit\n") - + # Keep the relay running await trio.sleep_forever() -async def setup_destination_node(port: int, relay_addr: str, seed: int | None = None) -> None: +async def setup_destination_node( + port: int, relay_addr: str, seed: int | None = None +) -> None: """Set up and run a destination node that accepts incoming connections.""" logger.info("Starting destination node...") - + # Create host with a fixed key if seed is provided key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) host = new_host(key_pair=key_pair) - + # Configure the circuit relay client limits = RelayLimits( duration=3600, # 1 hour @@ -146,50 +153,50 @@ async def setup_destination_node(port: int, relay_addr: str, seed: int | None = max_circuit_conns=10, max_reservations=5, ) - + relay_config = RelayConfig( enable_hop=False, # Not acting as a relay enable_stop=True, # Accept relayed connections enable_client=True, # Use relays for dialing limits=limits, ) - + # Initialize the protocol protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) - + # Start the host listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - + async with host.run(listen_addrs=[listen_addr]): # Print information about this node peer_id = host.get_id() logger.info(f"Destination node started with ID: {peer_id}") - + addrs = host.get_addrs() for addr in addrs: logger.info(f"Listening on: {addr}") - + # Register protocol handlers host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) logger.debug("Protocol handlers registered") - + # Start the relay protocol service async with background_trio_service(protocol): logger.info("Circuit relay protocol started") - + # Create and initialize transport transport = CircuitV2Transport(host, protocol, relay_config) - + # Create discovery service discovery = RelayDiscovery(host, auto_reserve=True) transport.discovery = discovery - + # Start discovery service async with background_trio_service(discovery): logger.info("Relay discovery service started") - + # Connect to the relay if relay_addr: logger.info(f"Connecting to relay at {relay_addr}") @@ -202,39 +209,50 @@ async def setup_destination_node(port: int, relay_addr: str, seed: int | None = else: # Assume it's just a peer ID relay_peer_id = ID.from_base58(relay_addr) - relay_info = PeerInfo(relay_peer_id, [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}")]) - logger.info(f"Using constructed address: {relay_info.addrs[0]}") - + relay_info = PeerInfo( + relay_peer_id, + [ + multiaddr.Multiaddr( + f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}" + ) + ], + ) + logger.info( + f"Using constructed address: {relay_info.addrs[0]}" + ) + await host.connect(relay_info) logger.info(f"Connected to relay {relay_info.peer_id}") except Exception as e: logger.error(f"Failed to connect to relay: {e}") return - + print("\nDestination node is running with peer ID:") print(f"{peer_id}") print("\nPress Ctrl+C to exit\n") - + # Keep the node running await trio.sleep_forever() -async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = None) -> None: +async def setup_source_node( + relay_addr: str, dest_id: str, seed: int | None = None +) -> None: """Set up and run a source node that connects to the destination through the relay.""" logger.info("Starting source node...") - + if not relay_addr: logger.error("Relay address is required for source mode") return - + if not dest_id: logger.error("Destination peer ID is required for source mode") return - + # Create host with a fixed key if seed is provided key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) host = new_host(key_pair=key_pair) - + # Configure the circuit relay client limits = RelayLimits( duration=3600, # 1 hour @@ -242,43 +260,45 @@ async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = No max_circuit_conns=10, max_reservations=5, ) - + relay_config = RelayConfig( enable_hop=False, # Not acting as a relay enable_stop=True, # Accept relayed connections enable_client=True, # Use relays for dialing limits=limits, ) - + # Initialize the protocol protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) - + # Start the host - async with host.run(listen_addrs=[multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/0")]): # Use ephemeral port + async with host.run( + listen_addrs=[multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0")] + ): # Use ephemeral port # Print information about this node peer_id = host.get_id() logger.info(f"Source node started with ID: {peer_id}") - + # Get assigned address for debugging addrs = host.get_addrs() if addrs: logger.info(f"Source node listening on: {addrs[0]}") - + # Start the relay protocol service async with background_trio_service(protocol): logger.info("Circuit relay protocol started") - + # Create and initialize transport transport = CircuitV2Transport(host, protocol, relay_config) - + # Create discovery service discovery = RelayDiscovery(host, auto_reserve=True) transport.discovery = discovery - + # Start discovery service async with background_trio_service(discovery): logger.info("Relay discovery service started") - + # Connect to the relay logger.info(f"Connecting to relay at {relay_addr}") try: @@ -290,58 +310,77 @@ async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = No else: # Assume it's just a peer ID relay_peer_id = ID.from_base58(relay_addr) - relay_info = PeerInfo(relay_peer_id, [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}")]) + relay_info = PeerInfo( + relay_peer_id, + [ + multiaddr.Multiaddr( + f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}" + ) + ], + ) logger.info(f"Using constructed address: {relay_info.addrs[0]}") - + await host.connect(relay_info) logger.info(f"Connected to relay {relay_info.peer_id}") - + # Wait for relay discovery to find the relay await trio.sleep(2) - + # Convert destination ID string to peer ID dest_peer_id = ID.from_base58(dest_id) - + # Try to connect to the destination through the relay - logger.info(f"Connecting to destination {dest_peer_id} through relay") - + logger.info( + f"Connecting to destination {dest_peer_id} through relay" + ) + # Create peer info with relay relay_peer_id = relay_info.peer_id logger.info(f"This is the relay peer id: {relay_peer_id}") - + # Create a proper peer info with a relay address # The destination peer should be reachable through a p2p-circuit address # Format: /p2p-circuit/p2p/DESTINATION_PEER_ID circuit_addr = multiaddr.Multiaddr(f"/p2p-circuit/p2p/{dest_id}") dest_peer_info = PeerInfo(dest_peer_id, [circuit_addr]) logger.info(f"This is the dest peer info: {dest_peer_info}") - + # Dial through the relay try: - logger.info(f"Attempting to dial destination {dest_peer_id} through relay {relay_peer_id}") - + logger.info( + f"Attempting to dial destination {dest_peer_id} through relay {relay_peer_id}" + ) + connection = await transport.dial_peer_info( dest_peer_info, relay_peer_id=relay_peer_id ) - + logger.info(f"This is the dial connection: {connection}") - - logger.info(f"Successfully connected to destination through relay!") - + + logger.info( + "Successfully connected to destination through relay!" + ) + # Open a stream to our example protocol - stream = await host.new_stream(dest_peer_id, [EXAMPLE_PROTOCOL_ID]) + stream = await host.new_stream( + dest_peer_id, [EXAMPLE_PROTOCOL_ID] + ) if stream: - logger.info(f"Opened stream to destination with protocol {EXAMPLE_PROTOCOL_ID}") - + logger.info( + f"Opened stream to destination with protocol {EXAMPLE_PROTOCOL_ID}" + ) + # Send a message msg = f"Hello from {peer_id}!".encode() await stream.write(msg) - logger.info(f"Sent message to destination") - + logger.info("Sent message to destination") + # Wait for response response = await stream.read(MAX_READ_LEN) - logger.info(f"Received response: {response.decode() if response else 'No response'}") - + logger.info( + f"Received response: {response.decode() if response else 'No response'}" + ) + # Close the stream await stream.close() else: @@ -350,21 +389,23 @@ async def setup_source_node(relay_addr: str, dest_id: str, seed: int | None = No logger.error(f"Failed to dial through relay: {str(e)}") logger.error(f"Exception type: {type(e).__name__}") raise - + except Exception as e: logger.error(f"Error: {e}") - + print("\nSource operation completed") # Keep running for a bit to allow messages to be processed await trio.sleep(5) -def generate_fixed_private_key(seed: int) -> bytes: +def generate_fixed_private_key(seed: int | None) -> bytes: """Generate a fixed private key from a seed for reproducible peer IDs.""" - if seed is None: - return None - import random + + if seed is None: + # Generate random bytes if no seed provided + return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big") + random.seed(seed) return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big") @@ -405,14 +446,14 @@ def main() -> None: action="store_true", help="Enable debug logging", ) - + args = parser.parse_args() - + # Set log level if args.debug: logging.getLogger().setLevel(logging.DEBUG) logging.getLogger("libp2p").setLevel(logging.DEBUG) - + try: if args.role == "relay": trio.run(setup_relay_node, args.port, args.seed) @@ -432,4 +473,4 @@ def main() -> None: if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index adbbe7efd..fd0cd6994 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -304,7 +304,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: logger.error("Timeout reading from hop stream from %s", remote_id) return except Exception as e: - logger.error("Error reading from hop stream from %s: %s", remote_id, str(e)) + logger.error( + "Error reading from hop stream from %s: %s", remote_id, str(e) + ) return # Parse the message @@ -312,7 +314,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: hop_msg = HopMessage() hop_msg.ParseFromString(msg_bytes) except Exception as e: - logger.error("Error parsing hop message from %s: %s", remote_id, str(e)) + logger.error( + "Error parsing hop message from %s: %s", remote_id, str(e) + ) await self._send_status( stream, StatusCode.MALFORMED_MESSAGE, @@ -331,7 +335,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: # CONNECT establishes a circuit, so we're done with this stream return else: - logger.error("Invalid message type %d from %s", hop_msg.type, remote_id) + logger.error( + "Invalid message type %d from %s", hop_msg.type, remote_id + ) await self._send_status( stream, StatusCode.MALFORMED_MESSAGE, @@ -340,7 +346,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: return except Exception as e: - logger.error("Unexpected error handling hop stream from %s: %s", remote_id, str(e)) + logger.error( + "Unexpected error handling hop stream from %s: %s", remote_id, str(e) + ) try: await self._send_status( stream, @@ -348,7 +356,9 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: f"Internal error: {str(e)}", ) except Exception as e2: - logger.error("Failed to send error response to %s: %s", remote_id, str(e2)) + logger.error( + "Failed to send error response to %s: %s", remote_id, str(e2) + ) async def _handle_stop_stream(self, stream: INetStream) -> None: """ diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index b33f7e467..c472ac177 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -44,8 +44,8 @@ ) from .protocol import ( PROTOCOL_ID, - CircuitV2Protocol, STREAM_READ_TIMEOUT, + CircuitV2Protocol, ) from .protocol_buffer import ( StatusCode, @@ -161,14 +161,20 @@ async def dial_peer_info( # Get a stream to the relay try: - logger.debug("Opening stream to relay %s with protocol %s", relay_peer_id, PROTOCOL_ID) + logger.debug( + "Opening stream to relay %s with protocol %s", + relay_peer_id, + PROTOCOL_ID, + ) relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) if not relay_stream: raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") logger.debug("Successfully opened stream to relay %s", relay_peer_id) except Exception as e: logger.error("Failed to open stream to relay %s: %s", relay_peer_id, str(e)) - raise ConnectionError(f"Could not open stream to relay {relay_peer_id}: {str(e)}") + raise ConnectionError( + f"Could not open stream to relay {relay_peer_id}: {str(e)}" + ) try: # First try to make a reservation if enabled @@ -268,7 +274,7 @@ async def _make_reservation( logger.debug("Message type: %s", reserve_msg.type) logger.debug("Peer ID: %s", self.host.get_id()) logger.debug("Raw message: %s", reserve_msg) - + try: await stream.write(reserve_msg.SerializeToString()) logger.debug("Successfully sent reservation request") @@ -281,23 +287,33 @@ async def _make_reservation( with trio.fail_after(STREAM_READ_TIMEOUT): try: resp_bytes = await stream.read() - logger.debug("Received reservation response: %d bytes", len(resp_bytes)) + logger.debug( + "Received reservation response: %d bytes", len(resp_bytes) + ) resp = HopMessage() resp.ParseFromString(resp_bytes) logger.debug("=== PARSED RESERVATION RESPONSE ===") logger.debug("Message type: %s", resp.type) - logger.debug("Status code: %s", getattr(resp.status, "code", "unknown")) - logger.debug("Status message: %s", getattr(resp.status, "message", "unknown")) + logger.debug( + "Status code: %s", getattr(resp.status, "code", "unknown") + ) + logger.debug( + "Status message: %s", getattr(resp.status, "message", "unknown") + ) logger.debug("Raw response: %s", resp) except Exception as e: - logger.error("Failed to read/parse reservation response: %s", str(e)) + logger.error( + "Failed to read/parse reservation response: %s", str(e) + ) raise # Access status attributes directly status_code = getattr(resp.status, "code", StatusCode.OK) status_msg = getattr(resp.status, "message", "Unknown error") - logger.debug("Reservation response: code=%s, message=%s", status_code, status_msg) + logger.debug( + "Reservation response: code=%s, message=%s", status_code, status_msg + ) if status_code != StatusCode.OK: logger.warning( From 20182a26f7e436a6b3e6ab081cd5af93ebd665d9 Mon Sep 17 00:00:00 2001 From: Winter-Soren Date: Sat, 19 Jul 2025 15:04:51 +0530 Subject: [PATCH 7/7] fixed pre-commit hook issues --- examples/circuit_relay/relay_example.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/examples/circuit_relay/relay_example.py b/examples/circuit_relay/relay_example.py index 888b20357..47b62552c 100644 --- a/examples/circuit_relay/relay_example.py +++ b/examples/circuit_relay/relay_example.py @@ -14,7 +14,9 @@ python relay_example.py --role destination --port 8001 --relay-addr RELAY_PEER_ID # Third terminal - start the source: - python relay_example.py --role source --relay-addr RELAY_PEER_ID --dest-id DESTINATION_PEER_ID + python relay_example.py --role source \ + --relay-addr RELAY_PEER_ID \ + --dest-id DESTINATION_PEER_ID """ import argparse @@ -125,7 +127,7 @@ async def setup_relay_node(port: int, seed: int | None = None) -> None: logger.info("Circuit relay protocol started") # Create and register the transport - transport = CircuitV2Transport(host, protocol, relay_config) + CircuitV2Transport(host, protocol, relay_config) logger.info("Circuit relay transport initialized") print("\nRelay node is running. Use the following address to connect:") @@ -238,7 +240,10 @@ async def setup_destination_node( async def setup_source_node( relay_addr: str, dest_id: str, seed: int | None = None ) -> None: - """Set up and run a source node that connects to the destination through the relay.""" + """ + Set up and run a source node that connects to the destination + through the relay. + """ logger.info("Starting source node...") if not relay_addr: @@ -339,8 +344,8 @@ async def setup_source_node( logger.info(f"This is the relay peer id: {relay_peer_id}") # Create a proper peer info with a relay address - # The destination peer should be reachable through a p2p-circuit address - # Format: /p2p-circuit/p2p/DESTINATION_PEER_ID + # The destination peer should be reachable through a + # p2p-circuit address circuit_addr = multiaddr.Multiaddr(f"/p2p-circuit/p2p/{dest_id}") dest_peer_info = PeerInfo(dest_peer_id, [circuit_addr]) logger.info(f"This is the dest peer info: {dest_peer_info}") @@ -348,7 +353,8 @@ async def setup_source_node( # Dial through the relay try: logger.info( - f"Attempting to dial destination {dest_peer_id} through relay {relay_peer_id}" + f"Attempting to dial destination {dest_peer_id} " + f"through relay {relay_peer_id}" ) connection = await transport.dial_peer_info( @@ -367,7 +373,8 @@ async def setup_source_node( ) if stream: logger.info( - f"Opened stream to destination with protocol {EXAMPLE_PROTOCOL_ID}" + f"Opened stream to destination with protocol " + f"{EXAMPLE_PROTOCOL_ID}" ) # Send a message @@ -378,7 +385,8 @@ async def setup_source_node( # Wait for response response = await stream.read(MAX_READ_LEN) logger.info( - f"Received response: {response.decode() if response else 'No response'}" + f"Received response: " + f"{response.decode() if response else 'No response'}" ) # Close the stream