
**fastpubsub** is a lightweight publish-subscribe messaging system built with FastAPI and PostgreSQL. It provides a simple HTTP API for message publishing and subscription management with powerful features like message filtering, delivery guarantees, dead-letter queues, and automatic retries with exponential backoff. The system is built with asyncio for efficient concurrent operations and uses SQLAlchemy's async engine with psycopg's native async support.

### 🎯 What is fastpubsub?

fastpubsub is **not** intended to replace production-grade messaging systems like Google Cloud Pub/Sub, NATS, or Apache Kafka. Instead, it brings key pub/sub features to **simple architectures** where you already have PostgreSQL available. If you're running a small to medium-sized application with PostgreSQL as your primary database, fastpubsub lets you add reliable messaging capabilities without introducing additional infrastructure complexity.

**Use fastpubsub when you:**
- Already use PostgreSQL and want to avoid managing separate message brokers
- Need pub/sub functionality for small to medium workloads
- Prefer simplicity over maximum throughput
- Want a single database for both application data and messaging
- Need reliable message delivery with retries and dead-letter queues

**Consider dedicated message brokers when you:**
- Need to handle millions of messages per second
- Require horizontal scalability across multiple datacenters
- Need advanced features like message streaming or complex routing
- Want to decouple your messaging infrastructure from your database

### ✨ Key Features

- 🎯 **Topic-based messaging** - Organize messages by topics
fastpubsub uses PostgreSQL as its backend, leveraging stored procedures and JSON
- **Database**: PostgreSQL with custom functions for message management, accessed via async SQLAlchemy
- **Cleanup Workers**: Background jobs for message maintenance

## 🔄 Message Flow

1. **Publish**: Messages are published to a topic
2. **Route**: Messages are routed to all subscriptions for that topic
3. **Filter**: Subscriptions with filters only receive matching messages
4. **Consume**: Consumers fetch messages in batches
5. **Process**: Consumer processes the message
6. **ACK/NACK**: Consumer acknowledges success or requests retry
7. **Retry**: Failed messages are retried with exponential backoff
8. **DLQ**: Messages exceeding max attempts move to the dead letter queue
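
The same flow, sketched with curl against the HTTP API used in the examples below (the `orders` topic and `order-emails` subscription are assumed to exist already; the message id is a placeholder):

```bash
# 1-3. Publish: the message is routed to every matching subscription on the topic
curl -X POST http://localhost:8000/topics/orders/messages \
  -H "Content-Type: application/json" \
  -d '[{"order_id": "12345", "status": "completed"}]'

# 4. Consume: fetch a batch of messages for one subscription
curl "http://localhost:8000/subscriptions/order-emails/messages?consumer_id=worker-1&batch_size=10"

# 6. ACK on success (POST the same ids to .../nacks instead to request a retry)
curl -X POST http://localhost:8000/subscriptions/order-emails/acks \
  -H "Content-Type: application/json" \
  -d '["<message-id>"]'
```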

## 🐳 Quick Start with Docker

All commands use the official Docker image from [Docker Hub](https://hub.docker.com/r/allisson/fastpubsub).
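
As a rough sketch, running the image locally looks something like the following; the database environment variable name is an assumption (it is not shown in this excerpt), so check the image documentation for the actual settings:

```bash
# Sketch only: the DATABASE_URL variable name below is an assumption,
# not confirmed by this README excerpt
docker run --rm -p 8000:8000 \
  -e DATABASE_URL="postgresql+psycopg://user:password@host.docker.internal:5432/fastpubsub" \
  allisson/fastpubsub
```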

### Example 9: Multiple Consumers Processing the Same Subscription

Running multiple consumers in parallel for the same subscription helps process messages faster:

```bash
# Terminal 1: Start consumer worker 1
while true; do
MESSAGES=$(curl -s "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-1&batch_size=10")
echo "$MESSAGES" | jq -r '.data[].id' | while read -r msg_id; do
# Process message here
echo "Worker 1 processing: $msg_id"
# Acknowledge after processing
curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
-H "Content-Type: application/json" \
-d "[\"$msg_id\"]"
done
sleep 1
done

# Terminal 2: Start consumer worker 2
while true; do
MESSAGES=$(curl -s "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-2&batch_size=10")
echo "$MESSAGES" | jq -r '.data[].id' | while read -r msg_id; do
echo "Worker 2 processing: $msg_id"
curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
-H "Content-Type: application/json" \
-d "[\"$msg_id\"]"
done
sleep 1
done
```

Each consumer uses a unique `consumer_id` to identify itself. Messages are locked to prevent duplicate processing across consumers.
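
A quick way to observe this locking (assuming the `email-sender` subscription has pending messages):

```bash
# Fetch a batch as worker-1; these messages are now tied to that delivery
curl "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-1&batch_size=5"

# A second fetch as worker-2 does not return the same in-flight messages;
# it receives other available messages, or an empty batch
curl "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-2&batch_size=5"
```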

### Example 10: Error Handling and Retry Pattern

Proper error handling ensures reliable message processing:

```python
import asyncio
import httpx
import logging
from typing import Dict, Any

logger = logging.getLogger(__name__)


# Define custom exceptions (implement based on your business logic)
class RetriableError(Exception):
"""Temporary error that should be retried"""
pass


class PermanentError(Exception):
"""Permanent error that should not be retried"""
pass


async def process_single_message(payload: Dict[str, Any]):
"""
Process a single message - implement your business logic here.

Raise RetriableError for temporary failures (network issues, service unavailable).
Raise PermanentError for permanent failures (invalid data, business rule violation).
"""
# Example implementation
if not payload.get("email"):
raise PermanentError("Missing required field: email")

try:
# Your actual processing logic here
# For example: send email, update database, call external API, etc.
logger.info(f"Processing message: {payload}")
except ConnectionError:
# Temporary network issue - retry later
raise RetriableError("Network connection failed")


async def process_messages(subscription_id: str, consumer_id: str):
"""Consumer implementation with proper error handling"""
base_url = "http://localhost:8000"

async with httpx.AsyncClient() as client:
while True:
try:
# Fetch messages
response = await client.get(
f"{base_url}/subscriptions/{subscription_id}/messages",
params={"consumer_id": consumer_id, "batch_size": 10},
timeout=30.0
)
response.raise_for_status()
messages = response.json()["data"]

if not messages:
await asyncio.sleep(1)
continue

# Process each message
ack_ids = []
nack_ids = []

for message in messages:
msg_id = message["id"]
try:
await process_single_message(message["payload"])
ack_ids.append(msg_id)
logger.info(f"Successfully processed message {msg_id}")
except RetriableError as e:
# Temporary error - retry later
nack_ids.append(msg_id)
logger.warning(f"Retriable error for {msg_id}: {e}")
except PermanentError as e:
# Permanent error - ack to prevent retries
ack_ids.append(msg_id)
logger.error(f"Permanent error for {msg_id}: {e}")

# Acknowledge successful/permanent-error messages
if ack_ids:
await client.post(
f"{base_url}/subscriptions/{subscription_id}/acks",
json=ack_ids,
timeout=10.0
)

# NACK retriable errors for retry with backoff
if nack_ids:
await client.post(
f"{base_url}/subscriptions/{subscription_id}/nacks",
json=nack_ids,
timeout=10.0
)

except httpx.HTTPError as e:
logger.error(f"HTTP error: {e}")
await asyncio.sleep(5)
except Exception as e:
logger.error(f"Unexpected error: {e}")
await asyncio.sleep(5)


if __name__ == "__main__":
# Run the consumer
asyncio.run(process_messages("email-sender", "worker-1"))
```

**Best practices shown:**
- Distinguish between retriable and permanent errors
- ACK permanent errors to prevent infinite retries
- NACK retriable errors to trigger exponential backoff
- Use timeouts to prevent hanging
- Log processing status for debugging
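
For reference, the ACK and NACK calls made by the Python consumer above correspond to these HTTP requests (the message id is a placeholder):

```bash
# ACK: mark a message as successfully processed
curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
  -H "Content-Type: application/json" \
  -d '["<message-id>"]'

# NACK: schedule the message for redelivery with exponential backoff
curl -X POST http://localhost:8000/subscriptions/email-sender/nacks \
  -H "Content-Type: application/json" \
  -d '["<message-id>"]'
```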

### Example 11: Monitoring and Alerting Setup

Set up monitoring to track system health:

```bash
#!/bin/bash
# Save this monitoring script as monitor.sh
SUBSCRIPTION_ID="email-sender"
API_URL="http://localhost:8000"

# Get metrics
METRICS=$(curl -s "$API_URL/subscriptions/$SUBSCRIPTION_ID/metrics")

AVAILABLE=$(echo "$METRICS" | jq -r '.available')
DELIVERED=$(echo "$METRICS" | jq -r '.delivered')
DLQ=$(echo "$METRICS" | jq -r '.dlq')

echo "Subscription: $SUBSCRIPTION_ID"
echo "Available messages: $AVAILABLE"
echo "Delivered messages: $DELIVERED"
echo "DLQ messages: $DLQ"

# Alert if DLQ has messages
if [ "$DLQ" -gt 0 ]; then
echo "⚠️ WARNING: $DLQ messages in dead letter queue!"
# Send alert (e.g., to Slack, PagerDuty, etc.)
# curl -X POST YOUR_WEBHOOK_URL -d "DLQ has $DLQ messages"
fi

# Alert if messages are piling up
if [ "$AVAILABLE" -gt 1000 ]; then
echo "⚠️ WARNING: $AVAILABLE messages waiting (consumer may be slow)"
fi

# Alert if too many messages are stuck in delivered state
if [ "$DELIVERED" -gt 100 ]; then
echo "⚠️ WARNING: $DELIVERED messages in delivered state (possible consumer crash)"
fi
```
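
To run the check on a schedule, a cron entry along these lines works (the script path and interval are just examples):

```bash
# Run the monitoring script every 5 minutes and append its output to a log
# (add via crontab -e; the path below is illustrative)
*/5 * * * * /usr/local/bin/monitor.sh >> /var/log/fastpubsub-monitor.log 2>&1
```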

### Example 12: Topic Fan-out Pattern

A single published message is delivered to multiple subscriptions, each serving a different purpose:

```bash
# 1. Create a topic for order events
curl -X POST http://localhost:8000/topics \
-H "Content-Type: application/json" \
-d '{"id": "orders"}'

# 2. Create multiple subscriptions for different purposes
# Subscription for sending emails
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "order-emails",
"topic_id": "orders"
}'

# Subscription for updating inventory
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "order-inventory",
"topic_id": "orders"
}'

# Subscription for analytics (only completed orders)
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "order-analytics",
"topic_id": "orders",
"filter": {"status": ["completed"]}
}'

# 3. Publish an order event
curl -X POST http://localhost:8000/topics/orders/messages \
-H "Content-Type: application/json" \
-d '[
{
"order_id": "12345",
"customer_email": "[email protected]",
"status": "completed",
"total": 99.99
}
]'

# Result: All three subscriptions receive the message
# - order-emails: Sends confirmation email
# - order-inventory: Updates stock levels
# - order-analytics: Records completed order for analytics
```

This pattern allows you to decouple different parts of your system while maintaining a single source of truth for events.
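
Each subscription is consumed independently; for example, the analytics consumer only ever receives orders that matched its `status` filter:

```bash
# Only messages with "status": "completed" are delivered to this subscription
curl "http://localhost:8000/subscriptions/order-analytics/messages?consumer_id=analytics-1&batch_size=10"
```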

## 🎯 Best Practices
