diff --git a/docs/remorph/docs/profiler/index.mdx b/docs/remorph/docs/profiler/index.mdx
new file mode 100644
index 0000000000..7743981d8f
--- /dev/null
+++ b/docs/remorph/docs/profiler/index.mdx
@@ -0,0 +1,30 @@
+---
+sidebar_position: 1
+---
+
+## What is the Remorph Profiler?
+
+- The profiler collects usage and performance metrics from a source warehouse
+- It leverages the built-in scheduling components of the host operating system to execute usage collection queries
+
+## Why is the profiler usage collection needed?
+
+- Some data warehouses preserve query history for only a few days, or cap it at a small number of rows. As an example, Amazon Redshift only preserves usage history for around 2-5 days
+- Adequate usage history is critical to producing an _accurate_ Total Cost of Ownership (TCO) estimate
+- Too little history can misrepresent a warehouse's query patterns and system utilization
+
+## Supported operating systems
+
+The profiler usage collection can be installed on the following operating systems:
+
+1. macOS
+2. Linux
+3. [Windows](./windows/windows_installation.mdx)
+
+## How often does the profiler run?
+
+The profiler system task is scheduled to run every 15 minutes. When it runs, the task checks the local state to determine whether usage collection has already run for the day and whether there are any gaps in the usage history that should be reconciled.
+
+## Automatic restart
+
+The profiler usage collection service relies on the host operating system to run as a background process. As a result, the process automatically resumes after a system restart; no manual intervention is needed. The profiler detects gaps in the usage history and proactively backfills them when possible.
diff --git a/docs/remorph/docs/profiler/windows/windows_installation.mdx b/docs/remorph/docs/profiler/windows/windows_installation.mdx
new file mode 100644
index 0000000000..5996d1b537
--- /dev/null
+++ b/docs/remorph/docs/profiler/windows/windows_installation.mdx
@@ -0,0 +1,53 @@
+---
+sidebar_position: 1
+title: Windows Installation
+---
+
+# (Windows) Installing the Usage Collection Scheduler
+
+## Prerequisites
+
+The following requirements must be met before running the profiler scheduler installer:
+1. Python 3.10 (or higher)
+2. Administrator access to the Windows Task Scheduler
+3. Cloned copy of the `remorph` project
+
+## Running the installer
+
+The profiler installer will create and register a new Windows system task under the current user's identity:
+
+- **Remorph Usage Collection** - A system task that runs the usage collection pipeline on a recurring schedule
+
+### Installation steps:
+
+1. Open a new CMD window using Administrator privileges and navigate to the profiler directory:
+
+```bash
+> cd %USERPROFILE%\.databricks\labs\remorph\assessments\profiler\
+```
+
+2. Run the installer script:
+
+```bash
+> python install.py
+```
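+
+You can optionally confirm that the task was registered; this queries the task name the installer creates (the same name the uninstaller looks up):
+
+```bash
+> SCHTASKS /QUERY /TN "Remorph Usage Collection"
+```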
+
+### Un-installation steps:
+
+1. Open a new CMD window using Administrator privileges and navigate to the profiler directory:
+
+```bash
+> cd %USERPROFILE%\.databricks\labs\remorph\assessments\profiler\
+```
+
+2. Run the uninstaller script:
+
+```bash
+> python uninstall.py
+```
diff --git a/src/databricks/labs/remorph/assessments/__init__.py b/src/databricks/labs/remorph/assessments/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/databricks/labs/remorph/assessments/duckdb_manager.py b/src/databricks/labs/remorph/assessments/duckdb_manager.py
new file mode 100644
index 0000000000..0406bc0058
--- /dev/null
+++ b/src/databricks/labs/remorph/assessments/duckdb_manager.py
@@ -0,0 +1,34 @@
+import duckdb
+
+
+class DuckDBManager:
+    """Manages a connection to a single DuckDB database instance."""
+
+    _instance = None
+    _connection = None
+
+    def __new__(cls, db_path: str = "remorph_profiler_db.duckdb"):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+            cls._instance._init_connection(db_path)
+        return cls._instance
+
+    def _init_connection(self, db_path: str):
+        self._db_path = db_path
+        self._connection = duckdb.connect(database=db_path)
+
+    def get_connection(self):
+        return self._connection
+
+    def close_connection(self):
+        if self._connection is not None:
+            self._connection.close()
+            DuckDBManager._instance = None
+            DuckDBManager._connection = None
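+
+# Illustrative usage (a sketch only): because DuckDBManager is a process-wide
+# singleton, repeated construction returns the same connection, and any db_path
+# passed after the first call is ignored.
+#
+#   conn = DuckDBManager("remorph_profiler_db.duckdb").get_connection()
+#   assert conn is DuckDBManager().get_connection()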
diff --git a/src/databricks/labs/remorph/assessments/pipeline.py b/src/databricks/labs/remorph/assessments/pipeline.py
index 03f5ff9bbf..57c2eb92dc 100644
--- a/src/databricks/labs/remorph/assessments/pipeline.py
+++ b/src/databricks/labs/remorph/assessments/pipeline.py
@@ -3,6 +3,7 @@
 import json
 import logging
 import subprocess
+from enum import Enum
 
 import yaml
 import duckdb
@@ -17,6 +18,12 @@
 DB_NAME = "profiler_extract.db"
 
 
+class StepExecutionStatus(Enum):
+    COMPLETE = "COMPLETE"
+    ERROR = "ERROR"
+    SKIPPED = "SKIPPED"
+
+
 class PipelineClass:
     def __init__(self, config: PipelineConfig, executor: DatabaseManager):
         self.config = config
@@ -28,16 +35,18 @@ def execute(self):
         for step in self.config.steps:
             if step.flag == "active":
                 logging.debug(f"Executing step: {step.name}")
-                self._execute_step(step)
+                self.execute_step(step)
         logging.info("Pipeline execution completed")
 
-    def _execute_step(self, step: Step):
+    def execute_step(self, step: Step) -> StepExecutionStatus:
         if step.type == "sql":
             self._execute_sql_step(step)
         elif step.type == "python":
             self._execute_python_step(step)
         else:
             logging.error(f"Unsupported step type: {step.type}")
+            return StepExecutionStatus.ERROR
+        return StepExecutionStatus.COMPLETE
 
     def _execute_sql_step(self, step: Step):
         logging.debug(f"Reading query from file: {step.extract_source}")
diff --git a/src/databricks/labs/remorph/assessments/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/pipeline_scheduler.py
new file mode 100644
index 0000000000..f9530c96d1
--- /dev/null
+++ b/src/databricks/labs/remorph/assessments/pipeline_scheduler.py
@@ -0,0 +1,169 @@
+import time
+import logging
+from dataclasses import dataclass
+from enum import Enum
+
+from datetime import datetime, timezone
+from databricks.labs.remorph.assessments.duckdb_manager import DuckDBManager
+from databricks.labs.remorph.assessments.pipeline import PipelineClass, StepExecutionStatus
+from databricks.labs.remorph.assessments.profiler_config import Step
+
+
+def get_hours_since_last_run(last_run_datetime: datetime) -> int:
+    """Returns the number of full hours that have elapsed since the given last run."""
+    elapsed_time = datetime.now(timezone.utc) - last_run_datetime
+    elapsed_seconds = elapsed_time.total_seconds()
+    return round(elapsed_seconds // 3600)
+
+
+def get_days_since_last_run(last_run_datetime: datetime) -> int:
+    """Returns the number of full days that have elapsed since the given last run."""
+    elapsed_time = datetime.now(timezone.utc) - last_run_datetime
+    elapsed_seconds = elapsed_time.total_seconds()
+    return round(elapsed_seconds // 86400)
+
+
+class ScheduledFrequency(Enum):
+    ONCE = "ONCE"
+    DAILY = "DAILY"
+    WEEKLY = "WEEKLY"
+
+
+@dataclass
+class PollingStatus:
+    polling_dt: datetime
+    pipeline_name: str
+    step_name: str
+    status: str
+
+
+class PipelineScheduler:
+    """
+    Executes a Pipeline's steps according to predefined Step schedules.
+    """
+
+    def __init__(self, pipelines: list[PipelineClass], polling_interval_secs: int = 5):
+
+        self.duckdb_connection = DuckDBManager().get_connection()
+        self.polling_interval = polling_interval_secs
+        self.pipelines: list[PipelineClass] = pipelines
+
+        # Create a table in DuckDB for maintaining pipeline state
+        self._init_db()
+
+    def _init_db(self):
+        """Initializes a DuckDB table to store pipeline step execution state."""
+        logging.info("Initializing pipeline state database...")
+        self.duckdb_connection.execute(
+            query="""
+            CREATE TABLE IF NOT EXISTS pipeline_step_state (
+                step_name TEXT,
+                pipeline_name TEXT,
+                last_run TIMESTAMPTZ,
+                status TEXT,
+                PRIMARY KEY (pipeline_name, step_name)
+            )
+            """
+        )
+        logging.info("DuckDB state table is initialized.")
+
+    def _get_last_run_time(self, pipeline_name: str, step_name: str) -> tuple[datetime | None, str | None]:
+        """Fetches the last execution time and status of a pipeline step from the database."""
+        full_step_name = f"{pipeline_name}__{step_name}"
+        last_run = None
+        status = None
+        result = self.duckdb_connection.execute(
+            query="SELECT last_run, status FROM pipeline_step_state WHERE step_name = ?", parameters=[full_step_name]
+        ).fetchone()
+        if result is not None:
+            last_run, status = result
+        return last_run, status
+
+    def _record_run_time(self, pipeline_name: str, step_name: str, status: str = StepExecutionStatus.COMPLETE.value):
+        """Records the latest execution time and status of a pipeline step."""
+        now = datetime.now(timezone.utc)
+        full_step_name = f"{pipeline_name}__{step_name}"
+        self.duckdb_connection.execute(
+            query="""
+            INSERT INTO pipeline_step_state (step_name, pipeline_name, last_run, status)
+            VALUES (?, ?, ?, ?)
+            ON CONFLICT(pipeline_name, step_name)
+            DO UPDATE SET
+                last_run = EXCLUDED.last_run,
+                status = EXCLUDED.status
+            """,
+            parameters=[full_step_name, pipeline_name, now, status],
+        )
+
+    def _should_run(self, pipeline_name: str, step: Step) -> bool:
+        """Determines if a pipeline step should run based on its schedule."""
+        scheduled_frequency = step.frequency
+        last_run_time, status = self._get_last_run_time(pipeline_name, step.name)
+        if last_run_time is None or status == StepExecutionStatus.ERROR.value:
+            # First run of the Step, or a retry after a failed run
+            should_run = True
+            logging.info(f"Running step '{step.name}' (first run, or retry after an error).")
+        # The Step has already been run once
+        elif scheduled_frequency == ScheduledFrequency.ONCE.value:
+            logging.info(f"Step '{step.name}' has already been run once. Skipping.")
+            should_run = False
+        # Check if it's been >= 24 hours since the last run
+        elif scheduled_frequency == ScheduledFrequency.DAILY.value and get_hours_since_last_run(last_run_time) >= 24:
+            should_run = True
+            logging.info(f"Running daily step '{step.name}' now.")
+        # Check if it's been >= 7 days since the last run
+        elif scheduled_frequency == ScheduledFrequency.WEEKLY.value and get_days_since_last_run(last_run_time) >= 7:
+            should_run = True
+            logging.info(f"Running weekly step '{step.name}' now.")
+        else:
+            # None of the triggering frequency conditions have been met
+            should_run = False
+        return should_run
+
+    def _run_step(self, pipeline: PipelineClass, step: Step) -> str:
+        """Executes a pipeline step if it's due to run."""
+        if self._should_run(pipeline.config.name, step):
+            status = str(pipeline.execute_step(step).value)
+            self._record_run_time(pipeline.config.name, step.name, status)
+        else:
+            status = StepExecutionStatus.SKIPPED.value
+        return status
+
+    def run(self, num_polling_cycles: int = 1) -> list[PollingStatus]:
+        """
+        Loops over each pipeline's steps and executes any step that is due
+        according to its scheduled frequency.
+        """
+        logging.info("PipelineScheduler has started...")
+        cycle_counter = 0
+        polling_status = []
+        while cycle_counter < num_polling_cycles:
+
+            # Loop through the list of scheduled pipelines
+            # TODO: Parallelize this in the future to be more efficient
+            for pipeline in self.pipelines:
+                pipeline_steps = pipeline.config.steps
+                for pipeline_step in pipeline_steps:
+                    # Inspect the execution frequency of the Step
+                    # Possible frequencies include: "once", "daily", "weekly" (see `ScheduledFrequency` enum)
+                    logging.info(f"Evaluating schedule for step '{pipeline_step.name}'.")
+                    step_exec_status = self._run_step(pipeline, pipeline_step)
+                    execution_status = PollingStatus(
+                        datetime.now(timezone.utc), pipeline.config.name, pipeline_step.name, step_exec_status
+                    )
+                    polling_status.append(execution_status)
+
+            # Wait a bit before the next polling cycle
+            time.sleep(self.polling_interval)
+            cycle_counter += 1
+
+        return polling_status
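+
+
+# Illustrative wiring (a sketch mirroring the integration test; the config path
+# and `db_manager` executor are placeholders, not a packaged entry point):
+#
+#   config = PipelineClass.load_config_from_yaml("pipeline_config.yml")
+#   pipeline = PipelineClass(config=config, executor=db_manager)
+#   scheduler = PipelineScheduler(pipelines=[pipeline])
+#   statuses = scheduler.run(num_polling_cycles=1)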
Skipping.") + should_run = False + # Check if it's been >= 24 hours since the last run + elif scheduled_frequency == ScheduledFrequency.DAILY.value and get_hours_since_last_run(last_run_time) >= 24: + should_run = True + logging.info(f"Running daily step '{step.name}' now.") + # Check if it's been >= 7 days since the last run + elif scheduled_frequency == ScheduledFrequency.WEEKLY.value and get_days_since_last_run(last_run_time) >= 7: + should_run = True + logging.info(f"Running weekly step '{step.name}' now.") + else: + # None of the triggering frequency conditions have been met + should_run = False + return should_run + + def _run_step(self, pipeline: PipelineClass, step: Step) -> str: + """Executes a pipeline step if it's time to run.""" + if self._should_run(pipeline.config.name, step): + status = str(pipeline.execute_step(step).value) + self._record_run_time(pipeline.config.name, step.name, status) + else: + status = StepExecutionStatus.SKIPPED.value + return status + + def run(self, num_polling_cycles: int = 1) -> list[PollingStatus]: + """ + Loops over a pipeline's steps and checks if step has been executed + according to its scheduled frequency. + """ + logging.info("PipelineScheduler has started...") + cycle_counter = 0 + polling_status = [] + while cycle_counter < num_polling_cycles: + + # Loop through the list of scheduled pipelines + # TODO: Parallelize this in the future to be more efficient + for pipeline in self.pipelines: + pipeline_steps = pipeline.config.steps + for pipeline_step in pipeline_steps: + # Inspect the execution frequency of the Step + # Possible frequencies include: "once", "daily", "weekly" (see `ScheduledFrequency` enum) + logging.info(f"Evaluating scheduling step '{pipeline_step.name}'.") + step_exec_status = self._run_step(pipeline, pipeline_step) + execution_status = PollingStatus( + datetime.now(timezone.utc), pipeline.config.name, pipeline_step.name, step_exec_status + ) + polling_status.append(execution_status) + + # Wait a bit before polling status + time.sleep(self.polling_interval) + cycle_counter += 1 + + return polling_status diff --git a/src/databricks/labs/remorph/assessments/profiler/__init__.py b/src/databricks/labs/remorph/assessments/profiler/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/remorph/assessments/profiler/install.py b/src/databricks/labs/remorph/assessments/profiler/install.py new file mode 100644 index 0000000000..024236adbc --- /dev/null +++ b/src/databricks/labs/remorph/assessments/profiler/install.py @@ -0,0 +1,122 @@ +import platform +from subprocess import run, CalledProcessError +from pathlib import Path + +import logging +import getpass + + +USAGE_COLLECTION_TASK_NAME = "Remorph Usage Collection" + + +def schedule_windows_task(): + """ + Schedules the profiler usage collection task to execute using schtasks. 
+ """ + pipeline_scheduler_path = f"{Path.cwd().parent}\\pipeline_scheduler.py" + usage_collection_command = f"python {pipeline_scheduler_path}" + logging.info(f"Scheduling usage collection task: {usage_collection_command}") + try: + run( + [ + "SCHTASKS", + "/CREATE", + "/TN", + USAGE_COLLECTION_TASK_NAME, + "/TR", + usage_collection_command, + "/SC", + "MINUTE", + "/MO", + "15", # TODO: Make this configurable + "/RL", + "HIGHEST", + "/F", + "/RU", + getpass.getuser(), + ], + check=True, + text=True, + ) + except CalledProcessError as e: + error_msg = e.stderr + raise RuntimeError(f"Failed to schedule the usage collection task for Windows: {error_msg}") from e + + +def launch_installer(): + """ + Schedules usage collection task using the hosts system scheduler. + """ + system = platform.system() + logging.info(f"Running profiler installer for: {system}") + if system == "Windows": + try: + schedule_windows_task() + except RuntimeError as e: + logging.error(f"Failed to schedule the usage collection task for Windows: {e}") + raise RuntimeError(f"Failed to schedule the usage collection task for Windows: {e}") from e + else: + raise RuntimeError(f"Unsupported operating system: {system}") + + +def is_systemd_installed() -> bool: + """ + Validates if systemd is installed on the host. + Returns: + bool - True if systemd is installed + False if systemd not found on the system + """ + try: + run(["systemctl", "-version"], capture_output=True, check=True) + return True + except CalledProcessError as cpe: + logging.error(f"Systemd not found on the system: {cpe}") + return False + + +def is_task_scheduler_installed() -> bool: + """ + Validates if the schtasks command is installed on the host. + Returns: + bool - True if schtasks is installed + False if schtasks not found on the system + """ + try: + # Check if the manual page for `schtasks` can be displayed + run(["SCHTASKS", "/?"], capture_output=True, check=True) + return True + except CalledProcessError as cpe: + logging.error(f"Schtasks not found on the system: {cpe}") + return False + + +def validate_system_requirements(): + """ + Validates the system requirements for installing the profiler are met: + 1. The user must be using a Mac, Linux, or Windows OS + 2. Python 3.10+ must be installed + 3. System scheduler must be callable, e.g. 
+    assert len(status) == 3, "The number of step execution statuses did not match the expected count."
diff --git a/tests/resources/assessments/hello_world.py b/tests/resources/assessments/hello_world.py
new file mode 100644
index 0000000000..f3815a86ce
--- /dev/null
+++ b/tests/resources/assessments/hello_world.py
@@ -0,0 +1,6 @@
+def hello_world():
+    print("Hello, World!")
+
+
+if __name__ == '__main__':
+    hello_world()
diff --git a/tests/resources/assessments/simple_python_pipeline_config.yml b/tests/resources/assessments/simple_python_pipeline_config.yml
new file mode 100644
index 0000000000..b12ec2073a
--- /dev/null
+++ b/tests/resources/assessments/simple_python_pipeline_config.yml
@@ -0,0 +1,14 @@
+name: "Simple Python Pipeline"
+version: "1.0"
+extract_folder: "tests/resources/assessments"
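+# Each step below maps to a profiler `Step`. As a rough guide: `flag: active`
+# marks the step for execution, `extract_source` points at the SQL or Python
+# payload, and `frequency` (once/daily/weekly) drives when the
+# PipelineScheduler re-runs the step.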
+ """ + system = platform.system() + logging.info(f"Running remorph profiler uninstaller for: {system}") + if system == "Windows": + try: + delete_windows_task() + except RuntimeError as e: + logging.error(f"Failed to delete the usage collection task for Windows: {e}") + raise RuntimeError(f'Failed to delete the usage collection task for Windows: {e}') from e + else: + raise RuntimeError(f"Unsupported operating system: {system}") + + +def main(): + logging.info("Uninstalling the remorph profiler...\n") + launch_uninstaller() + logging.info("Uninstallation complete.") + + +if __name__ == "__main__": + main() diff --git a/tests/integration/assessments/test_pipeline_scheduler.py b/tests/integration/assessments/test_pipeline_scheduler.py new file mode 100644 index 0000000000..64a14e672e --- /dev/null +++ b/tests/integration/assessments/test_pipeline_scheduler.py @@ -0,0 +1,30 @@ +from pathlib import Path +import pytest + +from databricks.labs.remorph.assessments.pipeline import PipelineClass +from databricks.labs.remorph.assessments.pipeline_scheduler import PipelineScheduler +from ..connections.helpers import get_db_manager + + +@pytest.fixture() +def extractor(mock_credentials): + return get_db_manager("remorph", "mssql") + + +@pytest.fixture(scope="module") +def pipeline_config(): + prefix = Path(__file__).parent + config_path = f"{prefix}/../../resources/assessments/simple_python_pipeline_config.yml" + config = PipelineClass.load_config_from_yaml(config_path) + + for step in config.steps: + step.extract_source = f"{prefix}/../../{step.extract_source}" + return config + + +def test_pipeline_scheduler(pipeline_config, extractor): + simple_pipeline = PipelineClass(config=pipeline_config, executor=extractor) + pipelines = [simple_pipeline] + scheduler = PipelineScheduler(pipelines) + status = scheduler.run(num_polling_cycles=3) + assert len(status) == 3, "The actual step execution statuses did not match the expected amount." diff --git a/tests/resources/assessments/hello_world.py b/tests/resources/assessments/hello_world.py new file mode 100644 index 0000000000..f3815a86ce --- /dev/null +++ b/tests/resources/assessments/hello_world.py @@ -0,0 +1,6 @@ +def hello_world(): + print("Hello, World!") + + +if __name__ == '__main__': + hello_world() diff --git a/tests/resources/assessments/simple_python_pipeline_config.yml b/tests/resources/assessments/simple_python_pipeline_config.yml new file mode 100644 index 0000000000..b12ec2073a --- /dev/null +++ b/tests/resources/assessments/simple_python_pipeline_config.yml @@ -0,0 +1,10 @@ +name: "Simple Python Pipeline" +version: "1.0" +extract_folder: "tests/resources/assessments" +steps: + - name: hello_world_python_step + type: python + flag: active + extract_source: resources/assessments/hello_world.py + mode: overwrite + frequency: daily