diff --git a/tests/interop/js_libp2p/js_node/.gitIgnore b/tests/interop/js_libp2p/js_node/.gitIgnore new file mode 100644 index 000000000..cef77aaa2 --- /dev/null +++ b/tests/interop/js_libp2p/js_node/.gitIgnore @@ -0,0 +1,5 @@ +/node_modules +/package-lock.json +/dist +.log +.github \ No newline at end of file diff --git a/tests/interop/js_libp2p/py_node/ping.py b/tests/interop/js_libp2p/py_node/ping.py new file mode 100644 index 000000000..a13a8acee --- /dev/null +++ b/tests/interop/js_libp2p/py_node/ping.py @@ -0,0 +1,398 @@ +import argparse +import logging + +from cryptography.hazmat.primitives.asymmetric import ( + x25519, +) +import multiaddr +import trio + +from libp2p import ( + generate_new_rsa_identity, + new_host, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.network.stream.net_stream import ( + INetStream, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) +from libp2p.security.noise.transport import Transport as NoiseTransport +from libp2p.stream_muxer.yamux.yamux import ( + Yamux, +) +from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID + +# Configure detailed logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), + ], +) + +PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") +PING_LENGTH = 32 +RESP_TIMEOUT = 60 + + +async def handle_ping(stream: INetStream) -> None: + """Handle incoming ping requests from js-libp2p clients""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] New ping stream opened by {peer_id}") + logging.info(f"Ping handler called for peer {peer_id}") + + ping_count = 0 + + try: + while True: + try: + print(f"[INFO] Waiting for ping data from {peer_id}...") + logging.debug(f"Stream state: {stream}") + data = await stream.read(PING_LENGTH) + + if not data: + print( + f"[INFO] No data received," + f"connection likely closed by {peer_id}" + ) + logging.debug("No data received, stream closed") + break + + if len(data) == 0: + print(f"[INFO] Empty data received, connection closed by {peer_id}") + logging.debug("Empty data received") + break + + ping_count += 1 + print( + f"[PING {ping_count}] Received ping from {peer_id}:" + f"{len(data)} bytes" + ) + logging.debug(f"Ping data: {data.hex()}") + + await stream.write(data) + print(f"[PING {ping_count}] Echoed ping back to {peer_id}") + + except Exception as e: + print(f"[ERROR] Error in ping loop with {peer_id}: {e}") + logging.exception("Ping loop error") + break + + except Exception as e: + print(f"[ERROR] Error handling ping from {peer_id}: {e}") + logging.exception("Ping handler error") + finally: + try: + print(f"[INFO] Closing ping stream with {peer_id}") + await stream.close() + except Exception as e: + logging.debug(f"Error closing stream: {e}") + + print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)") + + +async def send_ping_sequence(stream: INetStream, count: int = 5) -> None: + """Send a sequence of pings compatible with js-libp2p.""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)") + + import os + import time + + rtts = [] + + for i in range(1, count + 1): + try: + payload = os.urandom(PING_LENGTH) + print(f"[PING {i}/{count}] Sending ping to {peer_id}") + logging.debug(f"Sending payload: {payload.hex()}") + start_time = time.time() + + await stream.write(payload) + + with trio.fail_after(RESP_TIMEOUT): + response = await stream.read(PING_LENGTH) + + end_time = time.time() + rtt = (end_time - start_time) * 1000 + + if ( + response + and len(response) >= PING_LENGTH + and response[:PING_LENGTH] == payload + ): + rtts.append(rtt) + print(f"[PING {i}] Successful! RTT: {rtt:.2f}ms") + else: + print(f"[ERROR] Ping {i} failed: response mismatch or incomplete") + if response: + logging.debug(f"Expected: {payload.hex()}") + logging.debug(f"Received: {response.hex()}") + + if i < count: + await trio.sleep(1) + + except trio.TooSlowError: + print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s") + except Exception as e: + print(f"[ERROR] Ping {i} failed: {e}") + logging.exception(f"Ping {i} error") + + if rtts: + avg_rtt = sum(rtts) / len(rtts) + min_rtt = min(rtts) + max_rtts = max(rtts) + success_count = len(rtts) + loss_rate = ((count - success_count) / count) * 100 + + print( + f" Packets: Sent={count}, Received={success_count}," + f" Lost={count - success_count}" + ) + print(f" Loss rate: {loss_rate:.1f}%") + print( + f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms," f"max={max_rtts:.2f}ms" + ) + else: + print(f"\n[STATS] All pings failed ({count} attempts)") + + +def create_noise_keypair(): + try: + x25519_private_key = x25519.X25519PrivateKey.generate() + + class NoisePrivateKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.private_bytes_raw() + + def public_key(self): + return NoisePublicKey(self._key.public_key()) + + def get_public_key(self): + return NoisePublicKey(self._key.public_key()) + + class NoisePublicKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.public_bytes_raw() + + return NoisePrivateKey(x25519_private_key) + except Exception as e: + logging.error(f"Failed to create Noise keypair: {e}") + return None + + +async def run_server(port: int) -> None: + """Run ping server that accepts connections from js-libp2p clients.""" + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + key_pair = generate_new_rsa_identity() + logging.debug("Generated RSA keypair") + + noise_privkey = create_noise_keypair() + logging.debug("Generated Noise keypair") + + noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) + logging.debug(f"Noise transport initialized: {noise_transport}") + sec_opt = {TProtocol("/noise"): noise_transport} + muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} + + logging.info(f"Using muxer: {muxer_opt}") + + host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) + + print("[INFO] Starting py-libp2p ping server...") + + async with host.run(listen_addrs=[listen_addr]): + print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}") + host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) + + alt_protocols = [ + TProtocol("/ping/1.0.0"), + TProtocol("/libp2p/ping/1.0.0"), + ] + + for alt_proto in alt_protocols: + print(f"[INFO] Also registering handler for: {alt_proto}") + host.set_stream_handler(alt_proto, handle_ping) + + print("[INFO] Server started!") + print(f"[INFO] Peer ID: {host.get_id()}") + print(f"[INFO] Listening: /ip4/0.0.0.0/tcp/{port}") + print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}") + # print(f"[INFO] Security: Noise encryption") + # print(f"[INFO] Muxer: Yamux stream multiplexing") + + print("\n[INFO] Registered protocols:") + print(f" - {PING_PROTOCOL_ID}") + for proto in alt_protocols: + print(f" - {proto}") + + peer_id = host.get_id() + print("\n[TEST] Test with js-libp2p:") + print(f" node ping.js client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") + + print("\n[TEST] Test with py-libp2p:") + print(f" python ping.py client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") + + print("\n[INFO] Waiting for connections...") + print("Press Ctrl+C to exit") + + await trio.sleep_forever() + + +async def run_client(destination: str, count: int = 5) -> None: + """Run ping client to test connectivity with another peer.""" + listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") + + key_pair = generate_new_rsa_identity() + logging.debug("Generated RSA keypair") + + noise_privkey = create_noise_keypair() + logging.debug("Generated Noise keypair") + + noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) + logging.debug(f"Noise transport initialized: {noise_transport}") + sec_opt = {TProtocol("/noise"): noise_transport} + muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} + + logging.info(f"Using muxer: {muxer_opt}") + + host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) + + print("[INFO] Starting py-libp2p ping client...") + + async with host.run(listen_addrs=[listen_addr]): + print(f"[INFO] Our Peer ID: {host.get_id()}") + print(f"[INFO] Target: {destination}") + print("[INFO] Security: Noise encryption") + print("[INFO] Muxer: Yamux stream multiplexing") + + try: + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + target_peer_id = info.peer_id + + print(f"[INFO] Target Peer ID: {target_peer_id}") + print("[INFO] Connecting to peer...") + + await host.connect(info) + print("[INFO] Connection established!") + + protocols_to_try = [ + PING_PROTOCOL_ID, + TProtocol("/ping/1.0.0"), + TProtocol("/libp2p/ping/1.0.0"), + ] + + stream = None + + for proto in protocols_to_try: + try: + print(f"[INFO] Trying to open stream with protocol: {proto}") + stream = await host.new_stream(target_peer_id, [proto]) + print(f"[INFO] Stream opened with protocol: {proto}") + break + except Exception as e: + print(f"[ERROR] Failed to open stream with {proto}: {e}") + continue + + if not stream: + print("[ERROR] Failed to open stream with any ping protocol") + return 1 + + await send_ping_sequence(stream, count) + + await stream.close() + + except Exception as e: + print(f"[ERROR] Client error: {e}") + import traceback + + traceback.print_exc() + return 1 + + print("\n[INFO] Client stopped") + return 0 + + +def main() -> None: + """Main function with argument parsing.""" + description = """ + py-libp2p ping tool for interoperability testing with js-libp2p. + Uses Noise encryption and Yamux multiplexing for compatibility. + + Server mode: Listens for ping requests from js-libp2p or py-libp2p clients. + Client mode: Sends ping requests to js-libp2p or py-libp2p servers. + """ + + example_maddr = ( + "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" + ) + + parser = argparse.ArgumentParser( + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=f""" +Examples: + python ping.py server # Start server on port 8000 + python ping.py server --port 9000 # Start server on port 9000 + python ping.py client {example_maddr} + python ping.py client {example_maddr} --count 10 + """, + ) + + subparsers = parser.add_subparsers(dest="mode", help="Operation mode") + + server_parser = subparsers.add_parser("server", help="Run as ping server") + server_parser.add_argument( + "--port", "-p", type=int, default=8000, help="Port to listen on (default: 8000)" + ) + + client_parser = subparsers.add_parser("client", help="Run as ping client") + client_parser.add_argument("destination", help="Target peer multiaddr") + client_parser.add_argument( + "--count", + "-c", + type=int, + default=5, + help="Number of pings to send (default: 5)", + ) + + args = parser.parse_args() + + if not args.mode: + parser.print_help() + return 1 + + try: + if args.mode == "server": + trio.run(run_server, args.port) + elif args.mode == "client": + return trio.run(run_client, args.destination, args.count) + except KeyboardInterrupt: + print("\n[INFO] Goodbye!") + return 0 + except Exception as e: + print(f"[ERROR] Fatal error: {e}") + import traceback + + traceback.print_exc() + return 1 + + return 0 + + +if __name__ == "__main__": + exit(main())