This guide will help you get started with the MatrixOne Python SDK quickly. For detailed configuration options, see :doc:`configuration_guide`.
!DANGER!
🚨 CRITICAL: Column Naming Convention
Always use lowercase with underscores (snake_case) for column names!
MatrixOne does not support SQL standard double-quoted identifiers, causing SELECT queries to fail when using camelCase column names with SQLAlchemy ORM.
# ❌ DON'T: CamelCase - Will fail!
class User(Base):
userName = Column(String(50)) # SELECT will fail
userId = Column(Integer)
# ✅ DO: snake_case - Works perfectly
class User(Base):
user_name = Column(String(50)) # All operations work
user_id = Column(Integer)Problem: CamelCase generates SELECT "userName" ❌ (not supported by MatrixOne)
Solution: snake_case generates SELECT user_name ✅ (works perfectly)
Note
Connection API Requirements
The connect() method requires keyword arguments (not positional):
database- Required, no default valuehost- Default:'localhost'port- Default:6001user- Default:'root'password- Default:'111'
Minimal connection (uses all defaults):
client.connect(database='test')By default, all features (IVF, HNSW, fulltext) are automatically enabled via on_connect=[ConnectionAction.ENABLE_ALL].
from matrixone import Client
from sqlalchemy import Column, Integer, String, DateTime
from matrixone.orm import declarative_base
# Create and connect to MatrixOne
client = Client()
client.connect(
host="127.0.0.1",
port=6001,
user="root",
password="111",
database="test"
)
# Define table model
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(200))
created_at = Column(DateTime)
# Create table using model
client.create_table(User)
# Insert data using insert API
client.insert("users", {
"id": 1,
"name": "John Doe",
"email": "john@example.com",
"created_at": "2024-01-01 10:00:00"
})
# Simple query using execute API
result = client.execute("SELECT * FROM users WHERE id = ?", (1,))
print(result.fetchall())
# Complex query using query builder
result = client.query(User).select("*").filter(User.id == 1).execute()
print(result.fetchall())
# Update data using query API
client.query(User).update({"name": "Jane Doe"}).filter(User.id == 1).execute()
# Delete data using query API
client.query(User).filter(User.id == 1).delete()
# Drop table using drop_table API
client.drop_table(User)
client.disconnect()Use client.session() for atomic transactions. All operations within a session succeed or fail together with automatic commit/rollback.
from matrixone import Client
from sqlalchemy import select, insert, update, delete
from sqlalchemy import Column, Integer, String
from matrixone.orm import declarative_base
client = Client()
client.connect(database='test')
# Define model
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255))
age = Column(Integer)
status = Column(String(20))
# Create table
client.create_table(User)
# Basic transaction with automatic commit/rollback
with client.session() as session:
# All operations are atomic
session.execute(insert(User).values(name='Alice', email='alice@example.com', age=30))
session.execute(update(User).where(User.age < 18).values(status='minor'))
# Query within transaction
stmt = select(User).where(User.age > 25)
result = session.execute(stmt)
users = result.scalars().all()
# Commits automatically on success, rolls back on error
# Error handling with automatic rollback
try:
with client.session() as session:
session.execute(insert(User).values(name='Bob', age=25))
# This will fail and trigger automatic rollback
session.execute(insert(InvalidTable).values(data='test'))
except Exception as e:
print(f"Transaction failed and rolled back: {e}")
client.disconnect()Key Benefits:
- ✅ Atomic operations - all succeed or fail together
- ✅ Automatic rollback on errors
- ✅ Access to all managers (snapshots, clones, load_data, etc.)
- ✅ Full SQLAlchemy ORM support
If you have existing SQLAlchemy code, you can wrap your sessions to add MatrixOne features without refactoring:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from matrixone import Client
from matrixone.session import Session as MatrixOneSession
# Your existing SQLAlchemy setup
engine = create_engine('mysql+pymysql://root:111@localhost:6001/test')
SessionFactory = sessionmaker(bind=engine)
sqlalchemy_session = SessionFactory()
# Create MatrixOne client
mo_client = Client()
mo_client.connect(host='localhost', port=6001, user='root', password='111', database='test')
# Wrap your existing session with MatrixOne features
mo_session = MatrixOneSession(
client=mo_client,
wrap_session=sqlalchemy_session
)
try:
# Your existing SQLAlchemy operations still work
result = mo_session.execute("SELECT * FROM users")
# Now you can also use MatrixOne-specific features
mo_session.stage.create_s3('backup_stage', bucket='my-backups', path='')
mo_session.snapshots.create('daily_backup', level='database')
mo_session.load_data.read_csv('/data/users.csv', table='users')
mo_session.commit()
finally:
mo_session.close()Perfect For:
- 🔄 Gradual migration from pure SQLAlchemy to MatrixOne
- 🏢 Adding MatrixOne features to existing enterprise applications
- 📦 Legacy code modernization without complete refactoring
- 🔧 Testing MatrixOne features alongside existing code
Async Version:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from matrixone import AsyncClient
from matrixone.session import AsyncSession as MatrixOneAsyncSession
# Existing async SQLAlchemy setup
async_engine = create_async_engine('mysql+aiomysql://root:111@localhost:6001/test')
AsyncSessionFactory = async_sessionmaker(bind=async_engine)
sqlalchemy_async_session = AsyncSessionFactory()
# Create async MatrixOne client
mo_async_client = AsyncClient()
await mo_async_client.connect(host='localhost', port=6001, user='root', password='111', database='test')
# Wrap existing async session
mo_async_session = MatrixOneAsyncSession(
client=mo_async_client,
wrap_session=sqlalchemy_async_session
)
try:
# Standard async SQLAlchemy operations
result = await mo_async_session.execute("SELECT * FROM users")
# MatrixOne async features now available
await mo_async_session.stage.create_local('export_stage', '/exports/')
await mo_async_session.snapshots.create('async_backup', level='database')
await mo_async_session.commit()
finally:
await mo_async_session.close()The SDK provides seamless SQLAlchemy integration with ORM-style operations:
from matrixone import Client
from matrixone.orm import Base, Column, Integer, String
from sqlalchemy import select, insert, update, delete, and_, or_
# Define ORM models
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255))
age = Column(Integer)
client = Client()
client.connect(database='test')
client.create_table(User)
# ORM-style INSERT
stmt = insert(User).values(name='John', email='john@example.com', age=30)
client.execute(stmt)
# ORM-style SELECT with WHERE
stmt = select(User).where(User.age > 25)
result = client.execute(stmt)
for user in result.scalars():
print(f"User: {user.name}, Age: {user.age}")
# ORM-style SELECT with complex WHERE
stmt = select(User).where(
and_(
User.age > 18,
or_(User.status == 'active', User.status == 'pending')
)
)
result = client.execute(stmt)
# ORM-style UPDATE
stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
result = client.execute(stmt)
print(f"Updated {result.affected_rows} rows")
# ORM-style DELETE
stmt = delete(User).where(User.age < 18)
result = client.execute(stmt)
client.disconnect()Recommended Practices:
- ✅ Use SQLAlchemy statements (
select,insert,update,delete) - ✅ Use
session()for multi-statement transactions - ✅ Use
client.execute()for single-statement operations - ✅ Prefer SQLAlchemy statements over raw SQL strings
Full async/await support with async sessions for non-blocking operations:
import asyncio
from matrixone import AsyncClient
from matrixone.orm import Base, Column, Integer, String, DECIMAL
from sqlalchemy import select, insert, update
async def async_quickstart():
client = AsyncClient()
await client.connect(database='test')
# Define table model
Base = declarative_base()
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String(200))
price = Column(DECIMAL(10, 2))
category = Column(String(50))
# Create table
await client.create_table(Product)
# ORM-style async INSERT
stmt = insert(Product).values(
id=1, name='Laptop', price=999.99, category='Electronics'
)
await client.execute(stmt)
# ORM-style async SELECT
stmt = select(Product).where(Product.category == 'Electronics')
result = await client.execute(stmt)
for product in result.scalars():
print(f"Product: {product.name}, Price: ${product.price}")
# Async transaction with session
async with client.session() as session:
# All operations are atomic
await session.execute(
insert(Product).values(id=2, name='Phone', price=699.99, category='Electronics')
)
await session.execute(
update(Product).where(Product.id == 1).values(price=899.99)
)
# Concurrent queries with asyncio.gather
product_result, count_result = await asyncio.gather(
session.execute(select(Product).where(Product.category == 'Electronics')),
session.execute(select(func.count(Product.id)))
)
# Commits automatically
# Clean up
await client.drop_table(Product)
await client.disconnect()
asyncio.run(async_quickstart())Async Advantages:
- ✅ Non-blocking operations - don't block the event loop
- ✅ Concurrent execution with
asyncio.gather() - ✅ Perfect for web frameworks (FastAPI, aiohttp)
- ✅ Same transaction guarantees as sync version
from matrixone import Client
from matrixone.config import get_connection_params
from sqlalchemy import Column, Integer, String, Text
from matrixone.orm import declarative_base
from matrixone.sqlalchemy_ext import Vectorf32
# Get connection parameters
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Define vector table model
Base = declarative_base()
class Document(Base):
__tablename__ = 'documents'
id = Column(Integer, primary_key=True)
title = Column(String(200))
content = Column(Text)
embedding = Column(Vectorf32(384)) # 384-dimensional vector
# Create table using model
client.create_table(Document)
# ⚠️ Insert initial data BEFORE creating IVF index (recommended)
client.insert("documents", {
"id": 1,
"title": "Introduction to AI",
"content": "Artificial Intelligence is a field of computer science...",
"embedding": [0.1] * 384 # Example 384-dimensional vector
})
# Enable IVF indexing
client.vector_ops.enable_ivf()
# Create IVF index after initial data (better clustering)
client.vector_ops.create_ivf(Document, name="idx_embedding", column="embedding", lists=100)
# IVF supports dynamic updates (can continue inserting)
client.insert("documents", {"id": 2, ...}) # ✅ Works fine
# Vector similarity search using simple interface (first argument is positional)
query_vector = [0.1] * 384
results = client.vector_ops.similarity_search(
Document,
vector_column="embedding",
query_vector=query_vector,
limit=5,
distance_type="l2"
)
print("Similarity search results:", results)
# Complex vector query using query builder
result = client.query("documents").select("*").where(
"l2_distance(embedding, ?) < ?",
(query_vector, 0.5)
).order_by("l2_distance(embedding, ?)", query_vector).limit(10).execute()
for row in result.fetchall():
print(f"Document: {row[1]}, Distance: {row[3]}")
# ⭐ IMPORTANT: Monitor IVF index health for production systems
stats = client.vector_ops.get_ivf_stats(Document, "embedding")
counts = stats['distribution']['centroid_count']
balance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
print(f"Index health - Centroids: {len(counts)}, Balance ratio: {balance_ratio:.2f}")
if balance_ratio > 2.5:
print("⚠️ Warning: Index needs rebuilding for optimal performance")
# Drop vector index
client.vector_ops.drop(Document, "idx_embedding")
# Clean up
client.drop_table(Document)
client.disconnect()from matrixone import Client
from matrixone.config import get_connection_params
from sqlalchemy import Column, BigInteger, String
from matrixone.orm import declarative_base
from matrixone.sqlalchemy_ext import Vectorf32
# Get connection parameters
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Define vector table model
Base = declarative_base()
class Product(Base):
__tablename__ = 'products'
# IMPORTANT: HNSW index requires BigInteger primary key
id = Column(BigInteger, primary_key=True, autoincrement=True)
name = Column(String(200))
features = Column(Vectorf32(128)) # 128-dimensional feature vector
# Create table using model
client.create_table(Product)
# Insert vector data using client API
client.insert(Product, {
"name": "Smartphone",
"features": [0.2] * 128 # Example 128-dimensional vector
})
# Enable HNSW indexing
client.vector_ops.enable_hnsw()
# Create HNSW index (first argument is positional)
client.vector_ops.create_hnsw(Product, name="idx_features", column="features", m=16, ef_construction=200)
# Vector similarity search (first argument is positional)
query_vector = [0.2] * 128
results = client.vector_ops.similarity_search(
Product,
vector_column="features",
query_vector=query_vector,
limit=5,
distance_type="cosine"
)
print("HNSW similarity search results:", results)
# Drop vector index
client.vector_ops.drop(Product, "idx_features")
# Clean up
client.drop_table(Product)
client.disconnect()from sqlalchemy import Column, Integer, String, DECIMAL, DateTime
from matrixone.orm import declarative_base
from matrixone import Client
from matrixone.config import get_connection_params
# Define ORM models
Base = declarative_base()
class Account(Base):
__tablename__ = 'accounts'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False)
balance = Column(DECIMAL(10, 2), nullable=False)
created_at = Column(DateTime, nullable=False)
# Get connection and create client
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Create table using ORM model
client.create_table(Account)
# Insert data using client API
accounts_data = [
{"name": "Alice", "balance": 1000.00, "created_at": "2024-01-01 10:00:00"},
{"name": "Bob", "balance": 500.00, "created_at": "2024-01-01 10:00:00"}
]
client.batch_insert(Account, accounts_data)
# Query using client API
accounts = client.query(Account).filter(Account.balance > 600).all()
for account in accounts:
print(f"{account.name}: ${account.balance}")
# Update using ORM
session.query(Account).filter(Account.name == "Alice").update({"balance": 1200.00})
session.commit()
# Clean up using ORM
client.drop_table(Account)
session.close()
client.disconnect()from matrixone import Client
from matrixone.config import get_connection_params
from matrixone.sqlalchemy_ext import create_vector_column
import numpy as np
# Get connection parameters
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Create vector table using create_table API
client.create_table("documents", {
"id": "int",
"title": "varchar(200)",
"content": "text",
"embedding": "vecf32(384)" # 384-dimensional f32 vector
}, primary_key="id")
# Create vector index using vector_ops API
client.vector_ops.enable_ivf()
client.vector_ops.create_ivf(
table_name_or_model="documents",
name="idx_embedding",
column="embedding",
lists=50,
op_type="vector_l2_ops"
)
# Insert documents with embeddings using insert API
documents = [
{
"id": 1,
"title": "AI Research",
"content": "Artificial intelligence research paper",
"embedding": np.random.rand(384).astype(np.float32).tolist()
},
{
"id": 2,
"title": "ML Guide",
"content": "Machine learning tutorial",
"embedding": np.random.rand(384).astype(np.float32).tolist()
}
]
for doc in documents:
client.insert("documents", doc)
# Vector similarity search using vector_query API
query_vector = np.random.rand(384).astype(np.float32).tolist()
results = client.vector_ops.similarity_search(
table_name_or_model="documents",
vector_column="embedding",
query_vector=query_vector,
limit=5,
distance_type="l2"
)
print("Vector Search Results:")
for result in results.rows:
print(f"Document: {result[1]} (Distance: {result[-1]:.4f})")
# ⭐ CRITICAL: Check IVF index health - Essential for production monitoring
stats = client.vector_ops.get_ivf_stats("documents", "embedding")
# Analyze index balance
distribution = stats['distribution']
counts = distribution['centroid_count']
total_centroids = len(counts)
total_vectors = sum(counts)
balance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
print(f"\nIVF Index Health:")
print(f" - Total centroids: {total_centroids}")
print(f" - Total vectors: {total_vectors}")
print(f" - Balance ratio: {balance_ratio:.2f} {'✓' if balance_ratio <= 2.5 else '⚠️'}")
if balance_ratio > 2.5:
print(f" - ⚠️ Index imbalanced - consider rebuilding")
print(f" - See vector_guide for detailed monitoring procedures")
# Clean up using drop_table API
client.drop_table("documents")
client.disconnect()import asyncio
from matrixone import AsyncClient
from matrixone.config import get_connection_params
import numpy as np
async def async_vector_example():
# Get connection parameters
host, port, user, password, database = get_connection_params()
client = AsyncClient()
await client.connect(host=host, port=port, user=user, password=password, database=database)
# Create vector table using async create_table API
await client.create_table("products", {
"id": "int",
"name": "varchar(200)",
"description": "text",
"features": "vecf64(512)" # 512-dimensional f64 vector
}, primary_key="id")
# Create vector index using async vector_ops API
await client.vector_ops.enable_ivf()
await client.vector_ops.create_ivf(
table_name_or_model="products",
name="idx_features",
column="features",
lists=100,
op_type="vector_cosine_ops"
)
# Insert products with feature vectors using async insert API
products = [
{
"id": 1,
"name": "Smartphone",
"description": "Latest smartphone with AI features",
"features": np.random.rand(512).astype(np.float64).tolist()
},
{
"id": 2,
"name": "Laptop",
"description": "High-performance laptop for professionals",
"features": np.random.rand(512).astype(np.float64).tolist()
}
]
for product in products:
await client.insert("products", product)
# Vector similarity search using async vector_query API
query_vector = np.random.rand(512).astype(np.float64).tolist()
results = await client.vector_ops.similarity_search(
table_name_or_model="products",
vector_column="features",
query_vector=query_vector,
limit=3,
distance_type="cosine"
)
print("Async Vector Search Results:")
for result in results.rows:
print(f"Product: {result[1]} (Similarity: {1 - result[-1]:.4f})")
# ⭐ Monitor IVF index health asynchronously
stats = await client.vector_ops.get_ivf_stats("products", "features")
counts = stats['distribution']['centroid_count']
balance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
print(f"\nAsync IVF Index Health:")
print(f" - Centroids: {len(counts)}, Balance: {balance_ratio:.2f}")
# Clean up using async drop_table API
await client.drop_table("products")
await client.disconnect()
asyncio.run(async_vector_example())from matrixone import Client
from matrixone.config import get_connection_params
def transaction_example():
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Create table using create_table API
client.create_table("orders", {
"id": "int",
"customer_id": "int",
"amount": "decimal(10,2)",
"status": "varchar(20)"
}, primary_key="id")
# Use transaction for atomic operations
with client.transaction() as tx:
# Insert order
tx.insert("orders", {
"id": 1,
"customer_id": 100,
"amount": 99.99,
"status": "pending"
})
# Update order status
tx.query("orders").update({"status": "confirmed"}).where("id = ?", 1).execute()
# If any operation fails, the entire transaction is rolled back
# Verify the transaction
result = client.query("orders").select("*").where("id = ?", 1).execute()
print("Order after transaction:", result.fetchall())
# Clean up
client.drop_table("orders")
client.disconnect()
transaction_example()from matrixone import Client
from matrixone.exceptions import ConnectionError, QueryError
from matrixone.config import get_connection_params
def robust_example():
client = None
try:
host, port, user, password, database = get_connection_params()
# Create client with error handling
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Create table with error handling
try:
client.create_table("test_table", {
"id": "int",
"name": "varchar(100)"
}, primary_key="id")
print("✓ Table created successfully")
except QueryError as e:
print(f"❌ Table creation failed: {e}")
# Insert data with error handling
try:
client.insert("test_table", {"id": 1, "name": "Test"})
print("✓ Data inserted successfully")
except QueryError as e:
print(f"❌ Data insertion failed: {e}")
# Query data with error handling
try:
result = client.query("test_table").select("*").execute()
print(f"✓ Query successful: {result.fetchall()}")
except QueryError as e:
print(f"❌ Query failed: {e}")
except ConnectionError as e:
print(f"❌ Connection failed: {e}")
except Exception as e:
print(f"❌ Unexpected error: {e}")
finally:
# Always clean up
if client:
try:
client.drop_table("test_table")
client.disconnect()
print("✓ Cleanup completed")
except Exception as e:
print(f"⚠️ Cleanup warning: {e}")
robust_example()from matrixone import Client
from matrixone.config import get_connection_params, print_config
def configuration_example():
# Use environment variables for configuration
# Set these in your environment:
# export MATRIXONE_HOST=127.0.0.1
# export MATRIXONE_PORT=6001
# export MATRIXONE_USER=root
# export MATRIXONE_PASSWORD=111
# export MATRIXONE_DATABASE=test
# Print current configuration
print_config()
# Get connection parameters from environment
host, port, user, password, database = get_connection_params()
# Create client with optimized settings
client = Client(
connection_timeout=30, # Connection timeout in seconds
query_timeout=300, # Query timeout in seconds
auto_commit=True, # Enable auto-commit for better performance
charset='utf8mb4', # Support for international characters
sql_log_mode='simple', # Simple SQL logging for production
slow_query_threshold=1.0 # Alert on queries > 1s
)
client.connect(host=host, port=port, user=user, password=password, database=database)
# Check backend capabilities
version = client.get_backend_version()
print(f"✓ Connected to MatrixOne {version}")
if client.is_feature_available('vector_search'):
print("✓ Vector search is available")
if client.is_feature_available('fulltext_search'):
print("✓ Fulltext search is available")
client.disconnect()
configuration_example()- Read the :doc:`api/index` for detailed API documentation
- Check out the :doc:`vector_guide` for comprehensive vector operations
- Explore :doc:`fulltext_guide` for text search capabilities
- Learn about :doc:`orm_guide` for ORM patterns
- Check out the :doc:`examples` for comprehensive usage examples
- Learn about :doc:`contributing` to contribute to the project
- Run
make examplesto test all examples with your MatrixOne setup - Use
make testto run the test suite and verify your setup