Skip to content

Latest commit

 

History

History
559 lines (427 loc) · 17.3 KB

File metadata and controls

559 lines (427 loc) · 17.3 KB

MOCTL Command Line Interface Guide

This guide covers the MOCTL command line interface in the MatrixOne Python SDK, providing programmatic access to MatrixOne control operations.

⚠️ CRITICAL REQUIREMENT: All MOCTL operations require 'sys' account privileges. These operations cannot be executed with regular user accounts.

Overview

MOCTL (MatrixOne Control) provides:

  • Table Flushing: Force flush table data to disk
  • Incremental Checkpoint: Generate incremental checkpoints and truncate WAL
  • Global Checkpoint: Generate global checkpoints across all nodes

Prerequisites

Before using MOCTL operations, you must:

  1. Connect with 'sys' account privileges
  2. Ensure proper administrative permissions
  3. Have access to the target database and tables

Connection Examples:

# Method 1: Use sys#root format
client.connect(host, port, 'sys#root', password, database)

# Method 2: Use account parameter
client.connect(host, port, 'root', password, database, account='sys')

Getting Started

Basic Setup

from matrixone import Client
from matrixone.config import get_connection_params

# Connect to MatrixOne with sys account
connection_params = get_connection_params()
client = Client()
client.connect(host, port, 'sys#root', password, database)
# Alternative: client.connect(host, port, 'root', password, database, account='sys')

# Get MOCTL manager
moctl = client.moctl

Core Operations

Table Flushing

Force flush table data to disk:

# Flush a specific table
result = moctl.flush_table("production_db", "users")
print(f"Flush result: {result}")

# Flush multiple tables
tables_to_flush = [
    ("production_db", "users"),
    ("production_db", "orders"),
    ("production_db", "products")
]

for database, table in tables_to_flush:
    result = moctl.flush_table(database, table)
    print(f"Flushed {database}.{table}: {result}")

# Flush with error handling
try:
    result = moctl.flush_table("production_db", "large_table")
    if result.get("result", [{}])[0].get("returnStr") == "OK":
        print("Table flushed successfully")
    else:
        print(f"Flush failed: {result}")
except Exception as e:
    print(f"Flush error: {e}")

Incremental Checkpoint

Generate incremental checkpoint and truncate WAL:

# Force incremental checkpoint
result = moctl.increment_checkpoint()
print(f"Incremental checkpoint result: {result}")

# Checkpoint with validation
try:
    result = moctl.increment_checkpoint()
    if result.get("result", [{}])[0].get("returnStr") == "OK":
        print("Incremental checkpoint completed successfully")
    else:
        print(f"Checkpoint failed: {result}")
except Exception as e:
    print(f"Checkpoint error: {e}")

# Regular checkpoint scheduling
import time
from datetime import datetime

def schedule_checkpoints(interval_minutes=30):
    """Schedule regular incremental checkpoints"""
    while True:
        try:
            result = moctl.increment_checkpoint()
            print(f"{datetime.now()}: Checkpoint completed")
        except Exception as e:
            print(f"{datetime.now()}: Checkpoint failed: {e}")

        time.sleep(interval_minutes * 60)

Global Checkpoint

Generate global checkpoint across all nodes:

# Force global checkpoint
result = moctl.global_checkpoint()
print(f"Global checkpoint result: {result}")

# Global checkpoint with validation
try:
    result = moctl.global_checkpoint()
    if result.get("result", [{}])[0].get("returnStr") == "OK":
        print("Global checkpoint completed successfully")
    else:
        print(f"Global checkpoint failed: {result}")
except Exception as e:
    print(f"Global checkpoint error: {e}")

# Coordinated checkpoint strategy
def coordinated_checkpoint():
    """Perform coordinated checkpoint across cluster"""
    print("Starting coordinated checkpoint...")

    # First, flush critical tables
    critical_tables = [
        ("production_db", "users"),
        ("production_db", "orders"),
        ("production_db", "transactions")
    ]

    for database, table in critical_tables:
        try:
            moctl.flush_table(database, table)
            print(f"Flushed {database}.{table}")
        except Exception as e:
            print(f"Failed to flush {database}.{table}: {e}")

    # Then perform global checkpoint
    try:
        result = moctl.global_checkpoint()
        print("Coordinated checkpoint completed")
        return result
    except Exception as e:
        print(f"Coordinated checkpoint failed: {e}")
        return None

Async Operations

Async MOCTL Operations

Full async/await support for high-performance applications:

import asyncio
from matrixone import AsyncClient

async def async_moctl_operations():
    # Connect asynchronously with sys account
    connection_params = get_connection_params()
    async_client = AsyncClient()
    await async_client.connect(host, port, 'sys#root', password, database)
    # Alternative: await async_client.connect(host, port, 'root', password, database, account='sys')

    # Get async MOCTL manager
    moctl = async_client.moctl

    # Async table flush
    result = await moctl.flush_table("production_db", "users")
    print(f"Async flush result: {result}")

    # Async incremental checkpoint
    result = await moctl.increment_checkpoint()
    print(f"Async incremental checkpoint result: {result}")

    # Async global checkpoint
    result = await moctl.global_checkpoint()
    print(f"Async global checkpoint result: {result}")

    await async_client.disconnect()

# Run async operations
asyncio.run(async_moctl_operations())

Real-world Examples

Database Maintenance System

import time
from datetime import datetime, timedelta

class DatabaseMaintenanceSystem:
    def __init__(self):
        self.client = Client(*get_connection_params())
        self.client.connect(*get_connection_params())
        self.moctl = self.client.moctl

    def perform_maintenance(self):
        """Perform routine database maintenance"""
        print("Starting database maintenance...")

        # 1. Flush critical tables
        self.flush_critical_tables()

        # 2. Perform incremental checkpoint
        self.perform_incremental_checkpoint()

        # 3. Perform global checkpoint
        self.perform_global_checkpoint()

        print("Database maintenance completed")

    def flush_critical_tables(self):
        """Flush critical tables to disk"""
        critical_tables = [
            ("production_db", "users"),
            ("production_db", "orders"),
            ("production_db", "products"),
            ("analytics_db", "metrics"),
            ("analytics_db", "events")
        ]

        for database, table in critical_tables:
            try:
                result = self.moctl.flush_table(database, table)
                print(f"Flushed {database}.{table}")
            except Exception as e:
                print(f"Failed to flush {database}.{table}: {e}")

    def perform_incremental_checkpoint(self):
        """Perform incremental checkpoint"""
        try:
            result = self.moctl.increment_checkpoint()
            print("Incremental checkpoint completed")
        except Exception as e:
            print(f"Incremental checkpoint failed: {e}")

    def perform_global_checkpoint(self):
        """Perform global checkpoint"""
        try:
            result = self.moctl.global_checkpoint()
            print("Global checkpoint completed")
        except Exception as e:
            print(f"Global checkpoint failed: {e}")

    def schedule_maintenance(self, interval_hours=6):
        """Schedule regular maintenance"""
        while True:
            self.perform_maintenance()
            time.sleep(interval_hours * 3600)

# Usage
maintenance = DatabaseMaintenanceSystem()
maintenance.perform_maintenance()

Backup Preparation System

class BackupPreparationSystem:
    def __init__(self):
        self.client = Client(*get_connection_params())
        self.client.connect(*get_connection_params())
        self.moctl = self.client.moctl

    def prepare_for_backup(self):
        """Prepare database for backup by ensuring data consistency"""
        print("Preparing database for backup...")

        # 1. Flush all tables to ensure data is on disk
        self.flush_all_tables()

        # 2. Perform incremental checkpoint to truncate WAL
        self.checkpoint_before_backup()

        print("Database prepared for backup")

    def flush_all_tables(self):
        """Flush all tables in the database"""
        # Get list of all tables (this would be database-specific)
        all_tables = [
            ("production_db", "users"),
            ("production_db", "orders"),
            ("production_db", "products"),
            ("production_db", "categories"),
            ("production_db", "transactions")
        ]

        for database, table in all_tables:
            try:
                self.moctl.flush_table(database, table)
                print(f"Flushed {database}.{table}")
            except Exception as e:
                print(f"Failed to flush {database}.{table}: {e}")

    def checkpoint_before_backup(self):
        """Perform checkpoint before backup"""
        try:
            # First incremental checkpoint
            self.moctl.increment_checkpoint()
            print("Incremental checkpoint completed")

            # Then global checkpoint
            self.moctl.global_checkpoint()
            print("Global checkpoint completed")

        except Exception as e:
            print(f"Checkpoint failed: {e}")

# Usage
backup_prep = BackupPreparationSystem()
backup_prep.prepare_for_backup()

Performance Monitoring System

class PerformanceMonitoringSystem:
    def __init__(self):
        self.client = Client(*get_connection_params())
        self.client.connect(*get_connection_params())
        self.moctl = self.client.moctl

    def monitor_and_optimize(self):
        """Monitor performance and perform optimizations"""
        print("Starting performance monitoring...")

        # Monitor table sizes and flush large tables
        self.flush_large_tables()

        # Perform regular checkpoints
        self.perform_regular_checkpoints()

        print("Performance monitoring completed")

    def flush_large_tables(self):
        """Flush tables that are likely to be large"""
        large_tables = [
            ("production_db", "user_activity_logs"),
            ("production_db", "system_logs"),
            ("analytics_db", "event_logs"),
            ("analytics_db", "metrics_data")
        ]

        for database, table in large_tables:
            try:
                self.moctl.flush_table(database, table)
                print(f"Flushed large table {database}.{table}")
            except Exception as e:
                print(f"Failed to flush {database}.{table}: {e}")

    def perform_regular_checkpoints(self):
        """Perform regular checkpoints for performance"""
        try:
            # Incremental checkpoint for WAL truncation
            self.moctl.increment_checkpoint()
            print("Regular incremental checkpoint completed")

            # Global checkpoint for consistency
            self.moctl.global_checkpoint()
            print("Regular global checkpoint completed")

        except Exception as e:
            print(f"Regular checkpoint failed: {e}")

    def start_monitoring(self, interval_minutes=15):
        """Start continuous performance monitoring"""
        while True:
            self.monitor_and_optimize()
            time.sleep(interval_minutes * 60)

# Usage
monitor = PerformanceMonitoringSystem()
monitor.monitor_and_optimize()

Sys Account Requirements

All MOCTL operations require 'sys' account privileges. Here's how to handle permission issues:

Common Permission Errors

try:
    result = moctl.flush_table("database", "table")
except MoCtlError as e:
    if "permission" in str(e).lower() or "privilege" in str(e).lower():
        print("❌ Permission denied. Ensure you're connected with sys account:")
        print("   client.connect(host, port, 'sys#root', password, database)")
        print("   # or")
        print("   client.connect(host, port, 'root', password, database, account='sys')")
    else:
        print(f"MOCTL operation failed: {e}")

Verification

Verify your connection has sys privileges:

# Check current user and account
result = client.execute("SELECT USER(), CURRENT_USER()")
user_info = result.rows[0]
print(f"Connected as: {user_info[0]}")
print(f"Current user: {user_info[1]}")

# Verify sys account access
if 'sys' in user_info[0].lower() or 'sys' in user_info[1].lower():
    print("✅ Connected with sys account - MOCTL operations available")
else:
    print("❌ Not connected with sys account - MOCTL operations will fail")

Error Handling

Robust error handling for production applications:

from matrixone.exceptions import MoCtlError

try:
    # MOCTL operations
    result = moctl.flush_table("production_db", "users")
except MoCtlError as e:
    print(f"MOCTL error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

# Retry mechanism for MOCTL operations
def moctl_operation_with_retry(moctl, operation, max_retries=3):
    for attempt in range(max_retries):
        try:
            return operation()
        except Exception as e:
            print(f"Operation attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff

# Safe table flushing with retry
def safe_flush_table(moctl, database, table, max_retries=3):
    def flush_operation():
        return moctl.flush_table(database, table)

    return moctl_operation_with_retry(moctl, flush_operation, max_retries)

Performance Optimization

Best practices for optimal performance:

# Batch table flushing
def batch_flush_tables(moctl, tables):
    results = []
    for database, table in tables:
        try:
            result = moctl.flush_table(database, table)
            results.append((database, table, result))
        except Exception as e:
            results.append((database, table, f"Error: {e}"))
    return results

# Efficient checkpoint scheduling
def efficient_checkpoint_schedule(moctl):
    # Flush critical tables first
    critical_tables = [
        ("production_db", "users"),
        ("production_db", "orders")
    ]

    for database, table in critical_tables:
        moctl.flush_table(database, table)

    # Then perform checkpoints
    moctl.increment_checkpoint()
    moctl.global_checkpoint()

# Connection pooling for high-throughput applications
class MoCtlService:
    def __init__(self):
        self.client = Client(*get_connection_params())
        self.client.connect(*get_connection_params())
        self.moctl = self.client.moctl
        self.lock = threading.Lock()

    def thread_safe_flush(self, database, table):
        with self.lock:
            return self.moctl.flush_table(database, table)

Troubleshooting

Common issues and solutions:

Table flush failures
  • Verify table exists and is accessible
  • Check database permissions
  • Ensure table is not locked by other operations
Checkpoint failures
  • Verify cluster is in healthy state
  • Check for ongoing transactions
  • Ensure sufficient disk space
Global checkpoint issues
  • Verify all nodes are accessible
  • Check network connectivity
  • Ensure cluster consistency
Performance issues
  • Use incremental checkpoints for frequent operations
  • Flush tables during low-activity periods
  • Monitor WAL size and truncate regularly

For more information, see the :doc:`api/client` and :doc:`best_practices`.