diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..8f1b488 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,246 @@ +# Contributing to py-tsbs-benchmark + +Thank you for your interest in contributing to py-tsbs-benchmark! This guide will help you get started. + +## ๐Ÿš€ Quick Start for Contributors + +1. **Fork the repository** on GitHub +2. **Clone your fork**: + ```bash + git clone https://github.com/YOUR_USERNAME/py-tsbs-benchmark.git + cd py-tsbs-benchmark + ``` +3. **Set up development environment**: + ```bash + poetry install --with dev + ``` +4. **Create a branch** for your changes: + ```bash + git checkout -b feature/your-improvement + ``` +5. **Make your changes** and test them +6. **Submit a pull request** + +## ๐Ÿงช Running Tests + +Before submitting your changes, make sure all tests pass: + +```bash +# Run all tests +poetry run pytest + +# Run tests with coverage +poetry run pytest --cov=py_tsbs_benchmark + +# Run specific test file +poetry run pytest tests/test_common.py -v +``` + +## ๐Ÿ“ Code Style + +We follow Python best practices: + +- **PEP 8** for code formatting +- **Type hints** where possible +- **Docstrings** for all functions and classes (Google/NumPy style) +- **Descriptive variable names** and comments + +### Code Formatting + +We recommend using Black and flake8: + +```bash +# Format code +poetry run black py_tsbs_benchmark/ tests/ + +# Check code style +poetry run flake8 py_tsbs_benchmark/ tests/ + +# Type checking +poetry run mypy py_tsbs_benchmark/ +``` + +## ๐Ÿ› ๏ธ Development Guidelines + +### Adding New Features + +1. **Write tests first** (TDD approach encouraged) +2. **Add comprehensive docstrings** with examples +3. **Include error handling** with proper logging +4. **Update documentation** if needed + +### Error Handling + +- Use specific exception types +- Add logging for debugging +- Provide helpful error messages +- Handle edge cases gracefully + +### Example of Good Error Handling: + +```python +import logging + +logger = logging.getLogger(__name__) + +def process_data(data): + """Process input data with proper error handling. 
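+
+    Expected validation problems (such as empty input) are raised as
+    ValueError; unexpected failures are wrapped in RuntimeError so that
+    callers see a small, documented set of exceptions.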
+
+    Args:
+        data: Input data to process
+
+    Returns:
+        Processed data
+
+    Raises:
+        ValueError: If data is invalid
+        RuntimeError: If processing fails
+    """
+    try:
+        if not data:
+            raise ValueError("Input data cannot be empty")
+
+        logger.info(f"Processing {len(data)} items")
+        result = expensive_operation(data)
+        logger.info("Processing completed successfully")
+        return result
+
+    except ValueError:
+        # Documented validation error: re-raise unchanged.
+        raise
+    except Exception as e:
+        logger.error(f"Processing failed: {e}", exc_info=True)
+        raise RuntimeError(f"Failed to process data: {e}") from e
+```
+
+## 🎯 Areas Where Contributions Are Welcome
+
+### Beginner-Friendly Issues
+
+- **Documentation improvements**: Fix typos, add examples, improve clarity
+- **Error messages**: Make them more helpful and user-friendly
+- **Unit tests**: Increase test coverage for edge cases
+- **Code comments**: Add explanatory comments for complex logic
+
+### Intermediate Tasks
+
+- **Performance optimizations**: Profile and improve slow operations
+- **Logging enhancements**: Add structured logging with appropriate levels
+- **Configuration**: Add support for config files (YAML/TOML)
+- **CLI improvements**: Better help messages, validation, progress bars
+
+### Advanced Projects
+
+- **New benchmark modes**: Different data patterns or workloads
+- **Memory profiling**: Track and report memory usage during benchmarks
+- **Async support**: Add async/await patterns where beneficial
+- **CI/CD pipeline**: Set up GitHub Actions for testing and releases
+
+## 📋 Pull Request Guidelines
+
+### Before Submitting
+
+- [ ] Tests pass locally (`poetry run pytest`)
+- [ ] Code is formatted (`poetry run black .`)
+- [ ] No linting errors (`poetry run flake8`)
+- [ ] Documentation updated if needed
+- [ ] CHANGELOG.md updated (if applicable)
+
+### Pull Request Description
+
+Please include:
+
+1. **What** changes you made
+2. **Why** you made them (link to issue if applicable)
+3. **How** to test the changes
+4. **Screenshots** or examples if relevant
+
+### Example PR Title and Description
+
+```
+feat: Add structured logging with configurable levels
+
+- Replaces print statements with proper logging
+- Adds --log-level CLI option for controlling verbosity
+- Includes request/response logging for debugging database issues
+- Adds logging configuration in main() function
+
+Fixes #123
+
+Testing:
+- Run with --log-level DEBUG to see detailed logs
+- Run with --log-level INFO for normal operation
+- Verify no functionality changes for existing users
+```
+
+## 🐛 Reporting Issues
+
+When reporting bugs, please include:
+
+- Python version and OS
+- QuestDB version
+- Full error traceback
+- Minimal reproduction steps
+- Expected vs actual behavior
+
+## 📚 Development Environment
+
+### Required Tools
+
+- **Python 3.10+**
+- **Poetry** for dependency management
+- **QuestDB** for integration testing
+- **Git** for version control
+
+### Optional but Recommended
+
+- **VS Code** with Python extension
+- **Docker** for running QuestDB consistently
+- **pytest-xdist** for parallel test execution
+
+### Environment Setup Script
+
+```bash
+#!/bin/bash
+# setup_dev.sh - One-time development setup
+
+# Install Poetry if not present
+if ! 
command -v poetry &> /dev/null; then + curl -sSL https://install.python-poetry.org | python3 - +fi + +# Install dependencies +poetry install --with dev + +# Setup pre-commit hooks (if available) +poetry run pre-commit install 2>/dev/null || echo "Pre-commit not configured" + +# Start QuestDB in background +docker run -d -p 9000:9000 -p 9009:9009 --name questdb-dev questdb/questdb + +echo "โœ… Development environment ready!" +echo "๐Ÿ“ Run 'poetry run pytest' to verify setup" +``` + +## ๐Ÿค Code of Conduct + +We are committed to providing a welcoming and inclusive environment: + +- **Be respectful** and considerate +- **Be collaborative** and help others learn +- **Be patient** with newcomers and different skill levels +- **Give constructive feedback** in reviews + +## โ“ Questions? + +- **General questions**: Open a GitHub Discussion +- **Bug reports**: Create a GitHub Issue +- **Feature requests**: Create a GitHub Issue with "enhancement" label +- **Security issues**: Email the maintainers directly + +## ๐ŸŽ‰ Recognition + +Contributors will be: + +- Added to the AUTHORS file +- Mentioned in release notes +- Given credit in pull request merges + +Thank you for helping improve py-tsbs-benchmark! ๐Ÿš€ diff --git a/README.md b/README.md index ad80bde..bb80578 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,25 @@ # Benchmarking Ingestion of Pandas into QuestDB +[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/) +[![License: Apache 2.0](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) + +## ๐Ÿš€ Quick Start + +For the impatient, here's how to run a basic benchmark: + +```bash +# Clone and setup +git clone https://github.com/questdb/py-tsbs-benchmark.git +cd py-tsbs-benchmark +poetry install + +# Start QuestDB (in separate terminal) +questdb start + +# Run benchmark with data ingestion +poetry run bench_pandas --send --row-count 1000000 +``` + ## Background [QuestDB](https://questdb.io/) is our timeseries relational database with SQL query support. We support a dedicate protocol (called @@ -34,35 +54,102 @@ The data consists of: * 1 TIMESTAMP column (unix epoch nanoseconds, UTC) To run these benchmarks, you will need: -* Modern hardware with multiple cores and enough -ram to hold a large Pandas dataset in memory. -* Python 3.10 and [poetry](https://python-poetry.org/). -* and a recent version of QuestDB. +* Modern hardware with multiple cores and enough RAM to hold a large Pandas dataset in memory +* **Python 3.10+** and [poetry](https://python-poetry.org/) for dependency management +* A recent version of **QuestDB** (v7.0.1 or later recommended) + +You can follow through the setup and run commands below, or skip to see +[benchmark results](#results) from our test runs. + +## ๐Ÿ“ฆ Prerequisites + +### Python and Poetry + +1. **Python 3.10+**: Download from [python.org](https://www.python.org/downloads/) +2. **Poetry**: Install using the official installer: + ```bash + # Linux/macOS/Windows (WSL) + curl -sSL https://install.python-poetry.org | python3 - + + # Windows (PowerShell) + (Invoke-WebRequest -Uri https://install.python-poetry.org -UseBasicParsing).Content | python - + ``` -You can follow through the setup and the run commands or just scroll down to see -numbers from our benchmark runs. 
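+
+To confirm the toolchain before going further, you can run a quick check from
+any Python shell. This is a minimal sketch; it assumes only Python 3.10+ and
+`poetry` on your `PATH`, both required above:
+
+```python
+import shutil
+import sys
+
+# The benchmark and its dependencies require Python 3.10 or newer.
+assert sys.version_info >= (3, 10), 'py-tsbs-benchmark needs Python 3.10+'
+
+# Poetry must be on PATH for the setup commands below to work.
+print('poetry:', 'found' if shutil.which('poetry') else 'MISSING')
+```
+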
+### QuestDB + +Download and install QuestDB: +- **Docker**: `docker run -p 9000:9000 -p 9009:9009 questdb/questdb` +- **Binary**: Download from [questdb.io/get-questdb](https://questdb.io/get-questdb/) +- **Package managers**: `brew install questdb` (macOS) or see [installation guide](https://questdb.io/docs/get-started/docker/) ## Setup -### Python Client +### Python Environment and Dependencies -After cloning this git repo: +After cloning this repository: ```bash -poetry env use 3.10 +# Clone the repository +git clone https://github.com/questdb/py-tsbs-benchmark.git +cd py-tsbs-benchmark + +# Set up Python environment and install dependencies +poetry env use 3.10 # or python3.10 if poetry can't find it poetry install -``` -Note that each benchmark run will delete and re-create the `'cpu'` table. +# Verify installation +poetry run bench_pandas --help +``` -### Preparing QuestDB +### Starting QuestDB -Start a QuestDB instance. +Start a QuestDB instance before running benchmarks: ```bash +# Method 1: Using Docker (recommended for testing) +docker run -p 9000:9000 -p 9009:9009 questdb/questdb + +# Method 2: Using local installation questdb start + +# Method 3: Using Docker Compose (see docker-compose.yml if available) +docker-compose up questdb ``` +**Verify QuestDB is running**: Open [http://localhost:9000](http://localhost:9000) in your browser. + +## ๐Ÿ”ง Configuration + +### Basic Usage + +```bash +# Run serialization-only benchmark (no database) +poetry run bench_pandas --row-count 1000000 + +# Run full benchmark with database ingestion +poetry run bench_pandas --send --row-count 1000000 + +# Multi-threaded benchmark +poetry run bench_pandas --send --workers 4 --row-count 1000000 + +# Custom QuestDB connection +poetry run bench_pandas --send --host questdb.example.com --ilp-port 9009 --http-port 9000 +``` + +### Command Line Options + +| Option | Description | Default | +|--------|-------------|---------| +| `--row-count` | Number of rows to generate | 10,000,000 | +| `--scale` | Number of unique hostnames | 4,000 | +| `--workers` | Number of parallel threads | None (single-threaded) | +| `--send` | Send data to QuestDB | False (serialize only) | +| `--host` | QuestDB hostname | localhost | +| `--ilp-port` | ILP port | 9009 | +| `--http-port` | HTTP port | 9000 | +| `--op` | Operation type | dataframe | +| `--debug` | Enable debug logging | False | + ## Running ### The hardware we used @@ -377,3 +464,78 @@ cpu,hostname=host_7,region=ap-southeast-1,datacenter=ap-southeast-1a,rack=97,os= cpu,hostname=host_8,region=eu-central-1,datacenter=eu-central-1b,rack=43,os=Ubuntu16.04LTS,arch=x86,team=SF,service=18,service_version=0,service_environment=production usage_user=2.3683820719125404,usage_system=3.1496636608187587,usage_idle=1.0714252817838013,usage_nice=0.0,usage_iowait=3.658575628441112,usage_irq=0.0,usage_softirq=0.0,usage_steal=0.9944564076833474,usage_guest=3.606177791932647,usage_guest_nice=5.665699532249171 1451606480000000000 cpu,hostname=host_9,region=sa-east-1,datacenter=sa-east-1b,rack=82,os=Ubuntu15.10,arch=x86,team=CHI,service=14,service_version=1,service_environment=staging usage_user=2.711560205310839,usage_system=2.92632821713108,usage_idle=1.6924636783124183,usage_nice=0.8654306023153091,usage_iowait=5.201435533195961,usage_irq=0.0,usage_softirq=1.7215318876485612,usage_steal=0.6839422702175311,usage_guest=3.1192465146389465,usage_guest_nice=5.414096713475799 1451606490000000000 ``` + +## ๐Ÿงช Testing + +Run the test suite to verify everything is working: + +```bash +# 
Run all tests
+poetry run python -m pytest tests/ -v
+
+# Run tests with coverage
+poetry run python -m pytest tests/ --cov=py_tsbs_benchmark --cov-report=html
+
+# Run specific test modules
+poetry run python -m pytest tests/test_common.py -v
+poetry run python -m pytest tests/test_bench_pandas.py -v
+```
+
+## 🤝 Contributing
+
+We welcome contributions! This project is designed to be beginner-friendly for those
+wanting to contribute to open-source Python projects.
+
+### Getting Started with Development
+
+1. **Fork and clone** the repository
+2. **Set up development environment**:
+   ```bash
+   poetry install --with dev  # Install with development dependencies
+   poetry run pre-commit install  # Set up git hooks (if available)
+   ```
+3. **Run tests** to ensure everything works:
+   ```bash
+   poetry run python -m pytest tests/ -v
+   ```
+
+### Development Guidelines
+
+- **Code Style**: Follow PEP 8, use type hints where possible
+- **Documentation**: Add docstrings to all functions and classes
+- **Testing**: Write unit tests for new functionality
+- **Error Handling**: Use appropriate try/except blocks and logging
+- **Commit Messages**: Use clear, descriptive commit messages in English
+
+### Suggested Areas for Contribution
+
+1. **Performance optimizations** in data generation or serialization
+2. **Additional benchmark metrics** (memory usage, latency percentiles)
+3. **Support for different data types** beyond the current TSBS schema
+4. **Improved error handling** and recovery mechanisms
+5. **Documentation improvements** and examples
+6. **CI/CD pipeline** setup and automation
+
+### Submitting Changes
+
+1. Create a feature branch: `git checkout -b feature/your-improvement`
+2. Make your changes with proper tests and documentation
+3. Run the test suite: `poetry run python -m pytest tests/ -v`
+4. Commit your changes: `git commit -m "feat: add your improvement"`
+5. Push and create a Pull Request with a clear description
+
+### Code of Conduct
+
+Please be respectful and constructive in all interactions. This project follows
+the [Contributor Covenant](https://www.contributor-covenant.org/).
+
+## 📄 License
+
+This project is licensed under the Apache License 2.0 - see the [LICENSE](LICENSE) file for details.
+
+## 🙋‍♀️ Support
+
+- **Documentation**: [QuestDB Documentation](https://questdb.io/docs/)
+- **Issues**: [GitHub Issues](https://github.com/questdb/py-tsbs-benchmark/issues)
+- **Community**: [QuestDB Slack](https://slack.questdb.io/)
+- **Python Client**: [py-questdb-client docs](https://py-questdb-client.readthedocs.io/)
diff --git a/py_tsbs_benchmark/bench_pandas.py b/py_tsbs_benchmark/bench_pandas.py
index 364c1d1..805655b 100644
--- a/py_tsbs_benchmark/bench_pandas.py
+++ b/py_tsbs_benchmark/bench_pandas.py
@@ -6,14 +6,30 @@ import sys
 import pprint
 import textwrap
+import logging
 
 from concurrent.futures import ThreadPoolExecutor, Future
 from numba import vectorize, float64
 
 from .common import CpuTable
 
+# Set up logging
+logger = logging.getLogger(__name__)
+
 
 @vectorize([float64(float64, float64)])
 def _clip_add(x, y):
+    """Add two float64 values and clip the result to [0.0, 100.0] range.
+
+    This is a vectorized function optimized with Numba for performance.
+    Used to generate realistic CPU usage data that stays within valid bounds.
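+
+    Illustrative values (scalar inputs shown; Numba also applies the
+    function element-wise to float64 arrays):
+
+        _clip_add(60.0, 50.0)   # -> 100.0 (sum 110.0 clipped to the cap)
+        _clip_add(-10.0, 5.0)   # -> 0.0 (sum -5.0 clipped to the floor)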
+ + Args: + x (float64): First value to add + y (float64): Second value to add + + Returns: + float64: Sum of x and y, clipped to range [0.0, 100.0] + """ z = x + y # Clip to the 0 and 100 boundaries if z < 0.0: @@ -102,6 +118,26 @@ def _clip_add(x, y): def gen_dataframe(seed, row_count, scale): + """Generate a synthetic TSBS-compatible CPU metrics DataFrame. + + Creates a pandas DataFrame with the same structure as the TSBS 'cpu' + dataset, containing 10 symbol columns (strings) and 10 numeric CPU usage + columns. The data simulates realistic server monitoring metrics across + multiple regions, datacenters, and hosts. + + Args: + seed (int): Random seed for reproducible data generation + row_count (int): Number of rows to generate + scale (int): Number of unique hostnames to cycle through + + Returns: + pd.DataFrame: DataFrame with TSBS cpu table schema containing: + - Symbol columns: hostname, region, datacenter, rack, os, arch, + team, service, service_version, service_environment + - Numeric columns: 10 CPU usage metrics (usage_user, + usage_system, etc.) + - timestamp: DateTime index with 10-second intervals + """ rand, np_rand = random.Random(seed), np.random.default_rng(seed) def mk_symbols_series(strings): @@ -151,7 +187,8 @@ def mk_cpu_series(): 'usage_steal': mk_cpu_series(), 'usage_guest': mk_cpu_series(), 'usage_guest_nice': mk_cpu_series(), - 'timestamp': pd.date_range('2016-01-01', periods=row_count, freq='10s'), + 'timestamp': pd.date_range( + '2016-01-01', periods=row_count, freq='10s'), }) df.index.name = 'cpu' @@ -159,6 +196,11 @@ def mk_cpu_series(): def parse_args(): + """Parse command line arguments for the benchmark script. + + Returns: + argparse.Namespace: Parsed command line arguments with defaults + """ seed = random.randrange(sys.maxsize) import argparse parser = argparse.ArgumentParser() @@ -171,16 +213,27 @@ def parse_args(): parser.add_argument('--host', type=str, default='localhost') parser.add_argument('--ilp-port', type=int, default=9009) parser.add_argument('--http-port', type=int, default=9000) - parser.add_argument('--op', choices=['dataframe', 'iterrows', 'itertuples'], - default='dataframe') + parser.add_argument('--op', + choices=['dataframe', 'iterrows', 'itertuples'], + default='dataframe') parser.add_argument('--workers', type=int, default=None) parser.add_argument('--worker-chunk-row-count', type=int, default=10_000) - parser.add_argument('--validation-query-timeout', type=float, default=120.0) + parser.add_argument('--validation-query-timeout', + type=float, default=120.0) parser.add_argument('--debug', action='store_true', default=False) return parser.parse_args() def chunk_up_dataframe(df, chunk_row_count): + """Split a DataFrame into smaller chunks for parallel processing. + + Args: + df (pd.DataFrame): DataFrame to split + chunk_row_count (int): Maximum number of rows per chunk + + Returns: + list[pd.DataFrame]: List of DataFrame chunks + """ dfs = [] for i in range(0, len(df), chunk_row_count): dfs.append(df.iloc[i:i + chunk_row_count]) @@ -188,6 +241,15 @@ def chunk_up_dataframe(df, chunk_row_count): def assign_dfs_to_workers(dfs, workers): + """Distribute DataFrame chunks evenly across workers using round-robin. 
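+
+    For example (illustrative), six chunks c0..c5 distributed across
+    workers=2 come back as [[c0, c2, c4], [c1, c3, c5]].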
+ + Args: + dfs (list[pd.DataFrame]): List of DataFrame chunks + workers (int): Number of worker threads + + Returns: + list[list[pd.DataFrame]]: List of chunk lists, one per worker + """ dfs_by_worker = [[] for _ in range(workers)] for i, df in enumerate(dfs): dfs_by_worker[i % workers].append(df) @@ -195,22 +257,50 @@ def assign_dfs_to_workers(dfs, workers): def sanity_check_split(df, dfs): + """Verify that DataFrame chunks can be recombined to original DataFrame. + + Args: + df (pd.DataFrame): Original DataFrame + dfs (list[pd.DataFrame]): List of DataFrame chunks + + Raises: + AssertionError: If chunks don't match the original DataFrame + """ df2 = pd.concat(dfs) assert len(df) == len(df2) assert df.equals(df2) def sanity_check_split2(df, dfs_by_worker): + """Verify that worker-assigned chunks can be recombined to original. + + Args: + df (pd.DataFrame): Original DataFrame + dfs_by_worker (list[list[pd.DataFrame]]): Chunks assigned to workers + + Raises: + AssertionError: If chunks don't match the original DataFrame + """ df2 = pd.concat([ df for dfs in dfs_by_worker - for df in dfs]) + for df in dfs]) df2.sort_values(by='timestamp', inplace=True) assert len(df) == len(df2) assert df.equals(df2) def chunk_up_by_worker(df, workers, chunk_row_count): + """Split DataFrame into chunks and assign them to workers. + + Args: + df (pd.DataFrame): DataFrame to split + workers (int): Number of worker threads + chunk_row_count (int): Maximum number of rows per chunk + + Returns: + list[list[pd.DataFrame]]: Chunks assigned to each worker + """ dfs = chunk_up_dataframe(df, chunk_row_count) sanity_check_split(df, dfs) dfs_by_worker = assign_dfs_to_workers(dfs, workers) @@ -294,7 +384,7 @@ def serialize_one(args, df): buf = qi.Buffer() op = _OP_MAP[args.op] t0 = time.monotonic() - op(buf, df) + op(buf, df) t1 = time.monotonic() elapsed = t1 - t0 if args.write_ilp: @@ -445,36 +535,66 @@ def worker_job(op, sender, worker_dfs): def main(): + """Main benchmark execution function.""" args = parse_args() + + # Configure logging + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + ] + ) + + logger.info("Starting py-tsbs-benchmark") pretty_args = textwrap.indent(pprint.pformat(vars(args)), ' ') print(f'Running with params:\n{pretty_args}') + logger.debug(f"Configuration: {vars(args)}") - cpu_table = CpuTable(args.host, args.http_port) - - if args.send: - cpu_table.drop() - cpu_table.create() + try: + cpu_table = CpuTable(args.host, args.http_port) + logger.info(f"Connected to QuestDB at {args.host}:{args.http_port}") - df = gen_dataframe(args.seed, args.row_count, args.scale) - - if not args.workers: - size = serialize_one(args, df) - else: - if args.workers < 1: - raise ValueError('workers must be >= 1') - size = serialize_workers(args, df) + if args.send: + cpu_table.drop() + cpu_table.create() - if args.shell: - import code - code.interact(local=locals()) + logger.info(f"Generating DataFrame with {args.row_count} rows") + df = gen_dataframe(args.seed, args.row_count, args.scale) + logger.info("DataFrame generation completed") - if args.send: if not args.workers: - send_one(args, df, size) + logger.info("Starting single-threaded serialization") + size = serialize_one(args, df) else: - send_workers(args, df, size) - - cpu_table.block_until_rowcount( - args.row_count, timeout=args.validation_query_timeout) - else: - print('Not sending. 
Use --send to send to server.') + if args.workers < 1: + raise ValueError('workers must be >= 1') + logger.info(f"Starting multi-threaded serialization with " + f"{args.workers} workers") + size = serialize_workers(args, df) + + if args.shell: + import code + code.interact(local=locals()) + + if args.send: + if not args.workers: + logger.info("Starting single-threaded data transmission") + send_one(args, df, size) + else: + logger.info("Starting multi-threaded data transmission") + send_workers(args, df, size) + + logger.info("Validating row count in database") + cpu_table.block_until_rowcount( + args.row_count, timeout=args.validation_query_timeout) + logger.info("Benchmark completed successfully") + else: + print('Not sending. Use --send to send to server.') + logger.info("Benchmark completed (serialization only)") + + except Exception as e: + logger.error(f"Benchmark failed: {e}", exc_info=True) + raise diff --git a/py_tsbs_benchmark/common.py b/py_tsbs_benchmark/common.py index f184423..6d582a5 100644 --- a/py_tsbs_benchmark/common.py +++ b/py_tsbs_benchmark/common.py @@ -1,55 +1,144 @@ import requests import time +import logging + +# Set up logging +logger = logging.getLogger(__name__) + class CpuTable: + """Helper class for managing QuestDB 'cpu' table operations via HTTP API. + + Provides methods to create, drop, and query the cpu table used in + benchmarking. Uses QuestDB's HTTP query interface for table operations. + """ + def __init__(self, host, port): + """Initialize CpuTable with connection parameters. + + Args: + host (str): QuestDB server hostname + port (int): QuestDB HTTP port (typically 9000) + """ self.host = host self.port = port def _request(self, sql): - response = requests.get( - f'http://{self.host}:{self.port}/exec', - params={'query': sql}).json() - return response + """Execute SQL query via QuestDB HTTP API. + + Args: + sql (str): SQL query to execute + + Returns: + dict: JSON response from QuestDB server + + Raises: + requests.RequestException: If HTTP request fails + ValueError: If response is not valid JSON + """ + try: + logger.debug(f"Executing SQL: {sql[:100]}...") + response = requests.get( + f'http://{self.host}:{self.port}/exec', + params={'query': sql}, + timeout=30) + response.raise_for_status() + return response.json() + except requests.RequestException as e: + logger.error(f"HTTP request failed: {e}") + raise + except ValueError as e: + logger.error(f"Failed to parse JSON response: {e}") + raise def drop(self): - response = self._request('drop table cpu') - if response.get('ddl') == 'OK': - print(f'Dropped table cpu') - return True - elif response.get('error', '').startswith('table does not exist'): - print(f'Table cpu does not exist') - return False - else: - raise RuntimeError(f'Failed to drop table cpu: {response}') + """Drop the cpu table if it exists. 
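+
+    Issues a 'drop table cpu' statement through the HTTP /exec endpoint
+    and inspects the JSON reply: a 'ddl': 'OK' response means the table
+    was dropped, while a 'table does not exist' error is treated as a
+    no-op.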
+ + Returns: + bool: True if table was dropped, False if it didn't exist + + Raises: + RuntimeError: If drop operation fails + """ + try: + logger.info("Attempting to drop cpu table") + response = self._request('drop table cpu') + if response.get('ddl') == 'OK': + print('Dropped table cpu') + logger.info("Successfully dropped cpu table") + return True + elif response.get('error', '').startswith('table does not exist'): + print('Table cpu does not exist') + logger.info("Table cpu does not exist, nothing to drop") + return False + else: + error_msg = f'Failed to drop table cpu: {response}' + logger.error(error_msg) + raise RuntimeError(error_msg) + except Exception as e: + logger.error(f"Error dropping table: {e}") + raise def create(self): - symbol_cols = [ - 'hostname', 'region', 'datacenter', 'rack', 'os', 'arch', - 'team', 'service', 'service_version', 'service_environment'] - double_cols = [ - 'usage_user', 'usage_system', 'usage_idle', 'usage_nice', - 'usage_iowait', 'usage_irq', 'usage_softirq', 'usage_steal', - 'usage_guest', 'usage_guest_nice'] - sql = f''' - create table cpu ( - {', '.join(f'{col} symbol' for col in symbol_cols)}, - {', '.join(f'{col} double' for col in double_cols)}, - timestamp timestamp) - timestamp(timestamp) - partition by day - ''' - response = self._request(sql) - if response.get('ddl') == 'OK': - print(f'Created table cpu') - else: - raise RuntimeError(f'Failed to create table cpu: {response}') + """Create the cpu table with TSBS schema. + + Creates table with 10 symbol columns, 10 double columns, and + a timestamp column. Sets up partitioning by day. + + Raises: + RuntimeError: If table creation fails + """ + try: + logger.info("Creating cpu table with TSBS schema") + symbol_cols = [ + 'hostname', 'region', 'datacenter', 'rack', 'os', 'arch', + 'team', 'service', 'service_version', 'service_environment'] + double_cols = [ + 'usage_user', 'usage_system', 'usage_idle', 'usage_nice', + 'usage_iowait', 'usage_irq', 'usage_softirq', 'usage_steal', + 'usage_guest', 'usage_guest_nice'] + sql = f''' + create table cpu ( + {', '.join(f'{col} symbol' for col in symbol_cols)}, + {', '.join(f'{col} double' for col in double_cols)}, + timestamp timestamp) + timestamp(timestamp) + partition by day + ''' + response = self._request(sql) + if response.get('ddl') == 'OK': + print('Created table cpu') + logger.info("Successfully created cpu table") + else: + error_msg = f'Failed to create table cpu: {response}' + logger.error(error_msg) + raise RuntimeError(error_msg) + except Exception as e: + logger.error(f"Error creating table: {e}") + raise def get_row_count(self): + """Get the current number of rows in the cpu table. + + Returns: + int: Number of rows in the cpu table + """ response = self._request('select count(*) from cpu') return response['dataset'][0][0] def block_until_rowcount(self, target_count, timeout=30.0): + """Block until the table reaches the target row count. + + Polls the table row count until it matches the target, with timeout. + Used for validation after data ingestion. 
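+
+    Example (illustrative; mirrors how main() validates an ingestion run):
+
+        table = CpuTable('localhost', 9000)
+        table.block_until_rowcount(10_000_000, timeout=120.0)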
+ + Args: + target_count (int): Expected number of rows + timeout (float): Maximum time to wait in seconds + + Raises: + RuntimeError: If timeout is reached or row count exceeds target + """ t0 = time.monotonic() while True: row_count = self.get_row_count() diff --git a/pyproject.toml b/pyproject.toml index 4bbfc0b..35ba02c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,13 @@ pyarrow = "^10.0.1" numba = "^0.56.4" requests = "^2.28.1" +[tool.poetry.group.dev.dependencies] +pytest = "^7.0.0" +pytest-cov = "^4.0.0" +black = "^23.0.0" +flake8 = "^5.0.0" +mypy = "^1.0.0" + [tool.poetry.scripts] bench_pandas = "py_tsbs_benchmark.bench_pandas:main" bench_raw_ilp = "py_tsbs_benchmark.bench_raw_ilp:main" @@ -22,3 +29,26 @@ bench_raw_ilp = "py_tsbs_benchmark.bench_raw_ilp:main" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py", "*_test.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +addopts = [ + "--strict-markers", + "--disable-warnings", + "-v" +] + +[tool.coverage.run] +source = ["py_tsbs_benchmark"] +omit = ["tests/*"] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "raise AssertionError", + "raise NotImplementedError", +] diff --git a/tests/test_bench_pandas.py b/tests/test_bench_pandas.py new file mode 100644 index 0000000..a26fefc --- /dev/null +++ b/tests/test_bench_pandas.py @@ -0,0 +1,250 @@ +"""Unit tests for bench_pandas module.""" + +import unittest +import pandas as pd +import numpy as np + +from py_tsbs_benchmark.bench_pandas import ( + _clip_add, gen_dataframe, chunk_up_dataframe, + assign_dfs_to_workers, sanity_check_split, sanity_check_split2 +) + + +class TestBenchPandas(unittest.TestCase): + """Test cases for bench_pandas module.""" + + def test_clip_add_normal_values(self): + """Test _clip_add with normal values.""" + # Test values within range + result = _clip_add(30.0, 40.0) + self.assertEqual(result, 70.0) + + def test_clip_add_upper_bound(self): + """Test _clip_add with values exceeding upper bound.""" + result = _clip_add(60.0, 50.0) + self.assertEqual(result, 100.0) + + def test_clip_add_lower_bound(self): + """Test _clip_add with values below lower bound.""" + result = _clip_add(-10.0, -5.0) + self.assertEqual(result, 0.0) + + def test_clip_add_exact_bounds(self): + """Test _clip_add with exact boundary values.""" + self.assertEqual(_clip_add(0.0, 0.0), 0.0) + self.assertEqual(_clip_add(50.0, 50.0), 100.0) + + def test_gen_dataframe_structure(self): + """Test that gen_dataframe creates correct structure.""" + # Arrange + seed = 12345 + row_count = 1000 + scale = 100 + + # Act + df = gen_dataframe(seed, row_count, scale) + + # Assert + self.assertEqual(len(df), row_count) + self.assertEqual(df.index.name, 'cpu') + + # Check symbol columns exist + symbol_cols = [ + 'hostname', 'region', 'datacenter', 'rack', 'os', 'arch', + 'team', 'service', 'service_version', 'service_environment' + ] + for col in symbol_cols: + self.assertIn(col, df.columns) + self.assertEqual(str(df[col].dtype), 'string') + + # Check numeric columns exist + numeric_cols = [ + 'usage_user', 'usage_system', 'usage_idle', 'usage_nice', + 'usage_iowait', 'usage_irq', 'usage_softirq', 'usage_steal', + 'usage_guest', 'usage_guest_nice' + ] + for col in numeric_cols: + self.assertIn(col, df.columns) + self.assertEqual(df[col].dtype, np.float64) + + # Check timestamp column + self.assertIn('timestamp', df.columns) + 
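+        # gen_dataframe builds this column with pd.date_range, so it should
+        # be datetime64[ns] rather than strings or epoch integers.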
+        self.assertTrue(pd.api.types.is_datetime64_any_dtype(df['timestamp']))
+
+    def test_gen_dataframe_reproducibility(self):
+        """Test that gen_dataframe is reproducible with same seed."""
+        # Arrange
+        seed = 54321
+        row_count = 100
+        scale = 50
+
+        # Act
+        df1 = gen_dataframe(seed, row_count, scale)
+        df2 = gen_dataframe(seed, row_count, scale)
+
+        # Assert
+        pd.testing.assert_frame_equal(df1, df2)
+
+    def test_gen_dataframe_hostnames_scale(self):
+        """Test that hostname generation respects scale parameter."""
+        # Arrange
+        seed = 99999
+        row_count = 1000
+        scale = 10
+
+        # Act
+        df = gen_dataframe(seed, row_count, scale)
+
+        # Assert
+        unique_hostnames = df['hostname'].unique()
+        self.assertEqual(len(unique_hostnames), scale)
+
+        # Check hostname pattern
+        for hostname in unique_hostnames:
+            self.assertTrue(hostname.startswith('host_'))
+
+    def test_chunk_up_dataframe(self):
+        """Test DataFrame chunking."""
+        # Arrange
+        df = pd.DataFrame({'a': range(100), 'b': range(100, 200)})
+        chunk_size = 30
+
+        # Act
+        chunks = chunk_up_dataframe(df, chunk_size)
+
+        # Assert
+        self.assertEqual(len(chunks), 4)  # 100 / 30 = 3.33, so 4 chunks
+        self.assertEqual(len(chunks[0]), 30)
+        self.assertEqual(len(chunks[1]), 30)
+        self.assertEqual(len(chunks[2]), 30)
+        self.assertEqual(len(chunks[3]), 10)  # Last chunk has remainder
+
+    def test_assign_dfs_to_workers(self):
+        """Test DataFrame assignment to workers."""
+        # Arrange
+        dfs = [pd.DataFrame({'a': [i]}) for i in range(10)]
+        workers = 3
+
+        # Act
+        result = assign_dfs_to_workers(dfs, workers)
+
+        # Assert
+        self.assertEqual(len(result), workers)
+        # Check round-robin assignment
+        self.assertEqual(len(result[0]), 4)  # indices 0, 3, 6, 9
+        self.assertEqual(len(result[1]), 3)  # indices 1, 4, 7
+        self.assertEqual(len(result[2]), 3)  # indices 2, 5, 8
+
+    def test_sanity_check_split_valid(self):
+        """Test sanity check with valid split."""
+        # Arrange
+        df = pd.DataFrame({'a': range(10), 'b': range(10, 20)})
+        chunks = [df.iloc[:5], df.iloc[5:]]
+
+        # Act & Assert (should not raise)
+        sanity_check_split(df, chunks)
+
+    def test_sanity_check_split_invalid_length(self):
+        """Test sanity check with invalid split length."""
+        # Arrange
+        df = pd.DataFrame({'a': range(10), 'b': range(10, 20)})
+        chunks = [df.iloc[:3], df.iloc[5:]]  # Missing rows 3-4
+
+        # Act & Assert
+        with self.assertRaises(AssertionError):
+            sanity_check_split(df, chunks)
+
+    def test_sanity_check_split2_valid(self):
+        """Test sanity check for worker assignment."""
+        # Arrange
+        df = pd.DataFrame({
+            'a': range(6),
+            'timestamp': pd.date_range('2023-01-01', periods=6, freq='1H')
+        })
+        # Split across 2 workers, two chunks each
+        dfs_by_worker = [
+            [df.iloc[:2], df.iloc[4:5]],  # Worker 0: rows 0-1, 4
+            [df.iloc[2:4], df.iloc[5:6]]  # Worker 1: rows 2-3, 5
+        ]
+
+        # Act & Assert (should not raise)
+        sanity_check_split2(df, dfs_by_worker)
+
+    def test_sanity_check_split2_invalid(self):
+        """Test sanity check for invalid worker assignment."""
+        # Arrange
+        df = pd.DataFrame({
+            'a': range(6),
+            'timestamp': pd.date_range('2023-01-01', periods=6, freq='1H')
+        })
+        # Invalid split - missing a row
+        dfs_by_worker = [
+            [df.iloc[:2]],  # Worker 0: rows 0-1
+            [df.iloc[2:5]]  # Worker 1: rows 2-4 (missing row 5)
+        ]
+
+        # Act & Assert
+        with self.assertRaises(AssertionError):
+            sanity_check_split2(df, dfs_by_worker)
+
+
+class TestDataFrameGeneration(unittest.TestCase):
+    """Additional tests for DataFrame generation edge cases."""
+
+    def test_small_scale_large_rowcount(self):
+        """Test with small scale and 
large row count.""" + # Arrange + seed = 1111 + row_count = 1000 + scale = 5 # Small number of hosts + + # Act + df = gen_dataframe(seed, row_count, scale) + + # Assert + self.assertEqual(len(df), row_count) + unique_hostnames = df['hostname'].unique() + self.assertEqual(len(unique_hostnames), scale) + + # Each hostname should appear multiple times + hostname_counts = df['hostname'].value_counts() + self.assertTrue(all(count > 1 for count in hostname_counts)) + + def test_large_scale_small_rowcount(self): + """Test with large scale and small row count.""" + # Arrange + seed = 2222 + row_count = 50 + scale = 100 # More hosts than rows + + # Act + df = gen_dataframe(seed, row_count, scale) + + # Assert + self.assertEqual(len(df), row_count) + # Should have at most row_count unique hostnames + unique_hostnames = df['hostname'].unique() + self.assertLessEqual(len(unique_hostnames), row_count) + + def test_cpu_usage_values_realistic(self): + """Test that CPU usage values are within realistic bounds.""" + # Arrange + seed = 3333 + row_count = 1000 + scale = 100 + + # Act + df = gen_dataframe(seed, row_count, scale) + + # Assert - CPU usage values should be >= 0 and <= 100 + usage_cols = [col for col in df.columns if col.startswith('usage_')] + for col in usage_cols: + values = df[col] + self.assertTrue(all(val >= 0.0 for val in values), + f"Found negative values in {col}") + self.assertTrue(all(val <= 100.0 for val in values), + f"Found values > 100 in {col}") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_common.py b/tests/test_common.py new file mode 100644 index 0000000..38208e6 --- /dev/null +++ b/tests/test_common.py @@ -0,0 +1,184 @@ +"""Unit tests for common module.""" + +import unittest +from unittest.mock import Mock, patch +import requests + +from py_tsbs_benchmark.common import CpuTable + + +class TestCpuTable(unittest.TestCase): + """Test cases for CpuTable class.""" + + def setUp(self): + """Set up test fixtures.""" + self.host = 'localhost' + self.port = 9000 + self.cpu_table = CpuTable(self.host, self.port) + + @patch('py_tsbs_benchmark.common.requests.get') + def test_request_success(self, mock_get): + """Test successful HTTP request.""" + # Arrange + mock_response = Mock() + mock_response.json.return_value = {'ddl': 'OK'} + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + # Act + result = self.cpu_table._request('SELECT 1') + + # Assert + self.assertEqual(result, {'ddl': 'OK'}) + mock_get.assert_called_once_with( + f'http://{self.host}:{self.port}/exec', + params={'query': 'SELECT 1'}, + timeout=30 + ) + + @patch('py_tsbs_benchmark.common.requests.get') + def test_request_http_error(self, mock_get): + """Test HTTP request failure.""" + # Arrange + mock_get.side_effect = requests.RequestException("Connection error") + + # Act & Assert + with self.assertRaises(requests.RequestException): + self.cpu_table._request('SELECT 1') + + @patch('py_tsbs_benchmark.common.requests.get') + def test_request_json_error(self, mock_get): + """Test JSON parsing error.""" + # Arrange + mock_response = Mock() + mock_response.raise_for_status.return_value = None + mock_response.json.side_effect = ValueError("Invalid JSON") + mock_get.return_value = mock_response + + # Act & Assert + with self.assertRaises(ValueError): + self.cpu_table._request('SELECT 1') + + def test_drop_table_success(self): + """Test successful table drop.""" + # Arrange + with patch.object(self.cpu_table, '_request') as mock_request: + 
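+            # Simulate the JSON reply QuestDB sends for a successful DDL
+            # statement; drop() treats {'ddl': 'OK'} as success.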
mock_request.return_value = {'ddl': 'OK'} + + # Act + result = self.cpu_table.drop() + + # Assert + self.assertTrue(result) + mock_request.assert_called_once_with('drop table cpu') + + def test_drop_table_not_exists(self): + """Test drop table when table doesn't exist.""" + # Arrange + with patch.object(self.cpu_table, '_request') as mock_request: + mock_request.return_value = {'error': 'table does not exist'} + + # Act + result = self.cpu_table.drop() + + # Assert + self.assertFalse(result) + + def test_drop_table_failure(self): + """Test table drop failure.""" + # Arrange + with patch.object(self.cpu_table, '_request') as mock_request: + mock_request.return_value = {'error': 'Some other error'} + + # Act & Assert + with self.assertRaises(RuntimeError): + self.cpu_table.drop() + + def test_create_table_success(self): + """Test successful table creation.""" + # Arrange + with patch.object(self.cpu_table, '_request') as mock_request: + mock_request.return_value = {'ddl': 'OK'} + + # Act + self.cpu_table.create() + + # Assert + mock_request.assert_called_once() + call_args = mock_request.call_args[0][0] + self.assertIn('create table cpu', call_args) + self.assertIn('hostname symbol', call_args) + self.assertIn('usage_user double', call_args) + self.assertIn('timestamp timestamp', call_args) + + def test_create_table_failure(self): + """Test table creation failure.""" + # Arrange + with patch.object(self.cpu_table, '_request') as mock_request: + mock_request.return_value = {'error': 'Creation failed'} + + # Act & Assert + with self.assertRaises(RuntimeError): + self.cpu_table.create() + + def test_get_row_count(self): + """Test getting row count.""" + # Arrange + with patch.object(self.cpu_table, '_request') as mock_request: + mock_request.return_value = {'dataset': [[12345]]} + + # Act + count = self.cpu_table.get_row_count() + + # Assert + self.assertEqual(count, 12345) + mock_request.assert_called_once_with('select count(*) from cpu') + + @patch('py_tsbs_benchmark.common.time.sleep') + @patch('py_tsbs_benchmark.common.time.monotonic') + def test_block_until_rowcount_success(self, mock_time, mock_sleep): + """Test successful blocking until row count reached.""" + # Arrange + mock_time.side_effect = [0, 1] # Start time, end time + with patch.object(self.cpu_table, 'get_row_count') as mock_count: + mock_count.return_value = 1000 + + # Act + self.cpu_table.block_until_rowcount(1000, timeout=30.0) + + # Assert + mock_count.assert_called_once() + + @patch('py_tsbs_benchmark.common.time.sleep') + @patch('py_tsbs_benchmark.common.time.monotonic') + def test_block_until_rowcount_timeout(self, mock_time, mock_sleep): + """Test timeout when waiting for row count.""" + # Arrange + mock_time.side_effect = [0, 35] # Start time, timeout exceeded + with patch.object(self.cpu_table, 'get_row_count') as mock_count: + mock_count.return_value = 500 # Less than target + + # Act & Assert + with self.assertRaises(RuntimeError) as cm: + self.cpu_table.block_until_rowcount(1000, timeout=30.0) + + self.assertIn('Timed out', str(cm.exception)) + + @patch('py_tsbs_benchmark.common.time.sleep') + @patch('py_tsbs_benchmark.common.time.monotonic') + def test_block_until_rowcount_exceeds(self, mock_time, mock_sleep): + """Test when row count exceeds target.""" + # Arrange + mock_time.side_effect = [0, 1] + with patch.object(self.cpu_table, 'get_row_count') as mock_count: + mock_count.return_value = 1500 # More than target + + # Act & Assert + with self.assertRaises(RuntimeError) as cm: + 
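+                # The mocked count (1500) already exceeds the 1000-row
+                # target, so the call should raise rather than keep polling.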
self.cpu_table.block_until_rowcount(1000, timeout=30.0) + + self.assertIn('exceeds target', str(cm.exception)) + + +if __name__ == '__main__': + unittest.main()