diff --git a/README.md b/README.md
index 06ff24f..4f46333 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,23 @@
 **fastpubsub** is a lightweight publish-subscribe messaging system built with FastAPI and PostgreSQL. It provides a simple HTTP API for message publishing and subscription management with powerful features like message filtering, delivery guarantees, dead-letter queues, and automatic retries with exponential backoff. The system is built with asyncio for efficient concurrent operations and uses SQLAlchemy's async engine with psycopg's native async support.
 
+### 🎯 What is fastpubsub?
+
+fastpubsub is **not** intended to replace production-grade messaging systems like Google Cloud Pub/Sub, NATS, or Apache Kafka. Instead, it brings key pub/sub features to **simple architectures** where you already have PostgreSQL available. If you're running a small to medium-sized application with PostgreSQL as your primary database, fastpubsub lets you add reliable messaging capabilities without introducing additional infrastructure complexity.
+
+**Use fastpubsub when you:**
+- Already use PostgreSQL and want to avoid managing separate message brokers
+- Need pub/sub functionality for small to medium workloads
+- Prefer simplicity over maximum throughput
+- Want a single database for both application data and messaging
+- Need reliable message delivery with retries and dead-letter queues
+
+**Consider dedicated message brokers when you:**
+- Need to handle millions of messages per second
+- Require horizontal scalability across multiple datacenters
+- Need advanced features like message streaming or complex routing
+- Want to decouple your messaging infrastructure from your database
+
 ### ✨ Key Features
 
 - 🎯 **Topic-based messaging** - Organize messages by topics
@@ -27,6 +44,17 @@ fastpubsub uses PostgreSQL as its backend, leveraging stored procedures and JSON
 - **Database**: PostgreSQL with custom functions for message management, accessed via async SQLAlchemy
 - **Cleanup Workers**: Background jobs for message maintenance
 
+## 🔄 Message Flow
+
+1. **Publish**: Messages are published to a topic
+2. **Route**: Messages are routed to all subscriptions for that topic
+3. **Filter**: Subscriptions with filters only receive matching messages
+4. **Consume**: Consumers fetch messages in batches
+5. **Process**: Consumer processes the message
+6. **ACK/NACK**: Consumer acknowledges success or requests retry
+7. **Retry**: Failed messages are retried with exponential backoff
+8. **DLQ**: Messages exceeding max attempts move to the dead letter queue
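+
+A minimal end-to-end sketch of steps 1-6, in Python with `httpx` (the same client used in the error-handling example later in this README). The `demo` topic, `demo-sub` subscription, and consumer id are illustrative and assumed to already exist:
+
+```python
+import asyncio
+
+import httpx
+
+BASE_URL = "http://localhost:8000"  # default API address used throughout this README
+
+
+async def main():
+    async with httpx.AsyncClient(base_url=BASE_URL) as client:
+        # 1. Publish: send a batch of messages to the topic
+        await client.post("/topics/demo/messages", json=[{"greeting": "hello"}])
+
+        # 2-4. Route/Filter/Consume: fetch a batch for one of the topic's subscriptions
+        response = await client.get(
+            "/subscriptions/demo-sub/messages",
+            params={"consumer_id": "worker-1", "batch_size": 10},
+        )
+        messages = response.json()["data"]
+
+        # 5-6. Process each message, then ACK the ones that succeeded
+        ack_ids = [message["id"] for message in messages]
+        if ack_ids:
+            await client.post("/subscriptions/demo-sub/acks", json=ack_ids)
+
+
+asyncio.run(main())
+```
+
+Steps 7 and 8 happen server-side: messages that are NACKed (POST their ids to `/subscriptions/demo-sub/nacks`) are redelivered with exponential backoff until they exceed the max attempts and move to the dead letter queue.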
+
 ## 🐳 Quick Start with Docker
 
 All commands use the official Docker image from [Docker Hub](https://hub.docker.com/r/allisson/fastpubsub).
@@ -945,16 +973,254 @@ scrape_configs:
     scrape_interval: 15s
 ```
 
-## 🔄 Message Flow
+### Example 9: Multiple Consumers Processing the Same Subscription
 
-1. **Publish**: Messages are published to a topic
-2. **Route**: Messages are routed to all subscriptions for that topic
-3. **Filter**: Subscriptions with filters only receive matching messages
-4. **Consume**: Consumers fetch messages in batches
-5. **Process**: Consumer processes the message
-6. **ACK/NACK**: Consumer acknowledges success or requests retry
-7. **Retry**: Failed messages are retried with exponential backoff
-8. **DLQ**: Messages exceeding max attempts move to the dead letter queue
+Running multiple consumers in parallel for the same subscription helps process messages faster:
+
+```bash
+# Terminal 1: Start consumer worker 1
+while true; do
+  MESSAGES=$(curl -s "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-1&batch_size=10")
+  echo "$MESSAGES" | jq -r '.data[].id' | while read -r msg_id; do
+    # Process message here
+    echo "Worker 1 processing: $msg_id"
+    # Acknowledge after processing
+    curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
+      -H "Content-Type: application/json" \
+      -d "[\"$msg_id\"]"
+  done
+  sleep 1
+done
+
+# Terminal 2: Start consumer worker 2
+while true; do
+  MESSAGES=$(curl -s "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-2&batch_size=10")
+  echo "$MESSAGES" | jq -r '.data[].id' | while read -r msg_id; do
+    echo "Worker 2 processing: $msg_id"
+    curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
+      -H "Content-Type: application/json" \
+      -d "[\"$msg_id\"]"
+  done
+  sleep 1
+done
+```
+
+Each consumer uses a unique `consumer_id` to identify itself. Messages are locked to prevent duplicate processing across consumers.
+
+### Example 10: Error Handling and Retry Pattern
+
+Proper error handling ensures reliable message processing:
+
+```python
+import asyncio
+import httpx
+import logging
+from typing import Dict, Any
+
+logger = logging.getLogger(__name__)
+
+
+# Define custom exceptions (implement based on your business logic)
+class RetriableError(Exception):
+    """Temporary error that should be retried"""
+    pass
+
+
+class PermanentError(Exception):
+    """Permanent error that should not be retried"""
+    pass
+
+
+async def process_single_message(payload: Dict[str, Any]):
+    """
+    Process a single message - implement your business logic here.
+
+    Raise RetriableError for temporary failures (network issues, service unavailable).
+    Raise PermanentError for permanent failures (invalid data, business rule violation).
+    """
+    # Example implementation
+    if not payload.get("email"):
+        raise PermanentError("Missing required field: email")
+
+    try:
+        # Your actual processing logic here
+        # For example: send email, update database, call external API, etc.
+        logger.info(f"Processing message: {payload}")
+    except ConnectionError:
+        # Temporary network issue - retry later
+        raise RetriableError("Network connection failed")
+
+
+async def process_messages(subscription_id: str, consumer_id: str):
+    """Consumer implementation with proper error handling"""
+    base_url = "http://localhost:8000"
+
+    async with httpx.AsyncClient() as client:
+        while True:
+            try:
+                # Fetch messages
+                response = await client.get(
+                    f"{base_url}/subscriptions/{subscription_id}/messages",
+                    params={"consumer_id": consumer_id, "batch_size": 10},
+                    timeout=30.0
+                )
+                response.raise_for_status()
+                messages = response.json()["data"]
+
+                if not messages:
+                    await asyncio.sleep(1)
+                    continue
+
+                # Process each message
+                ack_ids = []
+                nack_ids = []
+
+                for message in messages:
+                    msg_id = message["id"]
+                    try:
+                        await process_single_message(message["payload"])
+                        ack_ids.append(msg_id)
+                        logger.info(f"Successfully processed message {msg_id}")
+                    except RetriableError as e:
+                        # Temporary error - retry later
+                        nack_ids.append(msg_id)
+                        logger.warning(f"Retriable error for {msg_id}: {e}")
+                    except PermanentError as e:
+                        # Permanent error - ack to prevent retries
+                        ack_ids.append(msg_id)
+                        logger.error(f"Permanent error for {msg_id}: {e}")
+
+                # Acknowledge successful/permanent-error messages
+                if ack_ids:
+                    await client.post(
+                        f"{base_url}/subscriptions/{subscription_id}/acks",
+                        json=ack_ids,
+                        timeout=10.0
+                    )
+
+                # NACK retriable errors for retry with backoff
+                if nack_ids:
+                    await client.post(
+                        f"{base_url}/subscriptions/{subscription_id}/nacks",
+                        json=nack_ids,
+                        timeout=10.0
+                    )
+
+            except httpx.HTTPError as e:
+                logger.error(f"HTTP error: {e}")
+                await asyncio.sleep(5)
+            except Exception as e:
+                logger.error(f"Unexpected error: {e}")
+                await asyncio.sleep(5)
+
+
+if __name__ == "__main__":
+    # Configure logging so the status messages above are actually emitted
+    logging.basicConfig(level=logging.INFO)
+    # Run the consumer
+    asyncio.run(process_messages("email-sender", "worker-1"))
+```
+
+**Best practices shown:**
+- Distinguish between retriable and permanent errors
+- ACK permanent errors to prevent infinite retries
+- NACK retriable errors to trigger exponential backoff
+- Use timeouts to prevent hanging
+- Log processing status for debugging
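+
+To get Example 9's parallelism from a single process, the same coroutine can be fanned out with `asyncio.gather`. A minimal sketch reusing the `process_messages` function above; the worker count and id scheme are illustrative:
+
+```python
+import asyncio
+
+
+async def run_workers(count: int = 3):
+    # One consumer task per worker, each with a unique consumer_id (as in Example 9).
+    # Assumes process_messages from the example above is defined in this module.
+    await asyncio.gather(
+        *(process_messages("email-sender", f"worker-{n}") for n in range(1, count + 1))
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(run_workers())
+```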
+
+### Example 11: Monitoring and Alerting Setup
+
+Set up monitoring to track system health:
+
+```bash
+#!/bin/bash
+# Monitoring script (monitor.sh)
+SUBSCRIPTION_ID="email-sender"
+API_URL="http://localhost:8000"
+
+# Get metrics
+METRICS=$(curl -s "$API_URL/subscriptions/$SUBSCRIPTION_ID/metrics")
+
+AVAILABLE=$(echo "$METRICS" | jq -r '.available')
+DELIVERED=$(echo "$METRICS" | jq -r '.delivered')
+DLQ=$(echo "$METRICS" | jq -r '.dlq')
+
+echo "Subscription: $SUBSCRIPTION_ID"
+echo "Available messages: $AVAILABLE"
+echo "Delivered messages: $DELIVERED"
+echo "DLQ messages: $DLQ"
+
+# Alert if DLQ has messages
+if [ "$DLQ" -gt 0 ]; then
+  echo "⚠️ WARNING: $DLQ messages in dead letter queue!"
+  # Send alert (e.g., to Slack, PagerDuty, etc.)
+  # curl -X POST YOUR_WEBHOOK_URL -d "DLQ has $DLQ messages"
+fi
+
+# Alert if messages are piling up
+if [ "$AVAILABLE" -gt 1000 ]; then
+  echo "⚠️ WARNING: $AVAILABLE messages waiting (consumer may be slow)"
+fi
+
+# Alert if too many messages are stuck in delivered state
+if [ "$DELIVERED" -gt 100 ]; then
+  echo "⚠️ WARNING: $DELIVERED messages in delivered state (possible consumer crash)"
+fi
+```
+
+### Example 12: Topic Fan-out Pattern
+
+A single message published to a topic is delivered to every subscription on that topic, each serving a different purpose:
+
+```bash
+# 1. Create a topic for order events
+curl -X POST http://localhost:8000/topics \
+  -H "Content-Type: application/json" \
+  -d '{"id": "orders"}'
+
+# 2. Create multiple subscriptions for different purposes
+# Subscription for sending emails
+curl -X POST http://localhost:8000/subscriptions \
+  -H "Content-Type: application/json" \
+  -d '{
+    "id": "order-emails",
+    "topic_id": "orders"
+  }'
+
+# Subscription for updating inventory
+curl -X POST http://localhost:8000/subscriptions \
+  -H "Content-Type: application/json" \
+  -d '{
+    "id": "order-inventory",
+    "topic_id": "orders"
+  }'
+
+# Subscription for analytics (only completed orders)
+curl -X POST http://localhost:8000/subscriptions \
+  -H "Content-Type: application/json" \
+  -d '{
+    "id": "order-analytics",
+    "topic_id": "orders",
+    "filter": {"status": ["completed"]}
+  }'
+
+# 3. Publish an order event
+curl -X POST http://localhost:8000/topics/orders/messages \
+  -H "Content-Type: application/json" \
+  -d '[
+    {
+      "order_id": "12345",
+      "customer_email": "customer@example.com",
+      "status": "completed",
+      "total": 99.99
+    }
+  ]'
+
+# Result: All three subscriptions receive the message
+# - order-emails: Sends confirmation email
+# - order-inventory: Updates stock levels
+# - order-analytics: Records completed order for analytics
+```
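+
+Because `order-analytics` was created with a filter, events whose `status` is not `completed` reach only the two unfiltered subscriptions. A quick way to see this, sketched in Python with `httpx` (reusing the ids above; the `checker` consumer id is illustrative):
+
+```python
+import asyncio
+
+import httpx
+
+
+async def main():
+    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
+        # Publish an order that does NOT match the analytics filter
+        await client.post(
+            "/topics/orders/messages",
+            json=[{"order_id": "12346", "status": "pending", "total": 10.00}],
+        )
+        # order-emails and order-inventory receive it; order-analytics should not
+        for subscription_id in ("order-emails", "order-inventory", "order-analytics"):
+            response = await client.get(
+                f"/subscriptions/{subscription_id}/messages",
+                params={"consumer_id": "checker", "batch_size": 10},
+            )
+            print(subscription_id, len(response.json()["data"]))
+
+
+asyncio.run(main())
+```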
+
+This pattern allows you to decouple different parts of your system while maintaining a single source of truth for events.
 
 ## 🎯 Best Practices