diff --git a/docs/assessment/scheduler/README.md b/docs/assessment/scheduler/README.md
new file mode 100644
index 0000000000..4a437b53ba
--- /dev/null
+++ b/docs/assessment/scheduler/README.md
@@ -0,0 +1,24 @@
+# Installing the Usage Collection Scheduler
+
+## Why do we need a usage collection scheduler?
+- Some data warehouses do not preserve query history for longer than a few days, or cap it at a limited number of rows. As an example, Amazon Redshift only preserves usage history for around 2-5 days.
+- Having adequate usage history is critical for providing an accurate Total Cost of Ownership (TCO) estimate.
+- Having too little history could produce an inaccurate representation of data warehouse query patterns and system utilization.
+
+## Supported operating systems
+The usage collection scheduler can be installed on the following operating systems:
+
+1. macOS
+2. Linux
+
+## Local vs. Remote Deployments
+
+The usage collection scheduler can be executed in the following deployments:
+
+1. **Local scheduler** - The scheduler is installed on a local laptop or workstation, provided that the local system can connect to a bastion host and forward connectivity to the data warehouse (see the tunnel example below)
+2. **Remote scheduler** - The scheduler is installed directly on a bastion host in a separate subnet, or on a compute instance deployed within the data warehouse's subnet
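+
+For a local deployment, connectivity to the data warehouse typically runs through an SSH tunnel to the bastion host. The exact command depends on your environment; the host names, user, and port below are placeholders rather than values shipped with remorph, but a tunnel would look roughly like this:
+
+```bash
+$ ssh -N -L 5439:warehouse.internal.example.com:5439 analyst@bastion.example.com
+```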
+
+
+## Automatic restart
+
+The usage collection scheduler utilizes the host operating system to run as a background process. As a result, the process automatically resumes after a system restart and no manual intervention is needed. The scheduler will automatically detect usage history gaps and proactively backfill when possible.
diff --git a/docs/assessment/scheduler/linux/README.md b/docs/assessment/scheduler/linux/README.md
new file mode 100644
index 0000000000..1960449326
--- /dev/null
+++ b/docs/assessment/scheduler/linux/README.md
@@ -0,0 +1,68 @@
+# (Linux) Installing the Usage Collection Scheduler
+
+## Installation Steps:
+
+1. Open a new Terminal window and navigate to the remorph repository
+
+2. Copy the unit file to `/etc/systemd/system/`:
+
+```bash
+$ sudo cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service /etc/systemd/system/
+```
+
+3. Copy the timer file to `/etc/systemd/system/`:
+
+```bash
+$ sudo cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer /etc/systemd/system/
+```
+
+4. Reload the systemd daemon to pick up the new files:
+
+```bash
+$ sudo systemctl daemon-reload
+```
+
+5. Enable the system timer:
+
+```bash
+$ sudo systemctl enable remorph_usage_collection.timer
+```
+
+6. Start the system timer:
+
+```bash
+$ sudo systemctl start remorph_usage_collection.timer
+```
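+
+7. Optionally, verify that the timer is registered and that the collector's output is reaching the journal. These checks are illustrative rather than required:
+
+```bash
+$ systemctl list-timers | grep remorph_usage_collection
+$ journalctl -u remorph_usage_collection.service --since today
+```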
+
+## Description of the Unit File Elements
+
+1. **Description**: A description of the service
+
+2. **After**: Ensures the service starts only after the specified system service is up (the network in this case)
+
+3. **ExecStart**: Specifies the command and arguments (python3 interpreter and the script path)
+
+4. **WorkingDirectory**: The directory where the script should run
+
+5. **StandardOutput**: Redirects standard output to the system journal
+
+6. **StandardError**: Redirects standard error to the system journal
+
+7. **Restart**: Restarts the script automatically whenever it exits or fails
+
+8. **User**: Runs the script as the specified user
+
+9. **WantedBy**: Ensures the service starts when the system reaches multi-user mode
+
+
+## Description of the Timer File Elements
+
+1. **Description**: A description of the timer
+
+2. **Persistent**: Ensures a run that was missed while the system was off is executed after reboot
+
+3. **OnBootSec**: Ensures the script runs 1 min after booting
+
+4. **OnUnitActiveSec**: Runs the script every 15 minutes
+
+5. **WantedBy**: Ensures the timer is activated when the system starts its timers (`timers.target`)
diff --git a/docs/assessment/scheduler/macos/README.md b/docs/assessment/scheduler/macos/README.md
new file mode 100644
index 0000000000..ecbd65ad45
--- /dev/null
+++ b/docs/assessment/scheduler/macos/README.md
@@ -0,0 +1,37 @@
+# (macOS) Installing the Usage Collection Scheduler
+
+## Installation Steps:
+
+1. Open a new Terminal window and navigate to the remorph repository
+
+2. Copy the plist file to `~/Library/LaunchAgents/` for a user-specific task or `/Library/LaunchDaemons/` for a system-wide task:
+
+```bash
+$ cp ./src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist ~/Library/LaunchAgents/
+```
+
+3. Next, load the launch agent by executing the following command:
+
+```bash
+$ launchctl load ~/Library/LaunchAgents/com.remorph.usagecollection.plist
+```
+
+4. Grant execution permissions to the usage collection script:
+
+```bash
+$ chmod +x ./src/databricks/labs/remorph/assessments/scheduler/usage_collector.py
+```
+
+## Description of the `plist` Elements
+
+1. **Label**: A unique identifier for the job
+
+2. **ProgramArguments**: Specifies the command and arguments (python3 interpreter and the script path)
+
+3. **StartInterval**: Runs the script every 900 seconds (15 minutes)
+
+4. **RunAtLoad**: Ensures the script runs immediately after loading
+
+5. **StandardOutPath**: Logs output to `~/Downloads/remorph/scheduler/stdout` for debugging
+
+6. **StandardErrorPath**: Logs errors to `~/Downloads/remorph/scheduler/stderr` for debugging
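+
+After `launchctl load`, an optional sanity check confirms the job was registered:
+
+```bash
+$ launchctl list | grep com.remorph.usagecollection
+```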
diff --git a/pyproject.toml b/pyproject.toml
index 2b89c5dc93..317eebdf22 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -63,6 +63,7 @@ dependencies = [
     "mypy~=1.10.0",
     "numpy==1.26.4",
     "pandas==1.4.1",
+    "snowflake-sqlalchemy>=1.7.3"
 ]
 
 [project.entry-points.databricks]
diff --git a/src/databricks/labs/remorph/assessments/duckdb_manager.py b/src/databricks/labs/remorph/assessments/duckdb_manager.py
new file mode 100644
index 0000000000..0406bc0058
--- /dev/null
+++ b/src/databricks/labs/remorph/assessments/duckdb_manager.py
@@ -0,0 +1,27 @@
+import duckdb
+
+
+class DuckDBManager:
+    """Manages a connection to a single DuckDB database instance."""
+
+    _instance = None
+    _connection = None
+
+    def __new__(cls, db_path: str = "remorph_profiler_db.duckdb"):
+        if cls._instance is None:
+            cls._instance = super(DuckDBManager, cls).__new__(cls)
+            cls._instance._init_connection(db_path)
+        return cls._instance
+
+    def _init_connection(self, db_path: str):
+        self._db_path = db_path
+        self._connection = duckdb.connect(database=db_path)
+
+    def get_connection(self):
+        return self._connection
+
+    def close_connection(self):
+        if self._connection is not None:
+            self._connection.close()
+            DuckDBManager._instance = None
+            DuckDBManager._connection = None
diff --git a/src/databricks/labs/remorph/assessments/pipeline.py b/src/databricks/labs/remorph/assessments/pipeline.py
index 03f5ff9bbf..f9488ce78c 100644
--- a/src/databricks/labs/remorph/assessments/pipeline.py
+++ b/src/databricks/labs/remorph/assessments/pipeline.py
@@ -5,6 +5,7 @@
 import subprocess
 import yaml
 import duckdb
+from enum import StrEnum
 
 from databricks.labs.remorph.connections.credential_manager import cred_file
 
@@ -17,6 +18,12 @@
 DB_NAME = "profiler_extract.db"
 
 
+class StepExecutionStatus(StrEnum):
+    COMPLETE = "COMPLETE"
+    ERROR = "ERROR"
+    SKIPPED = "SKIPPED"
+
+
 class PipelineClass:
     def __init__(self, config: PipelineConfig, executor: DatabaseManager):
         self.config = config
@@ -28,16 +35,26 @@ def execute(self):
         for step in self.config.steps:
             if step.flag == "active":
                 logging.debug(f"Executing step: {step.name}")
-                self._execute_step(step)
+                self.execute_step(step)
         logging.info("Pipeline execution completed")
 
-    def _execute_step(self, step: Step):
-        if step.type == "sql":
-            self._execute_sql_step(step)
-        elif step.type == "python":
-            self._execute_python_step(step)
-        else:
-            logging.error(f"Unsupported step type: {step.type}")
+    def execute_step(self, step: Step) -> StrEnum:
+        try:
+            if step.type == "sql":
+                logging.info(f"Executing SQL step {step.name}")
+                self._execute_sql_step(step)
+                status = StepExecutionStatus.COMPLETE
+            elif step.type == "python":
+                logging.info(f"Executing Python step {step.name}")
+                self._execute_python_step(step)
+                status = StepExecutionStatus.COMPLETE
+            else:
+                logging.error(f"Unsupported step type: {step.type}")
+                raise RuntimeError(f"Unsupported step type: {step.type}")
+        except RuntimeError as e:
+            logging.error(e)
+            status = StepExecutionStatus.ERROR
+        return status
 
     def _execute_sql_step(self, step: Step):
         logging.debug(f"Reading query from file: {step.extract_source}")
@@ -86,7 +103,8 @@ def _save_to_db(self, result, step_name: str, mode: str, batch_size: int = 1000)
         self._create_dir(self.db_path_prefix)
         conn = duckdb.connect(str(self.db_path_prefix) + '/' + DB_NAME)
         columns = result.keys()
-        # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
+        # TODO: Add support for figuring out data types from SQLALCHEMY
+        # result object result.cursor.description is not reliable
         schema = ' STRING, '.join(columns) + ' STRING'
 
         # Handle write modes
diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service
new file mode 100644
index 0000000000..4785c4c78b
--- /dev/null
+++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service
@@ -0,0 +1,14 @@
+[Unit]
+Description=Remorph usage collection service
+After=network.target
+
+[Service]
+ExecStart=/usr/bin/python3 ~/Downloads/remorph/scheduler/usage_collector.py
+WorkingDirectory=~/Downloads/remorph/scheduler/
+StandardOutput=journal
+StandardError=journal
+Restart=always
+User=root
+
+[Install]
+WantedBy=multi-user.target
diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer
new file mode 100644
index 0000000000..da58c12cd5
--- /dev/null
+++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer
@@ -0,0 +1,10 @@
+[Unit]
+Description=Remorph usage collection every 15 minutes
+
+[Timer]
+OnBootSec=1min
+OnUnitActiveSec=15min
+Persistent=true
+
+[Install]
+WantedBy=timers.target
\ No newline at end of file
diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist b/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist
new file mode 100644
index 0000000000..dfedf1d702
--- /dev/null
+++ b/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+<dict>
+    <key>Label</key>
+    <string>com.remorph.usagecollection</string>
+
+    <key>ProgramArguments</key>
+    <array>
+        <string>/Library/Frameworks/Python.framework/Versions/3.11/bin/python3</string>
+        <string>~/Downloads/remorph/scheduler/usage_collector.py</string>
+    </array>
+
+    <key>StartInterval</key>
+    <integer>900</integer>
+
+    <key>RunAtLoad</key>
+    <true/>
+
+    <key>StandardOutPath</key>
+    <string>~/Downloads/remorph/scheduler/stdout/usagecollection.log</string>
+
+    <key>StandardErrorPath</key>
+    <string>~/Downloads/remorph/scheduler/stderr/usagecollection_error.log</string>
+</dict>
+</plist>
+
diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py
new file mode 100644
index 0000000000..e255955597
--- /dev/null
+++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py
@@ -0,0 +1,152 @@
+import time
+import logging
+from dataclasses import dataclass
+
+from datetime import datetime
+from enum import StrEnum
+
+from databricks.labs.remorph.assessments.duckdb_manager import DuckDBManager
+from databricks.labs.remorph.assessments.pipeline import PipelineClass, StepExecutionStatus
+from databricks.labs.remorph.assessments.profiler_config import Step
+
+
+def get_hours_since_last_run(last_run_datetime: datetime) -> int:
+    elapsed_time = datetime.now() - last_run_datetime
+    elapsed_seconds = elapsed_time.total_seconds()
+    return int(divmod(elapsed_seconds, 3600)[0])
+
+
+def get_days_since_last_run(last_run_datetime: datetime) -> int:
+    elapsed_time = datetime.now() - last_run_datetime
+    elapsed_seconds = elapsed_time.total_seconds()
+    return int(divmod(elapsed_seconds, 86400)[0])
+
+
+class ScheduledFrequency(StrEnum):
+    ONCE = "ONCE"
+    DAILY = "DAILY"
+    WEEKLY = "WEEKLY"
+
+
+@dataclass
+class PollingStatus:
+    polling_dt: datetime
+    pipeline_name: str
+    step_name: str
+    status: str
+
+
+class PipelineScheduler:
+    """A scheduler that executes Pipeline steps according to predefined schedules."""
+
+    def __init__(self, pipelines: list[PipelineClass],
+                 polling_interval_secs: int = 5):
+
+        self.duckdb_connection = DuckDBManager().get_connection()
+        self.polling_interval = polling_interval_secs
+        self.pipelines: list[PipelineClass] = pipelines
+
+        # Create a table in DuckDB for maintaining pipeline state
+        self._init_db()
+
+    def _init_db(self):
+        """Initializes a DuckDB database to store pipeline step execution state."""
+        logging.info("Initializing pipeline state database...")
+        self.duckdb_connection.execute(query="""
+            CREATE TABLE IF NOT EXISTS pipeline_step_state (
+                step_name TEXT PRIMARY KEY,
+                pipeline_name TEXT,
+                last_run TIMESTAMP,
+                status TEXT
+            )
+        """)
+        logging.info("DuckDB state table is ready to go!")
+
+    def _get_last_run_time(self, pipeline_name: str, step_name: str) -> tuple[datetime | None, str | None]:
+        """Fetches the last execution time and status of a pipeline step from the database."""
+        full_step_name = f"{pipeline_name}__{step_name}"
+        result = self.duckdb_connection.execute(
+            query="SELECT last_run, status FROM pipeline_step_state WHERE step_name = ?",
+            parameters=[full_step_name]
+        ).fetchone()
+        if result is not None:
+            last_run_time = result[0]
+            return last_run_time, result[1]
+        else:
+            return None, None
+
+    def _record_run_time(self, pipeline_name: str, step_name: str, status: str = StepExecutionStatus.COMPLETE.value):
+        """Records the latest execution time of a pipeline step."""
+        now = datetime.now()
+        full_step_name = f"{pipeline_name}__{step_name}"
+        self.duckdb_connection.execute(query="""
+            INSERT INTO pipeline_step_state (step_name, pipeline_name, last_run, status)
+            VALUES (?, ?, ?, ?)
+            ON CONFLICT(step_name)
+            DO UPDATE
+            SET pipeline_name = excluded.pipeline_name, last_run = excluded.last_run, status = excluded.status
+        """, parameters=[full_step_name, pipeline_name, now, status])
+
+    def _should_run(self, pipeline_name: str, step: Step) -> bool:
+        """Determines if a pipeline step should run based on its schedule."""
+        scheduled_frequency = step.frequency
+        last_run_time, status = self._get_last_run_time(pipeline_name, step.name)
+        if last_run_time is None or status == StepExecutionStatus.ERROR.value:
+            # First run of the Step, or the previous run ended in an error
+            should_run = True
+            logging.info(f"First time running the step: '{step.name}'.")
+        else:
+            # The Step has been run once already
+            if scheduled_frequency == ScheduledFrequency.ONCE.value:
+                logging.info(f"Step '{step.name}' has already been run once. Skipping.")
+                should_run = False
+            # Check if it's been >= 24 hours since the last run
+            elif (scheduled_frequency == ScheduledFrequency.DAILY.value and
+                  get_hours_since_last_run(last_run_time) >= 24):
+                should_run = True
+                logging.info(f"Running daily step '{step.name}' now.")
+            # Check if it's been >= 7 days since the last run
+            elif (scheduled_frequency == ScheduledFrequency.WEEKLY.value and
+                  get_days_since_last_run(last_run_time) >= 7):
+                should_run = True
+                logging.info(f"Running weekly step '{step.name}' now.")
+            else:
+                # None of the triggering frequency conditions have been met
+                should_run = False
+        return should_run
+
+    def _run_step(self, pipeline: PipelineClass, step: Step) -> str:
+        """Executes a pipeline step if it's time to run."""
+        if self._should_run(pipeline.config.name, step):
+            status = str(pipeline.execute_step(step).value)
+            self._record_run_time(pipeline.config.name, step.name, status)
+        else:
+            status = StepExecutionStatus.SKIPPED.value
+        return status
+
+    def run(self, max_num_cycles: int | None = None) -> list[PollingStatus]:
+        """Polls the scheduled pipelines in a loop, optionally bounded by max_num_cycles."""
+        logging.info("PipelineScheduler has started...")
+        cycle_counter = 0
+        polling_status = []
+        while max_num_cycles is None or cycle_counter < max_num_cycles:
+
+            # Loop through the list of scheduled pipelines
+            # TODO: Parallelize this in the future to be more efficient
+            for pipeline in self.pipelines:
+                pipeline_steps = pipeline.config.steps
+                for pipeline_step in pipeline_steps:
+                    # Inspect the execution frequency of the Step
+                    # Possible frequencies include: "once", "daily", "weekly" (see `ScheduledFrequency` enum)
+                    logging.info(f"Evaluating scheduling step '{pipeline_step.name}'.")
+                    step_exec_status = self._run_step(pipeline, pipeline_step)
+                    execution_status = PollingStatus(datetime.now(), pipeline.config.name,
+                                                     pipeline_step.name, step_exec_status)
+                    polling_status.append(execution_status)
+
+            # Wait a bit before polling again
+            time.sleep(self.polling_interval)
+            if max_num_cycles is not None:
+                cycle_counter += 1
+
+        return polling_status
diff --git a/src/databricks/labs/remorph/connections/database_manager.py b/src/databricks/labs/remorph/connections/database_manager.py
index df9d678bb2..ead4089865 100644
--- a/src/databricks/labs/remorph/connections/database_manager.py
+++ b/src/databricks/labs/remorph/connections/database_manager.py
@@ -4,6 +4,7 @@
 
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine, Result, URL
+from snowflake.sqlalchemy import URL as SnowflakeURL
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy import text
 from sqlalchemy.exc import OperationalError
@@ -44,6 +45,7 @@ def _create_connector(db_type: str, config: dict[str, Any]) -> DatabaseConnector
         "mssql": MSSQLConnector,
         "tsql": MSSQLConnector,
         "synapse": MSSQLConnector,
+        "postgres": PostgresConnector,
     }
 
     connector_class = connectors.get(db_type.lower())
@@ -56,7 +58,23 @@ def _create_connector(db_type: str, config: dict[str, Any]) -> DatabaseConnector
 
 class SnowflakeConnector(_BaseConnector):
     def _connect(self) -> Engine:
-        raise NotImplementedError("Snowflake connector not implemented")
+        # Snowflake does not follow a traditional SQL Alchemy connection string URL; they have their own.
+        # e.g., connection_string = (f"snowflake://{user}:{pw}@{account}")
+        # Query parameters are not currently supported (as of 1.7.3 release)
+        # https://docs.snowflake.com/en/developer-guide/python-connector/sqlalchemy#required-parameters
+        sqlalchemy_driver = "snowflake"
+        connection_string = SnowflakeURL(
+            drivername=sqlalchemy_driver,
+            account=self.config["account"],
+            user=self.config["user"],
+            password=self.config["password"],
+            database=self.config["database"],
+            schema=self.config["schema"],
+            warehouse=self.config["warehouse"],
+            role=self.config["role"],
+            timezone=self.config["timezone"]
+        )
+        return create_engine(connection_string)
 
 
 class MSSQLConnector(_BaseConnector):
@@ -78,6 +96,27 @@ def _connect(self) -> Engine:
         return create_engine(connection_string)
 
 
+class PostgresConnector(_BaseConnector):
+    def _connect(self) -> Engine:
+        # Pull out additional query params from config
+        query_params = {}
+        for key, value in self.config.items():
+            if key not in ["user", "password", "server", "database", "port"]:
+                query_params[key] = value
+        # Build the connection string to database
+        sqlalchemy_driver = "postgresql"
+        connection_string = URL.create(
+            drivername=sqlalchemy_driver,
+            username=self.config["user"],
+            password=self.config["password"],
+            host=self.config["server"],
+            port=self.config.get("port", 5432),
+            database=self.config["database"],
+            query=query_params,
+        )
+        return create_engine(connection_string)
+
+
 class DatabaseManager:
     def __init__(self, db_type: str, config: dict[str, Any]):
         self.connector = _create_connector(db_type, config)
diff --git a/tests/integration/assessments/test_pipeline_scheduler.py b/tests/integration/assessments/test_pipeline_scheduler.py
new file mode 100644
index 0000000000..b13928c3b5
--- /dev/null
+++ b/tests/integration/assessments/test_pipeline_scheduler.py
@@ -0,0 +1,51 @@
+from pathlib import Path
+import pytest
+
+from datetime import datetime
+
+from databricks.labs.remorph.assessments.pipeline import PipelineClass
+from databricks.labs.remorph.assessments.scheduler.pipeline_scheduler import PipelineScheduler, \
+    get_hours_since_last_run, get_days_since_last_run
+from ..connections.helpers import get_db_manager
+
+
+@pytest.fixture()
+def extractor():
+    return get_db_manager("remorph", "postgres")
+
+
+@pytest.fixture(scope="module")
+def pipeline_config():
+    prefix = Path(__file__).parent
+    config_path = f"{prefix}/../../resources/assessments/scheduler/postgres_pipeline_config.yml"
+    config = PipelineClass.load_config_from_yaml(config_path)
+
+    for step in config.steps:
+        step.extract_source = f"{prefix}/../../{step.extract_source}"
+    return config
+
+
+def test_hours_since_last_run():
+    date_start_str = "2025-04-08 10:30:00"
+    date_format = "%Y-%m-%d %H:%M:%S"
+    datetime_start = datetime.strptime(date_start_str, date_format)
+    actual_hours = get_hours_since_last_run(datetime_start)
+    expected_hours = divmod((datetime.now() - datetime_start).total_seconds(), 3600)[0]
+    assert actual_hours == expected_hours, "The calculated hours since last run does not match the expected value."
+
+
+def test_days_since_last_run():
+    date_start_str = "2025-04-08 10:30:00"
+    date_format = "%Y-%m-%d %H:%M:%S"
+    datetime_start = datetime.strptime(date_start_str, date_format)
+    actual_days = get_days_since_last_run(datetime_start)
+    expected_days = divmod((datetime.now() - datetime_start).total_seconds(), 86400)[0]
+    assert actual_days == expected_days, "The calculated days since last run does not match the expected value."
+
+
+def test_pipeline_scheduler(pipeline_config, extractor):
+    simple_pipeline = PipelineClass(config=pipeline_config, executor=extractor)
+    pipelines = [simple_pipeline]
+    scheduler = PipelineScheduler(pipelines)
+    status = scheduler.run(max_num_cycles=3)
+    assert len(status) == 3, "The actual step execution statuses did not match the expected amount."
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index e1cc5a8fdd..bdc448acfe 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -65,6 +65,13 @@ def mock_credentials():
                 'database': 'TEST_TSQL_JDBC',
                 'driver': 'ODBC Driver 18 for SQL Server',
             },
+            'postgres': {
+                'user': 'TEST_PG_USER',
+                'password': 'TEST_PG_PASS',
+                'server': 'TEST_PG_JDBC',
+                'database': 'TEST_PG_DB',
+                'driver': 'PostgreSQL Unicode',
+            },
         },
     ):
         yield
diff --git a/tests/resources/assessments/scheduler/postgres_pipeline_config.yml b/tests/resources/assessments/scheduler/postgres_pipeline_config.yml
new file mode 100644
index 0000000000..4584bc7f51
--- /dev/null
+++ b/tests/resources/assessments/scheduler/postgres_pipeline_config.yml
@@ -0,0 +1,10 @@
+name: PostgresPipeline
+version: "1.0"
+extract_folder: /tmp/extracts/
+steps:
+  - name: usage
+    type: sql
+    extract_source: resources/assessments/target/postgres/usage.sql
+    mode: overwrite
+    frequency: weekly
+    flag: active