This guide covers publish/subscribe operations in the MatrixOne Python SDK, enabling real-time messaging and event-driven architectures.
MatrixOne's Pub/Sub system provides:
- Real-time Messaging: Publish and subscribe to topics for real-time communication
- Event-driven Architecture: Build reactive applications with event-based patterns
- Topic Management: Create, list, and manage topics
- Message Filtering: Subscribe to specific message patterns
- Async Support: Full async/await support for high-performance applications
- Durability: Reliable message delivery with persistence options
- Transaction Support: Atomic pub/sub operations with
session()
Use client.session() for atomic publication and subscription setup:
from matrixone import Client
from sqlalchemy import insert, select
client = Client()
client.connect(database='test')
# Atomic publication creation
with client.session() as session:
# Create database publication
publication = session.pubsub.create_database_publication(
name='analytics_pub',
database='analytics',
account='subscriber_account'
)
# Insert initial data in same transaction
session.execute(
insert(AnalyticsData).values(
metric='users',
value=1000,
timestamp=func.now()
)
)
# Both operations commit together
client.disconnect()from matrixone import Client
from sqlalchemy import select, insert, func
client = Client()
client.connect(database='test')
# Complex pub/sub setup with verification
with client.session() as session:
# Create table publication
publication = session.pubsub.create_table_publication(
name='users_pub',
database='production',
table='users',
account='analytics_account'
)
# Verify table exists and has data
stmt = select(func.count()).select_from(text('production.users'))
count = session.execute(stmt).scalar()
if count == 0:
raise Exception("Cannot publish empty table")
print(f"Publishing table with {count} rows")
# Create subscription on subscriber side
subscription = session.pubsub.create_subscription(
name='users_sub',
publication_name='users_pub',
publication_account='production_account'
)
# All operations are atomic
client.disconnect()from matrixone import Client
from sqlalchemy import insert, select
client = Client()
client.connect(database='test')
# Share data atomically
with client.session() as session:
# Insert data
session.execute(
insert(SharedData).values(
name='Dataset A',
data='Important data',
created_at=func.now()
)
)
# Create publication
publication = session.pubsub.create_database_publication(
name='shared_data_pub',
database='shared',
account='partner_account'
)
# Verify publication created
pubs = session.pubsub.list_publications()
if not any(p.name == 'shared_data_pub' for p in pubs):
raise Exception("Publication creation failed")
# Commits all operations together
client.disconnect()from matrixone import Client
from matrixone.config import get_connection_params
# Connect to MatrixOne
connection_params = get_connection_params()
client = Client(*connection_params)
client.connect(*connection_params)
# Get Pub/Sub manager
pubsub = client.pubsub# Create a topic
topic_name = "user_events"
topic = pubsub.create_topic(topic_name)
print(f"Created topic: {topic.name}")
# Create topic with configuration
topic = pubsub.create_topic(
name="system_logs",
description="System logging events",
retention_hours=24
)# List all topics
topics = pubsub.list_topics()
for topic in topics:
print(f"Topic: {topic.name}, Subscribers: {topic.subscriber_count}")
# Get specific topic
topic = pubsub.get_topic("user_events")
if topic:
print(f"Topic exists: {topic.name}")# Delete a topic
pubsub.delete_topic("user_events")
print("Topic deleted")# Publish a simple message
message = "User logged in: john_doe"
pubsub.publish("user_events", message)
print("Message published")
# Publish with metadata
message_data = {
"event": "user_login",
"user_id": "john_doe",
"timestamp": "2024-01-15T10:30:00Z",
"ip_address": "192.168.1.100"
}
pubsub.publish("user_events", message_data)
print("Structured message published")# Publish multiple messages
messages = [
{"event": "user_login", "user_id": "alice"},
{"event": "user_logout", "user_id": "bob"},
{"event": "user_register", "user_id": "charlie"}
]
for message in messages:
pubsub.publish("user_events", message)
print(f"Published {len(messages)} messages")# Subscribe to a topic
def message_handler(message):
print(f"Received: {message.data}")
print(f"From topic: {message.topic}")
print(f"Timestamp: {message.timestamp}")
subscription = pubsub.subscribe("user_events", message_handler)
print("Subscribed to user_events")
# Keep subscription active
import time
time.sleep(10) # Listen for 10 seconds
# Unsubscribe
subscription.unsubscribe()# Subscribe with message filtering
def login_handler(message):
if message.data.get("event") == "user_login":
print(f"User login: {message.data.get('user_id')}")
subscription = pubsub.subscribe(
"user_events",
login_handler,
filter={"event": "user_login"}
)
# Subscribe to multiple event types
def user_activity_handler(message):
event = message.data.get("event")
user_id = message.data.get("user_id")
print(f"User activity: {user_id} - {event}")
subscription = pubsub.subscribe(
"user_events",
user_activity_handler,
filter={
"event": {"$in": ["user_login", "user_logout", "user_register"]}
}
)import asyncio
from matrixone import AsyncClient
async def async_publishing():
# Connect asynchronously
connection_params = get_connection_params()
async_client = AsyncClient(*connection_params)
await async_client.connect(*connection_params)
# Get async Pub/Sub manager
pubsub = async_client.pubsub
# Async publish
await pubsub.publish_async("user_events", {
"event": "async_user_login",
"user_id": "async_user"
})
await async_client.disconnect()
# Run async publishing
asyncio.run(async_publishing())async def async_subscription():
connection_params = get_connection_params()
async_client = AsyncClient(*connection_params)
await async_client.connect(*connection_params)
pubsub = async_client.pubsub
async def async_message_handler(message):
print(f"Async received: {message.data}")
# Async subscribe
subscription = await pubsub.subscribe_async(
"user_events",
async_message_handler
)
# Keep subscription active
await asyncio.sleep(10)
# Unsubscribe
await subscription.unsubscribe_async()
await async_client.disconnect()
asyncio.run(async_subscription())class UserEventSystem:
def __init__(self):
self.client = Client(*get_connection_params())
self.client.connect(*get_connection_params())
self.pubsub = self.client.pubsub
self.setup_subscriptions()
def setup_subscriptions(self):
# Subscribe to user events
self.pubsub.subscribe("user_events", self.handle_user_event)
# Subscribe to system events
self.pubsub.subscribe("system_events", self.handle_system_event)
def handle_user_event(self, message):
event_data = message.data
event_type = event_data.get("event")
if event_type == "user_login":
self.on_user_login(event_data)
elif event_type == "user_logout":
self.on_user_logout(event_data)
elif event_type == "user_register":
self.on_user_register(event_data)
def on_user_login(self, data):
user_id = data.get("user_id")
print(f"User {user_id} logged in")
# Update user status, send notifications, etc.
def on_user_logout(self, data):
user_id = data.get("user_id")
print(f"User {user_id} logged out")
# Clean up sessions, update statistics, etc.
def on_user_register(self, data):
user_id = data.get("user_id")
print(f"New user registered: {user_id}")
# Send welcome email, create user profile, etc.
def publish_user_event(self, event_type, user_id, metadata=None):
event_data = {
"event": event_type,
"user_id": user_id,
"timestamp": datetime.now().isoformat(),
**(metadata or {})
}
self.pubsub.publish("user_events", event_data)
def handle_system_event(self, message):
# Handle system-level events
print(f"System event: {message.data}")
# Usage
user_system = UserEventSystem()
user_system.publish_user_event("user_login", "john_doe", {
"ip_address": "192.168.1.100",
"user_agent": "Mozilla/5.0..."
})class MicroserviceA:
def __init__(self):
self.client = Client(*get_connection_params())
self.client.connect(*get_connection_params())
self.pubsub = self.client.pubsub
self.setup_communication()
def setup_communication(self):
# Subscribe to requests from other services
self.pubsub.subscribe("service_a_requests", self.handle_request)
# Subscribe to responses
self.pubsub.subscribe("service_a_responses", self.handle_response)
def handle_request(self, message):
request_data = message.data
request_id = request_data.get("request_id")
# Process request
result = self.process_request(request_data)
# Publish response
self.pubsub.publish("service_b_responses", {
"request_id": request_id,
"result": result,
"status": "success"
})
def process_request(self, data):
# Business logic here
return {"processed": True, "data": data}
def handle_response(self, message):
# Handle responses from other services
print(f"Received response: {message.data}")
class MicroserviceB:
def __init__(self):
self.client = Client(*get_connection_params())
self.client.connect(*get_connection_params())
self.pubsub = self.client.pubsub
def send_request(self, request_data):
request_id = f"req_{int(time.time())}"
# Publish request
self.pubsub.publish("service_a_requests", {
"request_id": request_id,
**request_data
})
# Subscribe to response
response_received = False
response_data = None
def response_handler(message):
nonlocal response_received, response_data
if message.data.get("request_id") == request_id:
response_data = message.data
response_received = True
self.pubsub.subscribe("service_b_responses", response_handler)
# Wait for response (with timeout)
timeout = 10 # seconds
start_time = time.time()
while not response_received and (time.time() - start_time) < timeout:
time.sleep(0.1)
return response_dataclass RealTimeAnalytics:
def __init__(self):
self.client = Client(*get_connection_params())
self.client.connect(*get_connection_params())
self.pubsub = self.client.pubsub
self.metrics = {}
self.setup_analytics()
def setup_analytics(self):
# Subscribe to various event streams
self.pubsub.subscribe("user_events", self.track_user_metrics)
self.pubsub.subscribe("system_events", self.track_system_metrics)
self.pubsub.subscribe("business_events", self.track_business_metrics)
def track_user_metrics(self, message):
event_data = message.data
event_type = event_data.get("event")
# Update user metrics
if event_type not in self.metrics:
self.metrics[event_type] = 0
self.metrics[event_type] += 1
# Real-time dashboard updates
self.update_dashboard()
def track_system_metrics(self, message):
# Track system performance metrics
print(f"System metric: {message.data}")
def track_business_metrics(self, message):
# Track business KPIs
print(f"Business metric: {message.data}")
def update_dashboard(self):
# Send metrics to dashboard
self.pubsub.publish("dashboard_updates", {
"metrics": self.metrics,
"timestamp": datetime.now().isoformat()
})
def get_metrics(self):
return self.metricsRobust error handling for production applications:
from matrixone.exceptions import PubSubError, ConnectionError
try:
# Pub/Sub operations
pubsub.publish("user_events", {"event": "test"})
except PubSubError as e:
print(f"Pub/Sub error: {e}")
except ConnectionError as e:
print(f"Connection error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
# Retry mechanism for failed publishes
def publish_with_retry(pubsub, topic, message, max_retries=3):
for attempt in range(max_retries):
try:
pubsub.publish(topic, message)
return True
except Exception as e:
print(f"Publish attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoff
return FalseBest practices for optimal performance:
# Batch message publishing
def batch_publish(pubsub, topic, messages, batch_size=100):
for i in range(0, len(messages), batch_size):
batch = messages[i:i + batch_size]
for message in batch:
pubsub.publish(topic, message)
# Efficient message filtering
def efficient_subscription(pubsub, topic, handler, filters=None):
# Use specific filters to reduce message processing
return pubsub.subscribe(topic, handler, filter=filters)
# Connection pooling for high-throughput applications
class PubSubService:
def __init__(self):
self.client = Client(*get_connection_params())
self.client.connect(*get_connection_params())
self.pubsub = self.client.pubsub
self.lock = threading.Lock()
def thread_safe_publish(self, topic, message):
with self.lock:
return self.pubsub.publish(topic, message)Common issues and solutions:
- Message not received
- Verify topic name and subscription setup
- Check message filters and format
- Ensure subscription is active
- Performance issues
- Use batch operations for large message volumes
- Optimize message filtering
- Consider async operations for high-throughput scenarios
- Connection issues
- Verify MatrixOne server is running
- Check connection parameters
- Ensure proper network connectivity
- Message ordering
- Messages may not arrive in exact publish order
- Use timestamps for ordering if needed
- Consider message sequencing for critical applications
For more information, see the :doc:`api/client` and :doc:`best_practices`.