diff --git a/README.md b/README.md index 7cc97de2..0cde8450 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Realtime highly decentralised chat app. ![libp2p topology](libp2p-hero.svg) -Showcasing [libp2p](https://libp2p.io/)'s superpowers in establishing ubiquitous peer-to-peer [connectivity](https://connectivity.libp2p.io/) in modern programming languages (Go, Rust, TypeScript) and runtimes (Web, native binary). +Showcasing [libp2p](https://libp2p.io/)'s superpowers in establishing ubiquitous peer-to-peer [connectivity](https://connectivity.libp2p.io/) in modern programming languages (Go, Rust, TypeScript, Python) and runtimes (Web, native binary). On top of this strong foundation, it layers a GossipSub: A Secure PubSub Protocol for Unstructured Decentralised P2P Overlays. By analogy, an event broker with distributed brokering, or a distributed PubSub protocol. @@ -20,11 +20,12 @@ Some of the cool and cutting-edge [transport protocols](https://connectivity.lib ## Packages -| Package | Description | WebTransport | WebRTC | WebRTC-direct | QUIC | TCP | -| :-------------------------- | :------------------------------ | ------------ | ------ | ------------- | ---- | --- | -| [`js-peer`](./js-peer/) | Browser Chat Peer in TypeScript | ✅ | ✅ | ✅ | ❌ | ❌ | -| [`go-peer`](./go-peer/) | Chat peer implemented in Go | ✅ | ❌ | ✅ | ✅ | ✅ | -| [`rust-peer`](./rust-peer/) | Chat peer implemented in Rust | ❌ | ❌ | ✅ | ✅ | ❌ | +| Package | Description | WebTransport | WebRTC | WebRTC-direct | QUIC | TCP | +| :------------------------------ | :------------------------------- | ------------ | ------ | ------------- | ---- | --- | +| [`js-peer`](./js-peer/) | Browser Chat Peer in TypeScript | ✅ | ✅ | ✅ | ❌ | ❌ | +| [`go-peer`](./go-peer/) | Chat peer implemented in Go | ✅ | ❌ | ✅ | ✅ | ✅ | +| [`rust-peer`](./rust-peer/) | Chat peer implemented in Rust | ❌ | ❌ | ✅ | ✅ | ❌ | +| [`python-peer`](./python-peer/) | Chat peer implemented in Python | ❌ | ❌ | ❌ | ❌ | ✅ | ✅ - Protocol supported ❌ - Protocol not supported @@ -82,3 +83,26 @@ cargo run -- --help cd go-peer go run . ``` + +## Getting started: Python + +### 1. Install dependencies + +``` +cd python-peer +pip install -r requirements.txt +``` + +### 2. Start the Python peer + +``` +python main.py +``` + +This will start the Python peer with an interactive UI. You can connect to other peers using the `/connect` command. + +For more options, try: + +``` +python main.py --help +``` diff --git a/python-peer/Dockerfile b/python-peer/Dockerfile new file mode 100644 index 00000000..0c0d31bc --- /dev/null +++ b/python-peer/Dockerfile @@ -0,0 +1,34 @@ +FROM python:3.9-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + libssl-dev \ + libffi-dev \ + python3-dev \ + git \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first to leverage Docker cache +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the rest of the application +COPY . . + +# Generate protobuf files +RUN pip install --no-cache-dir protobuf && \ + python -m pip install --no-cache-dir protobuf-compiler && \ + protoc --python_out=. direct_message.proto + +# Expose port for libp2p +EXPOSE 9095/tcp +EXPOSE 9095/udp + +# Command to run the application +ENTRYPOINT ["python", "main.py"] + +# Default arguments +CMD ["--headless"] \ No newline at end of file diff --git a/python-peer/README.md b/python-peer/README.md new file mode 100644 index 00000000..3ce1026d --- /dev/null +++ b/python-peer/README.md @@ -0,0 +1,71 @@ +# Python libp2p Universal Connectivity + +This is a Python implementation of the Universal Connectivity chat application, demonstrating libp2p's capabilities in establishing peer-to-peer connections across different platforms and languages. + +## Features + +- Cross-platform connectivity with JS, Go, and Rust implementations +- Support for multiple transport protocols: + - WebRTC Direct + - TCP + - (WebTransport and QUIC support planned for future releases) +- PubSub using GossipSub for group messaging +- Direct messaging between peers +- File sharing capabilities +- Peer discovery using mDNS and DHT + +## Getting Started + +### Prerequisites + +- Python 3.8 or later + +### Installation + +1. Clone the repository +2. Install dependencies: + +```bash +pip install -r requirements.txt +``` + +### Running the application + +Start the Python peer: + +```bash +python main.py +``` + +Optional arguments: +- `--nick NAME`: Set your nickname (default: generated from peer ID) +- `--identity PATH`: Path to identity key file (default: identity.key) +- `--connect ADDR`: Multiaddr to connect to (can be repeated for multiple peers) + +## Architecture + +The Python peer implementation consists of several key components: + +1. **Node Configuration**: Setup of the libp2p node with appropriate transports and protocols +2. **Chat Room**: Implementation of the GossipSub-based group chat +3. **Direct Messaging**: Protocol for peer-to-peer direct messages +4. **File Exchange**: Protocol for sharing files between peers +5. **UI**: Terminal-based user interface + +## Integration with Other Implementations + +This Python implementation is compatible with the JS, Go, and Rust peers in the Universal Connectivity project. It can: + +- Connect to bootstrap nodes +- Discover peers via mDNS and DHT +- Exchange messages via GossipSub +- Send direct messages to peers +- Share files with peers + +## Development + +See the `CONTRIBUTING.md` file for guidelines on contributing to this project. + +## License + +This project is licensed under the dual MIT/Apache-2.0 license - see the LICENSE-MIT and LICENSE-APACHE files for details. \ No newline at end of file diff --git a/python-peer/chatroom.py b/python-peer/chatroom.py new file mode 100644 index 00000000..e1eaf7bd --- /dev/null +++ b/python-peer/chatroom.py @@ -0,0 +1,406 @@ +import asyncio +import json +import logging +from dataclasses import dataclass +from typing import List, Optional, Dict, Any +import uuid +import time + +logger = logging.getLogger("app") + +@dataclass +class ChatMessage: + """Represents a message in the chat.""" + message: str + sender_id: str + sender_nick: str + + def to_json(self) -> str: + """Serialize the message to JSON format.""" + return json.dumps({ + "message": self.message, + "sender_id": self.sender_id, + "sender_nick": self.sender_nick + }) + + @classmethod + def from_json(cls, json_str: str) -> 'ChatMessage': + """Deserialize a message from JSON format.""" + try: + data = json.loads(json_str) + return cls( + message=data.get("message", ""), + sender_id=data.get("sender_id", "unknown"), + sender_nick=data.get("sender_nick", "unknown") + ) + except json.JSONDecodeError: + logger.warning(f"Failed to decode message: {json_str}") + return cls( + message="[Invalid message format]", + sender_id="system", + sender_nick="system" + ) + +class ChatRoom: + """ + Represents a subscription to a chat room using libp2p PubSub. + """ + def __init__(self, node, chat_topic: str, file_topic: str, discovery_topic: str, nickname: str): + """ + Initialize a chat room. + + Args: + node: The libp2p node + chat_topic: The topic name for chat messages + file_topic: The topic name for file sharing + discovery_topic: The topic name for peer discovery + nickname: The user's nickname + """ + self.node = node + self.chat_topic_name = chat_topic + self.file_topic_name = file_topic + self.discovery_topic_name = discovery_topic + self.nickname = nickname + + # Message queues + self.messages = asyncio.Queue(maxsize=128) + self.sys_messages = asyncio.Queue(maxsize=128) + + # PubSub topics + self.chat_topic = None + self.file_topic = None + self.discovery_topic = None + + # Subscriptions + self.chat_subscription = None + self.file_subscription = None + self.discovery_subscription = None + + # Initialize a dictionary to store file information + self.files = {} + + @classmethod + async def join(cls, node, chat_topic: str, file_topic: str, discovery_topic: str, nickname: str) -> 'ChatRoom': + """ + Join a chat room. + + Args: + node: The libp2p node + chat_topic: The topic name for chat messages + file_topic: The topic name for file sharing + discovery_topic: The topic name for peer discovery + nickname: The user's nickname + + Returns: + ChatRoom: A new chat room instance + """ + chat_room = cls(node, chat_topic, file_topic, discovery_topic, nickname) + + # Get the pubsub service + if not hasattr(node, "pubsub") or not node.pubsub: + raise RuntimeError("PubSub service is not available on the node") + + # Join the chat topic + chat_room.chat_topic = await node.pubsub.subscribe(chat_topic) + + # Join the file topic + chat_room.file_topic = await node.pubsub.subscribe(file_topic) + + # Join the discovery topic + chat_room.discovery_topic = await node.pubsub.subscribe(discovery_topic) + + # Start background tasks for processing messages + asyncio.create_task(chat_room._handle_chat_messages()) + asyncio.create_task(chat_room._handle_file_messages()) + asyncio.create_task(chat_room._handle_discovery_messages()) + + # Register handler for file transfer protocol + await node.set_stream_handler("/universal-connectivity-file/1", chat_room._handle_file_request) + + logger.info(f"Joined chat room on topics: {chat_topic}, {file_topic}, {discovery_topic}") + return chat_room + + async def _handle_chat_messages(self) -> None: + """Process incoming chat messages.""" + while True: + try: + # Get the next message + msg = await self.chat_topic.next_message() + + # Skip messages from ourselves + if msg.from_peer == self.node.get_id(): + continue + + # Parse the message + try: + message_text = msg.data.decode('utf-8') + chat_message = ChatMessage( + message=message_text, + sender_id=str(msg.from_peer), + sender_nick=str(msg.from_peer)[-8:] # Use last 8 chars of peer ID as nickname + ) + + # Add to message queue + await self.messages.put(chat_message) + except Exception as e: + logger.error(f"Error processing chat message: {e}") + + except asyncio.CancelledError: + logger.info("Chat message handler task cancelled") + break + except Exception as e: + logger.error(f"Error in chat message handler: {e}") + await asyncio.sleep(1) # Avoid tight loop on errors + + async def _handle_file_messages(self) -> None: + """Process incoming file announcement messages.""" + while True: + try: + # Get the next message + msg = await self.file_topic.next_message() + + # Skip messages from ourselves + if msg.from_peer == self.node.get_id(): + continue + + # The message should contain a file ID + file_id = msg.data.decode('utf-8') + + # Request the file from the sender + try: + file_data = await self._request_file(msg.from_peer, file_id) + + # Store the file + self.files[file_id] = { + "data": file_data, + "from_peer": str(msg.from_peer), + "timestamp": time.time() + } + + # Notify about the file + await self.messages.put(ChatMessage( + message=f"File: {file_id} ({len(file_data)} bytes) from {str(msg.from_peer)}", + sender_id=str(msg.from_peer), + sender_nick=str(msg.from_peer)[-8:] + )) + + except Exception as e: + logger.error(f"Error requesting file {file_id} from {msg.from_peer}: {e}") + + except asyncio.CancelledError: + logger.info("File message handler task cancelled") + break + except Exception as e: + logger.error(f"Error in file message handler: {e}") + await asyncio.sleep(1) # Avoid tight loop on errors + + async def _handle_discovery_messages(self) -> None: + """Process incoming peer discovery messages.""" + while True: + try: + # Get the next message + msg = await self.discovery_topic.next_message() + + # Skip messages from ourselves + if msg.from_peer == self.node.get_id(): + continue + + # The message should contain peer information + try: + peer_info = json.loads(msg.data.decode('utf-8')) + peer_id = peer_info.get("peer_id") + addrs = peer_info.get("addrs", []) + + if peer_id and addrs: + logger.info(f"Discovered peer {peer_id} with addresses: {addrs}") + + # Try to connect if not already connected + if not self.node.is_connected(peer_id): + for addr in addrs: + try: + await self.node.connect(f"{addr}/p2p/{peer_id}") + logger.info(f"Connected to discovered peer {peer_id}") + break + except Exception as e: + logger.warning(f"Failed to connect to {peer_id} at {addr}: {e}") + except Exception as e: + logger.error(f"Error processing discovery message: {e}") + + except asyncio.CancelledError: + logger.info("Discovery message handler task cancelled") + break + except Exception as e: + logger.error(f"Error in discovery message handler: {e}") + await asyncio.sleep(1) # Avoid tight loop on errors + + async def _handle_file_request(self, stream) -> None: + """Handle incoming file requests.""" + try: + # Read the request (length-prefixed file ID) + length_bytes = await stream.read(8) # Read up to 8 bytes for the length prefix + length = int.from_bytes(length_bytes, byteorder='big') + + if length > 1000000: # Limit to 1MB file IDs + logger.warning(f"File ID too large: {length} bytes") + await stream.reset() + return + + file_id = await stream.read(length) + file_id_str = file_id.decode('utf-8') + + # Check if we have this file + if file_id_str in self.files: + file_data = self.files[file_id_str]["data"] + + # Send the file (length-prefixed) + await stream.write(len(file_data).to_bytes(8, byteorder='big')) + await stream.write(file_data) + logger.info(f"Sent file {file_id_str} ({len(file_data)} bytes)") + else: + # File not found - send empty response + await stream.write((0).to_bytes(8, byteorder='big')) + logger.warning(f"Requested file {file_id_str} not found") + + # Close the stream + await stream.close() + + except Exception as e: + logger.error(f"Error handling file request: {e}") + try: + await stream.reset() + except: + pass + + async def _request_file(self, peer_id, file_id: str) -> bytes: + """ + Request a file from a peer. + + Args: + peer_id: The peer to request the file from + file_id: The ID of the file to request + + Returns: + bytes: The file data + """ + # Create a new stream to the peer + stream = await self.node.new_stream(peer_id, ["/universal-connectivity-file/1"]) + + try: + # Send the request (length-prefixed file ID) + file_id_bytes = file_id.encode('utf-8') + await stream.write(len(file_id_bytes).to_bytes(8, byteorder='big')) + await stream.write(file_id_bytes) + + # Read the response (length-prefixed file data) + length_bytes = await stream.read(8) + length = int.from_bytes(length_bytes, byteorder='big') + + if length == 0: + raise ValueError(f"File {file_id} not found on peer {peer_id}") + + if length > 500_000_000: # Limit to 500MB files + raise ValueError(f"File too large: {length} bytes") + + # Read the file data + file_data = await stream.read(length) + + # Close the stream + await stream.close() + + return file_data + + except Exception as e: + logger.error(f"Error requesting file: {e}") + try: + await stream.reset() + except: + pass + raise + + async def publish(self, message: str) -> None: + """ + Publish a message to the chat topic. + + Args: + message: The message to publish + """ + if not message: + return + + try: + # Create a chat message + chat_message = ChatMessage( + message=message, + sender_id=str(self.node.get_id()), + sender_nick=self.nickname + ) + + # Publish the message + await self.chat_topic.publish(message.encode('utf-8')) + logger.debug(f"Published message: {message}") + + except Exception as e: + logger.error(f"Error publishing message: {e}") + # Add error message to system messages + await self.sys_messages.put(ChatMessage( + message=f"Failed to send message: {e}", + sender_id="system", + sender_nick="system" + )) + + async def share_file(self, file_data: bytes, file_name: Optional[str] = None) -> str: + """ + Share a file with the chat room. + + Args: + file_data: The file data to share + file_name: Optional file name + + Returns: + str: The file ID used to reference the file + """ + # Generate a file ID + file_id = file_name or f"file-{uuid.uuid4()}" + + # Store the file + self.files[file_id] = { + "data": file_data, + "from_peer": str(self.node.get_id()), + "timestamp": time.time() + } + + try: + # Announce the file + await self.file_topic.publish(file_id.encode('utf-8')) + + # Notify about the file locally + await self.sys_messages.put(ChatMessage( + message=f"Shared file: {file_id} ({len(file_data)} bytes)", + sender_id="system", + sender_nick="system" + )) + + return file_id + + except Exception as e: + logger.error(f"Error sharing file: {e}") + # Add error message to system messages + await self.sys_messages.put(ChatMessage( + message=f"Failed to share file: {e}", + sender_id="system", + sender_nick="system" + )) + raise + + def list_peers(self) -> List[str]: + """ + List peers subscribed to the chat topic. + + Returns: + List[str]: List of peer IDs + """ + try: + return self.node.pubsub.get_peers(self.chat_topic_name) + except Exception as e: + logger.error(f"Error listing peers: {e}") + return [] \ No newline at end of file diff --git a/python-peer/direct_message.proto b/python-peer/direct_message.proto new file mode 100644 index 00000000..bce83b17 --- /dev/null +++ b/python-peer/direct_message.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package dm; + +service DirectMessage { + rpc DirectMessage (DirectMessageRequest) returns (DirectMessageResponse) {} +} + +message Metadata { + string clientVersion = 1; // client version + int64 timestamp = 2; // unix time +} + +enum Status { + UNKNOWN = 0; + OK = 200; + ERROR = 500; +} + +message DirectMessageRequest { + Metadata metadata = 1; + string content = 2; + string type = 3; +} + +message DirectMessageResponse{ + Metadata metadata = 1; + Status status = 2; + optional string statusText = 3; +} \ No newline at end of file diff --git a/python-peer/direct_message.py b/python-peer/direct_message.py new file mode 100644 index 00000000..4d2cad89 --- /dev/null +++ b/python-peer/direct_message.py @@ -0,0 +1,236 @@ +import asyncio +import json +import logging +import time +from enum import Enum +from typing import Dict, Any, Optional, Callable, List + +logger = logging.getLogger("app") + +# Protocol definitions +DIRECT_MESSAGE_PROTOCOL = "/universal-connectivity/direct-message/1.0.0" +MIME_TEXT_PLAIN = "text/plain" +DM_CLIENT_VERSION = "0.0.1" + +class Status(Enum): + UNKNOWN = 0 + OK = 200 + ERROR = 500 + +class DirectMessageService: + """ + Implementation of the direct messaging protocol. + + This service allows sending direct messages to specific peers + rather than broadcasting to the entire pubsub topic. + """ + def __init__(self, node): + """ + Initialize the direct messaging service. + + Args: + node: The libp2p node + """ + self.node = node + self.dm_peers = set() # Set of peers supporting direct messaging + self.message_handlers = [] # List of message handler callbacks + + async def start(self) -> None: + """Start the direct messaging service.""" + # Register the direct message protocol handler + await self.node.set_stream_handler(DIRECT_MESSAGE_PROTOCOL, self._handle_direct_message) + logger.info(f"Registered direct message protocol handler: {DIRECT_MESSAGE_PROTOCOL}") + + async def stop(self) -> None: + """Stop the direct messaging service.""" + # Remove the protocol handler + await self.node.remove_stream_handler(DIRECT_MESSAGE_PROTOCOL) + logger.info("Removed direct message protocol handler") + + def add_message_handler(self, handler: Callable) -> None: + """ + Add a handler for incoming direct messages. + + Args: + handler: Callback function that takes (from_peer, content, type) + """ + self.message_handlers.append(handler) + + def remove_message_handler(self, handler: Callable) -> None: + """ + Remove a message handler. + + Args: + handler: The handler to remove + """ + if handler in self.message_handlers: + self.message_handlers.remove(handler) + + def handle_peer_connected(self, peer_id: str) -> None: + """ + Handle a peer connection event. + + Args: + peer_id: The connected peer ID + """ + self.dm_peers.add(peer_id) + + def handle_peer_disconnected(self, peer_id: str) -> None: + """ + Handle a peer disconnection event. + + Args: + peer_id: The disconnected peer ID + """ + if peer_id in self.dm_peers: + self.dm_peers.remove(peer_id) + + def is_dm_peer(self, peer_id: str) -> bool: + """ + Check if a peer supports direct messaging. + + Args: + peer_id: The peer ID to check + + Returns: + bool: True if the peer supports direct messaging + """ + return peer_id in self.dm_peers + + async def send_message(self, peer_id: str, content: str, msg_type: str = MIME_TEXT_PLAIN) -> bool: + """ + Send a direct message to a peer. + + Args: + peer_id: The peer ID to send the message to + content: The message content + msg_type: The message type/MIME type + + Returns: + bool: True if the message was sent successfully + """ + if not content: + logger.error("Message content cannot be empty") + return False + + # Create a stream to the peer + try: + stream = await self.node.new_stream(peer_id, [DIRECT_MESSAGE_PROTOCOL]) + except Exception as e: + logger.error(f"Failed to create stream to peer {peer_id}: {e}") + return False + + try: + # Prepare the request + request = { + "content": content, + "type": msg_type, + "metadata": { + "client_version": DM_CLIENT_VERSION, + "timestamp": int(time.time() * 1000) # milliseconds + } + } + + # Send the request + request_json = json.dumps(request).encode('utf-8') + length_prefix = len(request_json).to_bytes(4, byteorder='big') + await stream.write(length_prefix + request_json) + + # Read the response + response_length_bytes = await stream.read(4) + if not response_length_bytes: + logger.error("No response received") + return False + + response_length = int.from_bytes(response_length_bytes, byteorder='big') + response_json = await stream.read(response_length) + + # Parse the response + response = json.loads(response_json.decode('utf-8')) + + if not response.get("metadata"): + logger.error("No metadata in response") + return False + + if response.get("status") != Status.OK.value: + logger.error(f"Received error status: {response.get('status')}") + return False + + # Add to DM peers set if successful + self.dm_peers.add(peer_id) + + return True + + except Exception as e: + logger.error(f"Error sending direct message: {e}") + return False + + finally: + # Close the stream + try: + await stream.close() + except Exception as e: + logger.error(f"Error closing stream: {e}") + + async def _handle_direct_message(self, stream) -> None: + """ + Handle incoming direct messages. + + Args: + stream: The incoming stream + """ + try: + # Read the request length + length_bytes = await stream.read(4) + if not length_bytes: + logger.error("Empty message received") + await stream.reset() + return + + request_length = int.from_bytes(length_bytes, byteorder='big') + if request_length > 1000000: # Limit to 1MB messages + logger.warning(f"Message too large: {request_length} bytes") + await stream.reset() + return + + # Read the request body + request_json = await stream.read(request_length) + request = json.loads(request_json.decode('utf-8')) + + # Prepare the response + response = { + "status": Status.OK.value, + "metadata": { + "client_version": DM_CLIENT_VERSION, + "timestamp": int(time.time() * 1000) + } + } + + # Send the response + response_json = json.dumps(response).encode('utf-8') + length_prefix = len(response_json).to_bytes(4, byteorder='big') + await stream.write(length_prefix + response_json) + + # Close the stream + await stream.close() + + # Add the sender to the DM peers set + peer_id = stream.conn.remote_peer_id + self.dm_peers.add(peer_id) + + # Notify message handlers + content = request.get("content", "") + msg_type = request.get("type", MIME_TEXT_PLAIN) + + for handler in self.message_handlers: + try: + await handler(peer_id, content, msg_type) + except Exception as e: + logger.error(f"Error in message handler: {e}") + + except Exception as e: + logger.error(f"Error handling direct message: {e}") + try: + await stream.reset() + except: + pass \ No newline at end of file diff --git a/python-peer/discovery.py b/python-peer/discovery.py new file mode 100644 index 00000000..b2003816 --- /dev/null +++ b/python-peer/discovery.py @@ -0,0 +1,113 @@ +import asyncio +import json +import logging +import time +from typing import List, Dict, Any + +logger = logging.getLogger("app") + +async def setup_discovery(node, service_tag: str) -> None: + """ + Set up mDNS discovery for local peer finding. + + Args: + node: The libp2p node + service_tag: The service tag for discovery + """ + try: + await node.setup_mdns(service_tag) + logger.info(f"mDNS discovery set up with service tag: {service_tag}") + except Exception as e: + logger.error(f"Failed to set up mDNS discovery: {e}") + +class DiscoveryNotifee: + """ + Notification handler for peer discovery events. + """ + def __init__(self, node): + self.node = node + + async def peer_discovered(self, peer_info): + """Handle peer discovery events.""" + peer_id = peer_info.get("id") + addrs = peer_info.get("addrs", []) + + if not peer_id or not addrs: + return + + logger.info(f"Discovered peer: {peer_id}") + + # Skip if it's our own peer ID + if peer_id == str(self.node.get_id()): + return + + # Try to connect if not already connected + if not self.node.is_connected(peer_id): + for addr in addrs: + try: + await self.node.connect(f"{addr}/p2p/{peer_id}") + logger.info(f"Connected to discovered peer: {peer_id}") + break + except Exception as e: + logger.warning(f"Failed to connect to {peer_id} at {addr}: {e}") + +async def discover_peers(node, service_tag: str) -> None: + """ + Actively discover peers using DHT and other mechanisms. + + Args: + node: The libp2p node + service_tag: The service tag for discovery + """ + # Set up a notifee for discovery events + notifee = DiscoveryNotifee(node) + + # Register for peer discovery events + if hasattr(node, "on_peer_discovered"): + node.on_peer_discovered(notifee.peer_discovered) + + # Periodically announce our presence to the discovery topic + pubsub_discovery_topic = "universal-connectivity-browser-peer-discovery" + + while True: + try: + # Announce our addrs to the discovery topic + addrs = [str(addr) for addr in node.get_addrs()] + + announce_msg = json.dumps({ + "peer_id": str(node.get_id()), + "addrs": addrs, + "timestamp": time.time() + }) + + # Publish to the discovery topic if we have a pubsub + if hasattr(node, "pubsub") and node.pubsub: + await node.pubsub.publish(pubsub_discovery_topic, announce_msg.encode('utf-8')) + logger.debug(f"Published discovery announcement: {announce_msg}") + + # Use Kademlia DHT for discovery if available + if hasattr(node, "dht") and node.dht: + try: + # Advertise our presence + await node.dht.provide(service_tag.encode('utf-8')) + + # Find other peers + peers = await node.dht.find_providers(service_tag.encode('utf-8')) + + for peer in peers: + if peer.get("id") == str(node.get_id()): + continue + + await notifee.peer_discovered(peer) + except Exception as e: + logger.error(f"DHT discovery error: {e}") + + # Wait before next announcement + await asyncio.sleep(60) # Announce every minute + + except asyncio.CancelledError: + logger.info("Discovery task cancelled") + break + except Exception as e: + logger.error(f"Error in discover_peers: {e}") + await asyncio.sleep(30) # Back off on errors \ No newline at end of file diff --git a/python-peer/identity.py b/python-peer/identity.py new file mode 100644 index 00000000..c88a62ff --- /dev/null +++ b/python-peer/identity.py @@ -0,0 +1,63 @@ +import os +import logging +from typing import Optional + +from libp2p.crypto.keys import KeyPair, PrivateKey, PublicKey +from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p.identity.identify.protocol import IdentifyProtocol + +logger = logging.getLogger("app") + +async def load_or_create_identity(identity_path: str) -> PrivateKey: + """ + Load an existing identity key from the given path or create a new one if it doesn't exist. + + Args: + identity_path: Path to the identity key file + + Returns: + PrivateKey: The node's private key + """ + try: + # Check if the identity file exists + if os.path.exists(identity_path): + logger.info(f"Loading existing identity from {identity_path}") + with open(identity_path, "rb") as f: + private_key_bytes = f.read() + # Deserialize the private key + private_key = PrivateKey.deserialize(private_key_bytes) + logger.info(f"Loaded identity with public key: {private_key.get_public_key().serialize().hex()[:16]}...") + return private_key + + # Create a new identity if none exists + logger.info(f"No existing identity found. Creating new identity at {identity_path}") + key_pair = create_new_key_pair() + private_key = key_pair.private_key + + # Save the private key to the file + with open(identity_path, "wb") as f: + f.write(private_key.serialize()) + + logger.info(f"Created and saved new identity with public key: {private_key.get_public_key().serialize().hex()[:16]}...") + return private_key + + except Exception as e: + logger.error(f"Error loading or creating identity: {e}") + # If there was an error, create an ephemeral key without saving it + logger.info("Creating ephemeral identity") + key_pair = create_new_key_pair() + return key_pair.private_key + +async def setup_identify_protocol(node) -> IdentifyProtocol: + """ + Set up the identify protocol for the node. + + Args: + node: The libp2p node + + Returns: + IdentifyProtocol: The identify protocol instance + """ + identify_protocol = IdentifyProtocol(node) + await node.set_stream_handler(identify_protocol.get_protocol_id(), identify_protocol.handler) + return identify_protocol \ No newline at end of file diff --git a/python-peer/libp2p_node_config.py b/python-peer/libp2p_node_config.py new file mode 100644 index 00000000..66b58ae5 --- /dev/null +++ b/python-peer/libp2p_node_config.py @@ -0,0 +1,85 @@ +import logging +from typing import Dict, Any, List + +from libp2p.crypto.keys import PrivateKey +from libp2p.security.noise.transport import NoiseTransport +from libp2p.security.secio import SecioTransport +from libp2p.stream_muxer.mplex.mplex import MPLEXStreamMuxer +from libp2p.stream_muxer.yamux.yamux import YAMUXStreamMuxer +from libp2p.transport.tcp.tcp import TCPTransport +from libp2p.transport.upgrader import TransportUpgrader +from libp2p.relay.circuit.transport import CircuitRelay + +logger = logging.getLogger("app") + +def create_node_config(private_key: PrivateKey) -> Dict[str, Any]: + """ + Create and return a configuration dictionary for a libp2p node. + + Args: + private_key: The node's private key for identity + + Returns: + Dict[str, Any]: Configuration dictionary for libp2p node creation + """ + # Define listening addresses + listen_addrs = [ + "/ip4/0.0.0.0/tcp/9095", # TCP for general connectivity + "/ip6/::/tcp/9095", # IPv6 TCP + ] + + # Create the node configuration + return { + "identity": private_key, + "listen_addrs": listen_addrs, + + # Transport configuration + "transport_opt": { + "transports": [ + TCPTransport(), + # Add WebRTC later when Python implementation is available + # Add QUIC later when Python implementation is available + ] + }, + + # Security configuration - security transport modules + "security_opt": { + "security_transports": [ + NoiseTransport(), # Noise protocol for encryption + SecioTransport(), # Fallback security transport + ] + }, + + # Stream muxer configuration + "muxer_opt": { + "stream_muxers": [ + MPLEXStreamMuxer(), # Mplex for multiplexing + YAMUXStreamMuxer(), # YAMUX as an alternative + ] + }, + + # Enable the relay service + "relay_opt": { + "enabled": True, + "hop": True, # Allow the node to serve as a relay hop for other peers + "active": True, # Actively establish and maintain relay connections + "discover": True, # Discover relay nodes + }, + + # Enable NAT port mapping + "nat_opt": { + "enabled": True, + }, + + # Enable the pubsub service with GossipSub + "pubsub_opt": { + "enabled": True, + "router_type": "gossipsub", # Use GossipSub + "allow_publish_to_zero_peers": True, # Allow publishing even if no peers are connected + "sign_messages": True, # Sign all messages + "strict_signing": True, # Require valid signatures + }, + + # User agent for identification + "user_agent": "universal-connectivity/python-peer", + } \ No newline at end of file diff --git a/python-peer/main.py b/python-peer/main.py new file mode 100644 index 00000000..20c243b2 --- /dev/null +++ b/python-peer/main.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +import asyncio +import logging +import os +import random +import string +import sys +import time +from typing import List, Optional, Dict, Any + +import click +from rich.console import Console +from rich.logging import RichHandler + +from libp2p import create_libp2p_node +from libp2p_node_config import create_node_config +from chatroom import ChatRoom, ChatMessage +from identity import load_or_create_identity +from ui import ChatUI +from discovery import setup_discovery, discover_peers + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(message)s", + datefmt="[%X]", + handlers=[RichHandler(rich_tracebacks=True)] +) +logger = logging.getLogger("app") + +# Constants +DISCOVERY_SERVICE_TAG = "universal-connectivity" +CHAT_TOPIC = "universal-connectivity" +CHAT_FILE_TOPIC = "universal-connectivity-file" +PUBSUB_DISCOVERY_TOPIC = "universal-connectivity-browser-peer-discovery" + +def generate_default_nick(peer_id: str) -> str: + """Generate a default nickname based on username and peer ID.""" + username = os.environ.get("USER", "user") + short_id = peer_id[-8:] if peer_id else ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) + return f"{username}-{short_id}" + +async def setup_libp2p_node(identity_path: str) -> Any: + """Initialize and return a libp2p node.""" + private_key = await load_or_create_identity(identity_path) + config = create_node_config(private_key) + + # Create the libp2p node + node = await create_libp2p_node(config) + + logger.info(f"Node started with peer ID: {node.get_id().pretty()}") + for addr in node.get_addrs(): + logger.info(f"Listening on: {addr}/p2p/{node.get_id().pretty()}") + + return node + +async def connect_to_peers(node: Any, peers: List[str]) -> None: + """Connect to a list of peers by multiaddress.""" + for peer_addr in peers: + try: + logger.info(f"Connecting to peer: {peer_addr}") + await node.connect(peer_addr) + logger.info(f"Successfully connected to {peer_addr}") + except Exception as e: + logger.error(f"Failed to connect to {peer_addr}: {e}") + +@click.command() +@click.option("--nick", default="", help="Nickname to use in chat (generated if empty)") +@click.option("--identity", default="identity.key", help="Path to identity key file") +@click.option("--headless", is_flag=True, help="Run without chat UI") +@click.option("--connect", multiple=True, help="Multiaddr to connect to (can be used multiple times)") +def main(nick: str, identity: str, headless: bool, connect: List[str]): + """Universal Connectivity Chat Application - Python Peer""" + console = Console() + console.print("[bold blue]Starting Universal Connectivity Python Peer[/bold blue]") + + # Start the asyncio event loop + loop = asyncio.get_event_loop() + + try: + # Setup the libp2p node + node = loop.run_until_complete(setup_libp2p_node(identity)) + + # Connect to specified peers + if connect: + loop.run_until_complete(connect_to_peers(node, connect)) + + # Setup peer discovery + loop.run_until_complete(setup_discovery(node, DISCOVERY_SERVICE_TAG)) + + # Start DHT-based peer discovery + discovery_task = loop.create_task(discover_peers(node, DISCOVERY_SERVICE_TAG)) + + # Use the provided nickname or generate a default one + nickname = nick if nick else generate_default_nick(node.get_id().pretty()) + + # Create the chat room + chat_room = loop.run_until_complete( + ChatRoom.join(node, CHAT_TOPIC, CHAT_FILE_TOPIC, PUBSUB_DISCOVERY_TOPIC, nickname) + ) + + # System message about the node's identity + chat_room.sys_messages.put_nowait( + ChatMessage( + message=f"PeerID: {node.get_id().pretty()}", + sender_id="system", + sender_nick="system" + ) + ) + + for addr in node.get_addrs(): + chat_room.sys_messages.put_nowait( + ChatMessage( + message=f"Listening on: {addr}/p2p/{node.get_id().pretty()}", + sender_id="system", + sender_nick="system" + ) + ) + + if headless: + # Run in headless mode without UI + console.print("[yellow]Running in headless mode. Press Ctrl+C to exit.[/yellow]") + try: + loop.run_forever() + except KeyboardInterrupt: + pass + else: + # Start the UI + ui = ChatUI(chat_room, node) + ui_task = loop.create_task(ui.run()) + + # Run until UI exits + loop.run_until_complete(ui_task) + + except Exception as e: + console.print(f"[bold red]Error:[/bold red] {str(e)}") + logger.exception("An error occurred") + return 1 + finally: + # Clean up + tasks = asyncio.all_tasks(loop) + for task in tasks: + task.cancel() + + loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) + loop.close() + + return 0 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/python-peer/requirements.txt b/python-peer/requirements.txt new file mode 100644 index 00000000..e5dd03f0 --- /dev/null +++ b/python-peer/requirements.txt @@ -0,0 +1,9 @@ +py-libp2p>=0.1.5 +aiohttp>=3.8.4 +async-timeout>=4.0.2 +protobuf>=4.22.0 +multiaddr>=0.0.9 +click>=8.1.3 +rich>=13.3.5 +aioconsole>=0.6.0 +cryptography>=40.0.0 \ No newline at end of file diff --git a/python-peer/ui.py b/python-peer/ui.py new file mode 100644 index 00000000..4158eebc --- /dev/null +++ b/python-peer/ui.py @@ -0,0 +1,428 @@ +import asyncio +import logging +import os +import time +from typing import Dict, List, Optional, Any + +from rich.console import Console +from rich.panel import Panel +from rich.text import Text +from rich.layout import Layout +from rich.table import Table +from rich.live import Live +from rich.align import Align +from rich import box +import aioconsole + +from chatroom import ChatRoom, ChatMessage + +logger = logging.getLogger("app") + +class ChatUI: + """ + Text-based UI for the chat application. + """ + def __init__(self, chat_room: ChatRoom, node): + """ + Initialize the chat UI. + + Args: + chat_room: The chat room instance + node: The libp2p node + """ + self.chat_room = chat_room + self.node = node + self.console = Console() + self.messages: List[Dict[str, Any]] = [] + self.max_messages = 100 + self.command_handlers = { + "/help": self._handle_help, + "/peers": self._handle_peers, + "/nick": self._handle_nick, + "/connect": self._handle_connect, + "/file": self._handle_file, + "/clear": self._handle_clear, + "/quit": self._handle_quit, + } + self.running = False + self.layout = self._create_layout() + + def _create_layout(self) -> Layout: + """Create the UI layout.""" + layout = Layout() + + # Split into top (messages) and bottom (input) + layout.split( + Layout(name="main", ratio=9), + Layout(name="input", ratio=1) + ) + + # Split top into messages and info + layout["main"].split_row( + Layout(name="messages", ratio=3), + Layout(name="info", ratio=1) + ) + + return layout + + def _build_messages_panel(self) -> Panel: + """Build the panel that displays messages.""" + # Create a text object to hold all messages + text = Text() + + # Add each message to the text + for msg in self.messages[-self.max_messages:]: + timestamp = time.strftime("%H:%M:%S", time.localtime(msg.get("timestamp", time.time()))) + + if msg.get("type") == "system": + # System messages in yellow + text.append(f"{timestamp} [System] ", style="yellow") + text.append(f"{msg['content']}\n", style="yellow") + else: + # Chat messages with nickname in appropriate color + nick = msg.get("nick", "unknown") + nick_style = "bright_blue" if nick == self.chat_room.nickname else "bright_green" + + text.append(f"{timestamp} ", style="dim") + text.append(f"[{nick}] ", style=nick_style) + text.append(f"{msg['content']}\n") + + # Create and return a panel containing the text + return Panel( + text, + title="Messages", + border_style="blue", + box=box.ROUNDED + ) + + def _build_info_panel(self) -> Panel: + """Build the panel that displays peer information.""" + # Create a table for peer information + table = Table(box=box.SIMPLE, expand=True) + table.add_column("Peer ID", style="cyan") + table.add_column("Status", style="green") + + # Add each connected peer to the table + peers = self.node.get_connected_peers() + for peer in peers: + short_id = str(peer)[-12:] + table.add_row(short_id, "connected") + + # Add a help footer + help_text = Text("\nCommands: /help, /peers, /nick, /connect, /file, /clear, /quit", style="dim") + + # Create a layout for info panel content + info_layout = Layout() + info_layout.split( + Layout(Align(table, vertical="top"), name="peers", ratio=4), + Layout(Align(help_text, vertical="bottom"), name="help", ratio=1) + ) + + # Create and return a panel containing the info layout + return Panel( + info_layout, + title=f"Connected Peers ({len(peers)})", + border_style="green", + box=box.ROUNDED + ) + + def _build_input_panel(self) -> Panel: + """Build the input panel.""" + text = Text(f"Type a message or command (as {self.chat_room.nickname})...", style="bright_blue") + return Panel( + text, + title="Input", + border_style="yellow", + box=box.ROUNDED + ) + + def _render_ui(self) -> None: + """Render the complete UI.""" + # Update the layout with the latest content + self.layout["messages"].update(self._build_messages_panel()) + self.layout["info"].update(self._build_info_panel()) + self.layout["input"].update(self._build_input_panel()) + + # Return the layout for rendering + return self.layout + + async def run(self) -> None: + """Run the chat UI.""" + self.running = True + + # Create tasks for handling different message queues + chat_task = asyncio.create_task(self._handle_chat_messages()) + sys_task = asyncio.create_task(self._handle_sys_messages()) + input_task = asyncio.create_task(self._handle_user_input()) + + # Clear the screen + os.system('cls' if os.name == 'nt' else 'clear') + + # Add welcome message + self.messages.append({ + "content": f"Welcome to Universal Connectivity Chat! You are connected as: {self.chat_room.nickname}", + "timestamp": time.time(), + "type": "system" + }) + + # Start the Live display + with Live(self._render_ui(), refresh_per_second=4) as live: + self.live = live + + # Wait for any task to complete (or an exception) + done, pending = await asyncio.wait( + [chat_task, sys_task, input_task], + return_when=asyncio.FIRST_COMPLETED + ) + + # Cancel pending tasks + for task in pending: + task.cancel() + + # Check for exceptions + for task in done: + try: + await task + except Exception as e: + logger.error(f"Error in UI task: {e}") + + self.running = False + + async def _handle_chat_messages(self) -> None: + """Process incoming chat messages.""" + while self.running: + try: + # Get the next message + msg = await self.chat_room.messages.get() + + # Add to the message list + self.messages.append({ + "content": msg.message, + "nick": msg.sender_nick, + "sender_id": msg.sender_id, + "timestamp": time.time(), + "type": "chat" + }) + + # Update the UI + if hasattr(self, "live"): + self.live.update(self._render_ui()) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error handling chat message: {e}") + await asyncio.sleep(1) + + async def _handle_sys_messages(self) -> None: + """Process system messages.""" + while self.running: + try: + # Get the next message + msg = await self.chat_room.sys_messages.get() + + # Add to the message list + self.messages.append({ + "content": msg.message, + "timestamp": time.time(), + "type": "system" + }) + + # Update the UI + if hasattr(self, "live"): + self.live.update(self._render_ui()) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error handling system message: {e}") + await asyncio.sleep(1) + + async def _handle_user_input(self) -> None: + """Handle user input.""" + while self.running: + try: + # Get user input + user_input = await aioconsole.ainput("") + + if not user_input: + continue + + # Handle commands + if user_input.startswith("/"): + command_parts = user_input.split(" ", 1) + command = command_parts[0].lower() + args = command_parts[1] if len(command_parts) > 1 else "" + + # Call the appropriate command handler + handler = self.command_handlers.get(command) + if handler: + await handler(args) + else: + # Unknown command + self.messages.append({ + "content": f"Unknown command: {command}. Type /help for available commands.", + "timestamp": time.time(), + "type": "system" + }) + else: + # Regular message - send to chat + await self.chat_room.publish(user_input) + + # Add to local messages + self.messages.append({ + "content": user_input, + "nick": self.chat_room.nickname, + "sender_id": str(self.node.get_id()), + "timestamp": time.time(), + "type": "chat" + }) + + # Update the UI + if hasattr(self, "live"): + self.live.update(self._render_ui()) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error handling user input: {e}") + await asyncio.sleep(1) + + async def _handle_help(self, args: str) -> None: + """Handle the /help command.""" + help_text = """ +Available commands: +/help - Show this help message +/peers - List connected peers +/nick - Change your nickname +/connect - Connect to a peer by multiaddress +/file - Share a file +/clear - Clear the message history +/quit - Exit the application +""" + self.messages.append({ + "content": help_text, + "timestamp": time.time(), + "type": "system" + }) + + async def _handle_peers(self, args: str) -> None: + """Handle the /peers command.""" + peers = self.node.get_connected_peers() + if not peers: + self.messages.append({ + "content": "No peers connected.", + "timestamp": time.time(), + "type": "system" + }) + return + + peers_text = "Connected peers:\n" + for i, peer in enumerate(peers, 1): + peers_text += f"{i}. {peer}\n" + + self.messages.append({ + "content": peers_text, + "timestamp": time.time(), + "type": "system" + }) + + async def _handle_nick(self, args: str) -> None: + """Handle the /nick command.""" + if not args: + self.messages.append({ + "content": "Usage: /nick ", + "timestamp": time.time(), + "type": "system" + }) + return + + old_nick = self.chat_room.nickname + self.chat_room.nickname = args + + self.messages.append({ + "content": f"Nickname changed from {old_nick} to {args}", + "timestamp": time.time(), + "type": "system" + }) + + async def _handle_connect(self, args: str) -> None: + """Handle the /connect command.""" + if not args: + self.messages.append({ + "content": "Usage: /connect ", + "timestamp": time.time(), + "type": "system" + }) + return + + try: + await self.node.connect(args) + self.messages.append({ + "content": f"Connected to {args}", + "timestamp": time.time(), + "type": "system" + }) + except Exception as e: + self.messages.append({ + "content": f"Failed to connect to {args}: {e}", + "timestamp": time.time(), + "type": "system" + }) + + async def _handle_file(self, args: str) -> None: + """Handle the /file command.""" + if not args: + self.messages.append({ + "content": "Usage: /file ", + "timestamp": time.time(), + "type": "system" + }) + return + + try: + # Read the file + with open(args, "rb") as f: + file_data = f.read() + + # Share the file + file_id = await self.chat_room.share_file(file_data, os.path.basename(args)) + + self.messages.append({ + "content": f"File shared: {args} (ID: {file_id}, {len(file_data)} bytes)", + "timestamp": time.time(), + "type": "system" + }) + except Exception as e: + self.messages.append({ + "content": f"Failed to share file {args}: {e}", + "timestamp": time.time(), + "type": "system" + }) + + async def _handle_clear(self, args: str) -> None: + """Handle the /clear command.""" + self.messages.clear() + self.messages.append({ + "content": "Message history cleared.", + "timestamp": time.time(), + "type": "system" + }) + + async def _handle_quit(self, args: str) -> None: + """Handle the /quit command.""" + self.messages.append({ + "content": "Exiting application...", + "timestamp": time.time(), + "type": "system" + }) + + if hasattr(self, "live"): + self.live.update(self._render_ui()) + + # Wait a moment for the message to be displayed + await asyncio.sleep(1) + + # Stop the UI + self.running = False + raise asyncio.CancelledError("User quit") \ No newline at end of file