Skip to content

Concurrent queries on Python DataFrames hang #426

@auxten

Description

@auxten

Here is a script that reproduces the hang 100% of the time:

"""
Minimal reproduction of chDB concurrent execution hang.

This script uses ONLY chdb and pandas, no DataStore dependencies.
It reproduces the concurrent execution issue when querying DataFrames.
"""

import sys
import os
import signal
import traceback
import threading
import concurrent.futures
import time
import uuid

import pandas as pd
import chdb


# Signal handler for debugging
def dump_stacks(signum=None, frame=None):
    """Print the stack of every live thread, then terminate the process.

    Usable both as a signal handler (installed for SIGUSR1 below) and as a
    plain function call from a timeout path.

    Args:
        signum: Signal number when invoked as a signal handler; unused.
        frame: Interrupted frame when invoked as a signal handler; unused.

    Never returns: the process exits with status 1 via ``os._exit``.
    """
    print("\n" + "=" * 80)
    print("DUMPING ALL THREAD STACKS")
    print("=" * 80)

    # Map thread idents to names so each stack is labelled with the thread it
    # belongs to, not with the thread that happens to be running this handler.
    names = {t.ident: t.name for t in threading.enumerate()}

    for thread_id, stack in sys._current_frames().items():
        print(f"\nThread {thread_id} ({names.get(thread_id, '<unknown>')}):")
        # Write stacks to stdout so they stay in order with the headers above.
        traceback.print_stack(stack, file=sys.stdout)

    print("=" * 80)
    # os._exit skips normal interpreter shutdown (used here because hung worker
    # threads would block it), which also means stdio buffers are NOT flushed.
    # Flush explicitly so the dump is not lost when output is piped.
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(1)


# SIGUSR1 is not defined on Windows; install the debug handler only where the
# signal exists so the script can still be imported and run there.
if hasattr(signal, "SIGUSR1"):
    signal.signal(signal.SIGUSR1, dump_stacks)


def worker(worker_id, test_type='shared'):
    """Build a small DataFrame and query it via chDB's ``Python()`` table function.

    The DataFrame is stored in this module's ``globals()`` under a unique
    name so the SQL can refer to it as ``Python(<name>)``. The entry is
    removed again before the function returns.

    Args:
        worker_id: Integer label used in log lines; also multiplies the
            ``value`` column.
        test_type: ``'static'`` runs the query with ``chdb.query``;
            ``'connection'`` runs it on a connection from ``chdb.connect()``
            opened for this call only; any other value (including the
            default ``'shared'``) skips the query and returns ``worker_id``
            as a placeholder.

    Returns:
        The query result, the ``worker_id`` placeholder, or ``None`` if an
        exception was raised (it is printed with its traceback, not re-raised).
    """
    print(f"[Worker {worker_id}] Starting (type={test_type})...", flush=True)

    try:
        df = pd.DataFrame({
            'id': list(range(10)),
            'value': [i * worker_id for i in range(10)],
            'category': [f'cat_{i % 3}' for i in range(10)]
        })
        print(f"[Worker {worker_id}] Created DataFrame shape={df.shape}", flush=True)

        # A unique name per call keeps concurrent workers from overwriting
        # each other's DataFrame in the shared module namespace.
        var_name = f"__test_df_{uuid.uuid4().hex}__"
        print(f"[Worker {worker_id}] Using var_name={var_name[:30]}...", flush=True)

        globals()[var_name] = df
        print(f"[Worker {worker_id}] Registered DataFrame in globals()", flush=True)

        try:
            sql = f"SELECT * FROM Python({var_name}) WHERE value > 3"

            print(f"[Worker {worker_id}] About to execute SQL...")
            print(f"[Worker {worker_id}] SQL: {sql}", flush=True)

            if test_type == 'static':
                result = chdb.query(sql, 'DataFrame')
            elif test_type == 'connection':
                conn = chdb.connect()
                try:
                    result = conn.query(sql, 'DataFrame')
                finally:
                    # Close even if the query raises, so a failing worker
                    # does not leak its connection.
                    conn.close()
            else:
                # 'shared' mode: no shared connection is wired in, so return
                # a placeholder instead of running a query.
                result = worker_id

            shape = result.shape if hasattr(result, 'shape') else 'N/A'
            print(f"[Worker {worker_id}] SQL executed, result shape={shape}", flush=True)

            return result

        finally:
            # Always drop the temporary global so the namespace does not grow.
            if var_name in globals():
                del globals()[var_name]
                print(f"[Worker {worker_id}] Cleaned up var_name from globals()", flush=True)

    except Exception as e:
        print(f"[Worker {worker_id}] EXCEPTION: {e}")
        traceback.print_exc()
        return None


def test_static_query_concurrent():
    """Run several workers concurrently, each calling ``chdb.query()`` on a DataFrame.

    Returns:
        True if every worker finished within the timeout and returned a
        result; False if any worker failed or an unexpected error occurred.
        On timeout, ``dump_stacks()`` is called, which terminates the process.
    """
    print("\n" + "=" * 80)
    print("TEST: Concurrent chdb.query() with Python() table function")
    print("=" * 80)
    print(f"PID: {os.getpid()}")
    print(f"To dump stacks if hung: kill -USR1 {os.getpid()}")
    print("=" * 80)

    num_workers = 3
    timeout = 15
    print(f"\nStarting {num_workers} concurrent workers...", flush=True)

    # The executor is managed by hand rather than with a ``with`` block.
    # Leaving a ``with`` block calls shutdown(wait=True), which would block
    # forever on hung workers before the TimeoutError handler could run.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
    try:
        futures = [executor.submit(worker, i, 'static') for i in range(num_workers)]

        print("All futures submitted, waiting for completion...", flush=True)

        succeeded = 0
        failed = 0
        for future in concurrent.futures.as_completed(futures, timeout=timeout):
            try:
                result = future.result()
            except Exception as e:
                failed += 1
                print(f"\n✗ Worker raised exception: {e}")
                traceback.print_exc()
                continue

            if result is None:
                # worker() catches its own exceptions and returns None, so a
                # None result means the query failed.
                failed += 1
                print("\n✗ Worker failed (no result returned)", flush=True)
            else:
                succeeded += 1
                print(f"\n✓ Worker completed ({succeeded}/{num_workers})", flush=True)

        if failed:
            print(f"\n✗ {failed}/{num_workers} workers failed!")
            return False

        print(f"\n✓ All {succeeded} workers completed successfully!")
        return True

    except concurrent.futures.TimeoutError:
        print(f"\n✗ TIMEOUT after {timeout} seconds!")
        print("Dumping thread stacks...")
        sys.stdout.flush()
        dump_stacks()
        return False

    except Exception as e:
        print(f"\n✗ ERROR: {e}")
        traceback.print_exc()
        return False

    finally:
        # Never block on possibly-hung worker threads here.
        executor.shutdown(wait=False)


def test_connection_per_thread():
    """Run several workers concurrently, each querying through its own ``chdb.connect()``.

    Returns:
        True if every worker finished within the timeout and returned a
        result; False if any worker failed or an unexpected error occurred.
        On timeout, ``dump_stacks()`` is called, which terminates the process.
    """
    print("\n" + "=" * 80)
    print("TEST: Separate Connection Per Thread")
    print("=" * 80)

    num_workers = 3
    timeout = 15
    print(f"\nStarting {num_workers} concurrent workers (separate connections)...", flush=True)

    # The executor is managed by hand rather than with a ``with`` block.
    # Leaving a ``with`` block calls shutdown(wait=True), which would block
    # forever on hung workers before the TimeoutError handler could run.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
    try:
        futures = [executor.submit(worker, i, 'connection') for i in range(num_workers)]

        succeeded = 0
        failed = 0
        for future in concurrent.futures.as_completed(futures, timeout=timeout):
            try:
                result = future.result()
            except Exception as e:
                failed += 1
                print(f"\n✗ Worker raised exception: {e}")
                traceback.print_exc()
                continue

            if result is None:
                # worker() catches its own exceptions and returns None, so a
                # None result means the query failed.
                failed += 1
                print("\n✗ Worker failed (no result returned)", flush=True)
            else:
                succeeded += 1
                print(f"\n✓ Worker completed ({succeeded}/{num_workers})", flush=True)

        if failed:
            print(f"\n✗ {failed}/{num_workers} workers failed!")
            return False

        print(f"\n✓ All {succeeded} workers completed!")
        return True

    except concurrent.futures.TimeoutError:
        print(f"\n✗ TIMEOUT!")
        dump_stacks()
        return False

    except Exception as e:
        print(f"\n✗ ERROR: {e}")
        traceback.print_exc()
        return False

    finally:
        # Never block on possibly-hung worker threads here.
        executor.shutdown(wait=False)


def main():
    """Run both concurrency scenarios and print a pass/fail summary.

    Returns:
        Process exit status: 0 if both scenarios passed, 1 otherwise, so a
        caller (or CI) can detect the failure without parsing output.
    """
    print("=" * 80)
    print("Minimal chDB Concurrent Execution Reproduction")
    print("Pure chDB + pandas, NO DataStore")
    print("=" * 80)

    # Static chdb.query() from several threads.
    result1 = test_static_query_concurrent()

    # One chdb.connect() connection per thread.
    result2 = test_connection_per_thread()

    print("\n" + "=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Static chdb.query(): {'✓ PASS' if result1 else '✗ FAIL/TIMEOUT'}")
    print(f"Separate connections: {'✓ PASS' if result2 else '✗ FAIL/TIMEOUT'}")
    print("=" * 80)

    all_passed = result1 and result2
    if all_passed:
        print("\n✓ chDB concurrent execution works fine!")
        print("The hang must be in DataStore logic, not chDB itself.")
    else:
        print("\n✗ chDB has concurrent execution issues!")
        print("This is a chDB bug that should be reported.")

    return 0 if all_passed else 1


if __name__ == "__main__":
    # Propagate main()'s status to the shell; a None return is treated as 0.
    sys.exit(main())

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions