From 8cee97d8ba5e13687dd6e26a5305832acda279bf Mon Sep 17 00:00:00 2001 From: Will Girten Date: Wed, 23 Apr 2025 18:38:52 -0500 Subject: [PATCH 01/22] Add PipelineScheduler class and MacOS and Linux system files --- docs/remorph/docs/scheduler/index.mdx | 28 ++++ .../scheduler/linux/linux_installation.mdx | 73 +++++++++ .../scheduler/macos/macos_installation.mdx | 42 +++++ .../remorph/assessments/duckdb_manager.py | 27 ++++ .../labs/remorph/assessments/pipeline.py | 36 +++-- .../linux/remorph_usage_collection.service | 14 ++ .../linux/remorph_usage_collection.timer | 14 ++ .../macos/com.remorph.usagecollection.plist | 27 ++++ .../scheduler/pipeline_scheduler.py | 152 ++++++++++++++++++ .../assessments/test_pipeline_scheduler.py | 51 ++++++ tests/integration/conftest.py | 7 + .../scheduler/postgres_pipeline_config.yml | 10 ++ 12 files changed, 472 insertions(+), 9 deletions(-) create mode 100644 docs/remorph/docs/scheduler/index.mdx create mode 100644 docs/remorph/docs/scheduler/linux/linux_installation.mdx create mode 100644 docs/remorph/docs/scheduler/macos/macos_installation.mdx create mode 100644 src/databricks/labs/remorph/assessments/duckdb_manager.py create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist create mode 100644 src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py create mode 100644 tests/integration/assessments/test_pipeline_scheduler.py create mode 100644 tests/resources/assessments/scheduler/postgres_pipeline_config.yml diff --git a/docs/remorph/docs/scheduler/index.mdx b/docs/remorph/docs/scheduler/index.mdx new file mode 100644 index 0000000000..f76810b290 --- /dev/null +++ b/docs/remorph/docs/scheduler/index.mdx @@ -0,0 +1,28 @@ +--- 
+sidebar_position: 1 +--- + +# Installing the Usage Collection Scheduler + +## Why do we need a usage collection scheduler? +- Some data warehouses do not preserve query history for longer than a few days or are limited to only a few rows. As an example, Amazon Redshift only preserves usage history for around 2-5 days +- Having adequate usage history is critical to provide an accurate Total Cost of Ownership (TCO) estimate +- Having too little history could produce an inaccurate representation of data warehouse query patterns and system utilization + +## Supported operating systems +The usage collection scheduler can be installed on the following operating systems: + +1. MacOS +2. Linux + +## Local vs. Remote Deployments + +The usage collection scheduler can be executed in the following deployments: + +1. **Local scheduler** - Scheduler is installed on a local laptop or workstation, provided that the local system can connect to a bastion host and forward connectivity to the data warehouse +2. **Remote scheduler** - Scheduler is installed directly on a bastion host in a separate subnet or a compute instance deployed within the data warehouse's subnet + + +## Automatic restart + +The usage collection scheduler utilizes the host operating system to run as a background process. As a result, the process will automatically resume processing after a system restart; no intervention is needed during a host system restart. The scheduler will automatically detect usage history gaps and proactively backfill when possible. diff --git a/docs/remorph/docs/scheduler/linux/linux_installation.mdx b/docs/remorph/docs/scheduler/linux/linux_installation.mdx new file mode 100644 index 0000000000..fbf1e23d4e --- /dev/null +++ b/docs/remorph/docs/scheduler/linux/linux_installation.mdx @@ -0,0 +1,73 @@ +--- +sidebar_position: 2 +title: Linux Installation +--- + +# (Linux) Installing the Usage Collection Scheduler + +## Installation Steps: + +1. 
Open a new Terminal window and navigate to the remorph repository + +2. Copy the unit file to `/etc/systemd/system/`: + +```bash +$ cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service /etc/systemd/system/ +``` + +3. Copy the timer file to `/etc/systemd/system/`: + +```bash +$ cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer /etc/systemd/system/ +``` + +4. Reload the systmd process to pick up the new files: + +```bash +$ sudo systemctl daemon-reload +``` + +5. Enable the system timer + +```bash +$ sudo systemctl enable remorph_usage_collection.timer +``` + +6. Start the system timer + +```bash +$ sudo systemctl start remorph_usage_collection.timer +``` + +## Description of the Unit File Elements + +1. **Description**: A description of the service + +2. **After**: Ensures the service starts only after the specified system service is up (network in this case) + +3. **ExecStart**: Specifies the command and arguments (python3 interpreter and the script path) + +4. **WorkingDirectory**: The directory where the script should run + +5. **StandardOutput**: Redirects the standard out to the system logs + +6. **StandardError**: Redirects the error out to the system logs + +7. **Restart**: Restarts the script if the system reboots + +8. **User**: Runs the script as the specified user + +9. **WantedBy**: Ensures the service starts when the system reaches multi-user mode + + +## Description of the Timer File Elements + +1. **Description**: A description of the timer + +2. **Persistent** : Ensures the script runs after reboots + +3. **OnBootSec**: Ensures the script runs 1 min after bootin + +4. **OnUnitActiveSec**: Runs the script every 15 minutes + +5. 
**WantedBy**: Ensures the timer is activated when the system is running timers diff --git a/docs/remorph/docs/scheduler/macos/macos_installation.mdx b/docs/remorph/docs/scheduler/macos/macos_installation.mdx new file mode 100644 index 0000000000..d7f30b87ae --- /dev/null +++ b/docs/remorph/docs/scheduler/macos/macos_installation.mdx @@ -0,0 +1,42 @@ +--- +sidebar_position: 1 +title: MacOS Installation +--- + +# (MacOS) Installing the Usage Collection Scheduler + +## Installation Steps: + +1. Open a new Terminal window and navigate to the remorph repository + +2. Copy the plist file to `~/Library/LaunchAgents/` for a user-specific task or `/Library/LaunchDaemons/` for a system-wide task: + +```bash +$ cp ./src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist ~/Library/LaunchAgents/ +``` + +3. Next, load the process by executing the following command: + +```bash +$ launchctl load ~/Library/LaunchAgents/com.remorph.usagecollection.plist +``` + +4. Grant the usage collection script with execution permissions: + +```bash +$ chmod +x ./src/databricks/labs/remorph/assessments/scheduler/usage_collector.py +``` + +## Description of the `plist` Elements + +1. **Label**: A unique identifier for the job + +2. **ProgramArguments**: Specifies the command and arguments (python3 interpreter and the script path) + +3. **StartInterval**: Runs the script every 900 seconds (15 minutes) + +4. **RunAtLoad**: Ensures the script runs immediately after loading + +5. **StandardOutPath**: Logs output to `~/Downloads/remorph/scheduler/stdout` for debugging + +6. 
**StandardErrorPath**: Logs errors to `~/Downloads/remorph/scheduler/stderr` for debugging diff --git a/src/databricks/labs/remorph/assessments/duckdb_manager.py b/src/databricks/labs/remorph/assessments/duckdb_manager.py new file mode 100644 index 0000000000..0406bc0058 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/duckdb_manager.py @@ -0,0 +1,27 @@ +import duckdb + + +class DuckDBManager: + """Manages a connection to a single DuckDB database instance.""" + + _instance = None + _connection = None + + def __new__(cls, db_path: str = "remorph_profiler_db.duckdb"): + if cls._instance is None: + cls._instance = super(DuckDBManager, cls).__new__(cls) + cls._instance._init_connection(db_path) + return cls._instance + + def _init_connection(self, db_path: str): + self._db_path = db_path + self._connection = duckdb.connect(database=db_path) + + def get_connection(self): + return self._connection + + def close_connection(self): + if self._connection is not None: + self._connection.close() + DuckDBManager._instance = None + DuckDBManager._connection = None diff --git a/src/databricks/labs/remorph/assessments/pipeline.py b/src/databricks/labs/remorph/assessments/pipeline.py index 03f5ff9bbf..f9488ce78c 100644 --- a/src/databricks/labs/remorph/assessments/pipeline.py +++ b/src/databricks/labs/remorph/assessments/pipeline.py @@ -5,6 +5,7 @@ import subprocess import yaml import duckdb +from enum import StrEnum from databricks.labs.remorph.connections.credential_manager import cred_file @@ -17,6 +18,12 @@ DB_NAME = "profiler_extract.db" +class StepExecutionStatus(StrEnum): + COMPLETE = "COMPLETE" + ERROR = "ERROR" + SKIPPED = "SKIPPED" + + class PipelineClass: def __init__(self, config: PipelineConfig, executor: DatabaseManager): self.config = config @@ -28,16 +35,26 @@ def execute(self): for step in self.config.steps: if step.flag == "active": logging.debug(f"Executing step: {step.name}") - self._execute_step(step) + self.execute_step(step) logging.info("Pipeline 
execution completed") - def _execute_step(self, step: Step): - if step.type == "sql": - self._execute_sql_step(step) - elif step.type == "python": - self._execute_python_step(step) - else: - logging.error(f"Unsupported step type: {step.type}") + def execute_step(self, step: Step) -> StrEnum: + try: + if step.type == "sql": + logging.info(f"Executing SQL step {step.name}") + self._execute_sql_step(step) + status = StepExecutionStatus.COMPLETE + elif step.type == "python": + logging.info(f"Executing Python step {step.name}") + self._execute_python_step(step) + status = StepExecutionStatus.COMPLETE + else: + logging.error(f"Unsupported step type: {step.type}") + raise RuntimeError(f"Unsupported step type: {step.type}") + except RuntimeError as e: + logging.error(e) + status = StepExecutionStatus.ERROR + return status def _execute_sql_step(self, step: Step): logging.debug(f"Reading query from file: {step.extract_source}") @@ -86,7 +103,8 @@ def _save_to_db(self, result, step_name: str, mode: str, batch_size: int = 1000) self._create_dir(self.db_path_prefix) conn = duckdb.connect(str(self.db_path_prefix) + '/' + DB_NAME) columns = result.keys() - # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable + # TODO: Add support for figuring out data types from SQLALCHEMY + # result object result.cursor.description is not reliable schema = ' STRING, '.join(columns) + ' STRING' # Handle write modes diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service new file mode 100644 index 0000000000..4785c4c78b --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service @@ -0,0 +1,14 @@ +[Unit] +Description=Remorph usage collection service +After=network.target + +[Service] +ExecStart=/usr/bin/python3 
~/Downloads/remorph/scheduler/usage_collector.py +WorkingDirectory=~/Downloads/remorph/scheduler/ +StandardOutput=journal +StandardError=journal +Restart=always +User=root + +[Install] +WantedBy=multi-user.target diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer new file mode 100644 index 0000000000..da58c12cd5 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer @@ -0,0 +1,14 @@ +[Unit] +Description=Remorph usage collection every 15 minutes +After=network.target + +[Service] +ExecStart=/usr/bin/python3 ~/Downloads/remorph/scheduler/usage_collector.py +WorkingDirectory=~/Downloads/remorph/scheduler/ +StandardOutput=journal +StandardError=journal +Restart=always +User=root + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist b/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist new file mode 100644 index 0000000000..dfedf1d702 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist @@ -0,0 +1,27 @@ + + + + + Label + com.remorph.usagecollection + + ProgramArguments + + /Library/Frameworks/Python.framework/Versions/3.11/bin/python3 + ~/Downloads/remorph/scheduler/usage_collector.py + + + StartInterval + 900 + + RunAtLoad + + + StandardOutPath + ~/Downloads/remorph/scheduler/stdout/usagecollection.log + + StandardErrorPath + ~/Downloads/remorph/scheduler/stderr/usagecollection_error.log + + + diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py new file mode 100644 index 0000000000..e255955597 --- /dev/null +++ 
b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -0,0 +1,152 @@ +import time +import logging +from dataclasses import dataclass + +from datetime import datetime +from enum import StrEnum + +from databricks.labs.remorph.assessments.duckdb_manager import DuckDBManager +from databricks.labs.remorph.assessments.pipeline import PipelineClass, StepExecutionStatus +from databricks.labs.remorph.assessments.profiler_config import Step + + +def get_hours_since_last_run(last_run_datetime: datetime) -> int: + elapsed_time = datetime.now() - last_run_datetime + elapsed_seconds = elapsed_time.total_seconds() + return int(divmod(elapsed_seconds, 3600)[0]) + + +def get_days_since_last_run(last_run_datetime: datetime) -> int: + elapsed_time = datetime.now() - last_run_datetime + elapsed_seconds = elapsed_time.total_seconds() + return int(divmod(elapsed_seconds, 86400)[0]) + + +class ScheduledFrequency(StrEnum): + ONCE = "ONCE" + DAILY = "DAILY" + WEEKLY = "WEEKLY" + + +@dataclass +class PollingStatus: + polling_dt: datetime + pipeline_name: str + step_name: str + status: str + + +class PipelineScheduler: + """A scheduler that executes Pipeline steps according to predefined schedules.""" + + def __init__(self, pipelines: list[PipelineClass], + polling_interval_secs: int = 5): + + self.duckdb_connection = DuckDBManager().get_connection() + self.polling_interval = polling_interval_secs + self.pipelines: list[PipelineClass] = pipelines + + # Create a table in DuckDB for maintaining pipeline state + self._init_db() + + def _init_db(self): + """Initializes a DuckDB database to store pipeline step execution state.""" + logging.info("Initializing pipeline state database...") + self.duckdb_connection.execute(query=""" + CREATE TABLE IF NOT EXISTS pipeline_step_state ( + step_name TEXT PRIMARY KEY, + pipeline_name TEXT, + last_run TIMESTAMP, + status TEXT + ) + """) + logging.info("DuckDB state table is ready to go!") + + def _get_last_run_time(self, 
pipeline_name: str, step_name: str) -> (datetime | None, str | None): + """Fetches the last execution time of a pipeline step from the database. """ + full_step_name = f"{pipeline_name}__{step_name}" + result = self.duckdb_connection.execute( + query="SELECT last_run, status FROM pipeline_step_state WHERE step_name = ?", + parameters=[full_step_name] + ).fetchone() + if result is not None: + last_run_time_str = result[0] + return last_run_time_str, result[1] + else: + return None, None + + def _record_run_time(self, pipeline_name: str, step_name: str, status: str = StepExecutionStatus.COMPLETE.value): + """Records the latest execution time of a pipeline step.""" + now = datetime.now() + full_step_name = f"{pipeline_name}__{step_name}" + self.duckdb_connection.execute(query=""" + INSERT INTO pipeline_step_state (step_name, pipeline_name, last_run, status) + VALUES (?, ?, ?, ?) + ON CONFLICT(step_name) + DO UPDATE + SET pipeline_name = excluded.pipeline_name, last_run = excluded.last_run, status = excluded.status + """, parameters=[full_step_name, pipeline_name, now, status]) + + def _should_run(self, pipeline_name: str, step: Step) -> bool: + """Determines if a pipeline step should run based on its schedule.""" + scheduled_frequency = step.frequency + last_run_time, status = self._get_last_run_time(pipeline_name, step.name) + if last_run_time is None or status == StepExecutionStatus.ERROR.value: + # First time running the Step + should_run = True + logging.info(f"First time running the step: '{step.name}'.") + else: + # The Step has been run once already + if scheduled_frequency == ScheduledFrequency.ONCE.value: + logging.info(f"Step '{step.name}' has already been run once. 
Skipping.") + should_run = False + # Check if it's been >= 24 hours since the last run + elif (scheduled_frequency == ScheduledFrequency.DAILY.value and + get_hours_since_last_run(last_run_time) >= 24): + should_run = True + logging.info(f"Running daily step '{step.name}' now.") + # Check if it's been >= 7 days since the last run + elif (scheduled_frequency == ScheduledFrequency.WEEKLY.value and + get_days_since_last_run(last_run_time) >= 7): + should_run = True + logging.info(f"Running weekly step '{step.name}' now.") + else: + # None of the triggering frequency conditions have been met + should_run = False + return should_run + + def _run_step(self, pipeline: PipelineClass, step: Step) -> str: + """Executes a pipeline step if it's time to run.""" + if self._should_run(pipeline.config.name, step): + status = str(pipeline.execute_step(step).value) + self._record_run_time(pipeline.config.name, step.name, status) + else: + status = StepExecutionStatus.SKIPPED.value + return status + + def run(self, max_num_cycles: int = None) -> list[PollingStatus]: + """Create an infinite loop over the pipeline steps""" + logging.info("PipelineScheduler has started...") + cycle_counter = 0 + polling_status = [] + while cycle_counter < max_num_cycles if max_num_cycles is not None else True: + + # Loop through the list of scheduled pipelines + # TODO: Parallelize this in the future to be more efficient + for pipeline in self.pipelines: + pipeline_steps = pipeline.config.steps + for pipeline_step in pipeline_steps: + # Inspect the execution frequency of the Step + # Possible frequencies include: "once", "daily", "weekly" (see `ScheduledFrequency` enum) + logging.info(f"Evaluating scheduling step '{pipeline_step.name}'.") + step_exec_status = self._run_step(pipeline, pipeline_step) + execution_status = PollingStatus(datetime.now(), pipeline.config.name, + pipeline_step.name, step_exec_status) + polling_status.append(execution_status) + + # Wait a bit before polling status + 
time.sleep(self.polling_interval) + if max_num_cycles is not None: + cycle_counter += 1 + + return polling_status diff --git a/tests/integration/assessments/test_pipeline_scheduler.py b/tests/integration/assessments/test_pipeline_scheduler.py new file mode 100644 index 0000000000..b13928c3b5 --- /dev/null +++ b/tests/integration/assessments/test_pipeline_scheduler.py @@ -0,0 +1,51 @@ +from pathlib import Path +import pytest + +from datetime import datetime + +from databricks.labs.remorph.assessments.pipeline import PipelineClass +from databricks.labs.remorph.assessments.scheduler.pipeline_scheduler import PipelineScheduler, \ + get_hours_since_last_run, get_days_since_last_run +from ..connections.helpers import get_db_manager + + +@pytest.fixture() +def extractor(): + return get_db_manager("remorph", "postgres") + + +@pytest.fixture(scope="module") +def pipeline_config(): + prefix = Path(__file__).parent + config_path = f"{prefix}/../../resources/assessments/scheduler/postgres_pipeline_config.yml" + config = PipelineClass.load_config_from_yaml(config_path) + + for step in config.steps: + step.extract_source = f"{prefix}/../../{step.extract_source}" + return config + + +def test_hours_since_last_run(): + date_start_str = "2025-04-08 10:30:00" + date_format = "%Y-%m-%d %H:%M:%S" + datetime_start = datetime.strptime(date_start_str, date_format) + actual_hours = get_hours_since_last_run(datetime_start) + expected_hours = divmod((datetime.now() - datetime_start).total_seconds(), 3600)[0] + assert actual_hours == expected_hours, "The calculated hours since last run does not match the expected value." 
+ + +def test_days_since_last_run(): + date_start_str = "2025-04-08 10:30:00" + date_format = "%Y-%m-%d %H:%M:%S" + datetime_start = datetime.strptime(date_start_str, date_format) + actual_days = get_days_since_last_run(datetime_start) + expected_days = divmod((datetime.now() - datetime_start).total_seconds(), 86400)[0] + assert actual_days == expected_days, "The calculated days since last run does not match the expected value." + + +def test_pipeline_scheduler(pipeline_config, extractor): + simple_pipeline = PipelineClass(config=pipeline_config, executor=extractor) + pipelines = [simple_pipeline] + scheduler = PipelineScheduler(pipelines) + status = scheduler.run(max_num_cycles=3) + assert len(status) == 3, "The actual step execution statuses did not match the expected amount." diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index e1cc5a8fdd..bdc448acfe 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -65,6 +65,13 @@ def mock_credentials(): 'database': 'TEST_TSQL_JDBC', 'driver': 'ODBC Driver 18 for SQL Server', }, + 'postgres': { + 'user': 'TEST_PG_USER', + 'password': 'TEST_PG_PASS', + 'server': 'TEST_PG_JDBC', + 'database': 'TEST_PG_DB', + 'driver': 'PostgreSQL Unicode', + }, }, ): yield diff --git a/tests/resources/assessments/scheduler/postgres_pipeline_config.yml b/tests/resources/assessments/scheduler/postgres_pipeline_config.yml new file mode 100644 index 0000000000..4584bc7f51 --- /dev/null +++ b/tests/resources/assessments/scheduler/postgres_pipeline_config.yml @@ -0,0 +1,10 @@ +name: PostgresPipeline +version: "1.0" +extract_folder: /tmp/extracts/ +steps: + - name: usage + type: sql + extract_source: resources/assessments/target/postgres/usage.sql + mode: overwrite + frequency: weekly + flag: active From 5c9aafb4ca080c624273663fa5c922e0ea62ff5d Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:27:02 -0400 Subject: [PATCH 
02/22] Update docs/remorph/docs/scheduler/linux/linux_installation.mdx Co-authored-by: Andrew Snare --- docs/remorph/docs/scheduler/linux/linux_installation.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/remorph/docs/scheduler/linux/linux_installation.mdx b/docs/remorph/docs/scheduler/linux/linux_installation.mdx index fbf1e23d4e..0f7aa98a9b 100644 --- a/docs/remorph/docs/scheduler/linux/linux_installation.mdx +++ b/docs/remorph/docs/scheduler/linux/linux_installation.mdx @@ -21,7 +21,7 @@ $ cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_u $ cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer /etc/systemd/system/ ``` -4. Reload the systmd process to pick up the new files: +4. Reload the `systemd` process to pick up the new files: ```bash $ sudo systemctl daemon-reload From ddc4a117208cbe38d4d0c8d56016bf1d6a1a6691 Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:33:55 -0400 Subject: [PATCH 03/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index e255955597..b15d8a2600 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -2,7 +2,7 @@ import logging from dataclasses import dataclass -from datetime import datetime +import datetime as dt from enum import StrEnum from databricks.labs.remorph.assessments.duckdb_manager import DuckDBManager From cf8725e2e8cb3f772f97824651a685c394c963cf Mon Sep 17 00:00:00 2001 From: Will Girten 
<47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:34:26 -0400 Subject: [PATCH 04/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index b15d8a2600..21fa2bb9d7 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -13,7 +13,7 @@ def get_hours_since_last_run(last_run_datetime: datetime) -> int: elapsed_time = datetime.now() - last_run_datetime elapsed_seconds = elapsed_time.total_seconds() - return int(divmod(elapsed_seconds, 3600)[0]) + return elapsed_seconds // 3600 def get_days_since_last_run(last_run_datetime: datetime) -> int: From 7774267687c21898c44d55bd78eab0560661b590 Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:34:39 -0400 Subject: [PATCH 05/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index 21fa2bb9d7..c1515dfd10 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -11,7 +11,7 @@ def get_hours_since_last_run(last_run_datetime: datetime) -> int: - elapsed_time = datetime.now() - last_run_datetime + elapsed_time = dt.datetime.now(dt.timezone.utc) - 
last_run_datetime elapsed_seconds = elapsed_time.total_seconds() return elapsed_seconds // 3600 From 5893ca9ef5ca512b88e6770f0b181b532392d19e Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:34:52 -0400 Subject: [PATCH 06/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index c1515dfd10..7c2833b548 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -17,7 +17,7 @@ def get_hours_since_last_run(last_run_datetime: datetime) -> int: def get_days_since_last_run(last_run_datetime: datetime) -> int: - elapsed_time = datetime.now() - last_run_datetime + elapsed_time = dt.datetime.now(dt.timezone.utc) - last_run_datetime elapsed_seconds = elapsed_time.total_seconds() return int(divmod(elapsed_seconds, 86400)[0]) From 6131a7f691eff1c914760a74819244a5e607ed4c Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:35:11 -0400 Subject: [PATCH 07/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index 7c2833b548..0c2abf36d4 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ 
b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -54,10 +54,11 @@ def _init_db(self): logging.info("Initializing pipeline state database...") self.duckdb_connection.execute(query=""" CREATE TABLE IF NOT EXISTS pipeline_step_state ( - step_name TEXT PRIMARY KEY, + step_name TEXT, pipeline_name TEXT, last_run TIMESTAMP, - status TEXT + status TEXT, + PRIMARY_KEY(pipeline_name, step_name) ) """) logging.info("DuckDB state table is ready to go!") From cca4b2276755539f9d220013e28cc93803761f77 Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:35:22 -0400 Subject: [PATCH 08/22] Update tests/integration/assessments/test_pipeline_scheduler.py Co-authored-by: Andrew Snare --- tests/integration/assessments/test_pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/assessments/test_pipeline_scheduler.py b/tests/integration/assessments/test_pipeline_scheduler.py index b13928c3b5..9a9b919a23 100644 --- a/tests/integration/assessments/test_pipeline_scheduler.py +++ b/tests/integration/assessments/test_pipeline_scheduler.py @@ -39,7 +39,7 @@ def test_days_since_last_run(): date_format = "%Y-%m-%d %H:%M:%S" datetime_start = datetime.strptime(date_start_str, date_format) actual_days = get_days_since_last_run(datetime_start) - expected_days = divmod((datetime.now() - datetime_start).total_seconds(), 86400)[0] + expected_hours = dt.datetime.now(dt.timezone.utc) // 86400 assert actual_days == expected_days, "The calculated days since last run does not match the expected value." 
From a762af584c396ee1b131e9b1aedcbe831d7cbb57 Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:35:36 -0400 Subject: [PATCH 09/22] Update tests/integration/assessments/test_pipeline_scheduler.py Co-authored-by: Andrew Snare --- tests/integration/assessments/test_pipeline_scheduler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/integration/assessments/test_pipeline_scheduler.py b/tests/integration/assessments/test_pipeline_scheduler.py index 9a9b919a23..490e81dc87 100644 --- a/tests/integration/assessments/test_pipeline_scheduler.py +++ b/tests/integration/assessments/test_pipeline_scheduler.py @@ -35,9 +35,7 @@ def test_hours_since_last_run(): def test_days_since_last_run(): - date_start_str = "2025-04-08 10:30:00" - date_format = "%Y-%m-%d %H:%M:%S" - datetime_start = datetime.strptime(date_start_str, date_format) + datetime_start = dt.datetime(year=2025, month=4, day=8, hour=10, minute=30, second=0, tzinfo=dt.timezone.utc) actual_days = get_days_since_last_run(datetime_start) expected_hours = dt.datetime.now(dt.timezone.utc) // 86400 assert actual_days == expected_days, "The calculated days since last run does not match the expected value." 
From 490985d14e7107fab7c88809bbe6e88f1367087c Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:35:47 -0400 Subject: [PATCH 10/22] Update tests/integration/assessments/test_pipeline_scheduler.py Co-authored-by: Andrew Snare --- tests/integration/assessments/test_pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/assessments/test_pipeline_scheduler.py b/tests/integration/assessments/test_pipeline_scheduler.py index 490e81dc87..037a56efb6 100644 --- a/tests/integration/assessments/test_pipeline_scheduler.py +++ b/tests/integration/assessments/test_pipeline_scheduler.py @@ -30,7 +30,7 @@ def test_hours_since_last_run(): date_format = "%Y-%m-%d %H:%M:%S" datetime_start = datetime.strptime(date_start_str, date_format) actual_hours = get_hours_since_last_run(datetime_start) - expected_hours = divmod((datetime.now() - datetime_start).total_seconds(), 3600)[0] + expected_hours = dt.datetime.now(dt.timezone.utc) // 3600 assert actual_hours == expected_hours, "The calculated hours since last run does not match the expected value." 
From 103d867af233edaeefc56fec10092bcd6f657812 Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:35:59 -0400 Subject: [PATCH 11/22] Update tests/integration/assessments/test_pipeline_scheduler.py Co-authored-by: Andrew Snare --- tests/integration/assessments/test_pipeline_scheduler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/integration/assessments/test_pipeline_scheduler.py b/tests/integration/assessments/test_pipeline_scheduler.py index 037a56efb6..37566d18a8 100644 --- a/tests/integration/assessments/test_pipeline_scheduler.py +++ b/tests/integration/assessments/test_pipeline_scheduler.py @@ -26,9 +26,7 @@ def pipeline_config(): def test_hours_since_last_run(): - date_start_str = "2025-04-08 10:30:00" - date_format = "%Y-%m-%d %H:%M:%S" - datetime_start = datetime.strptime(date_start_str, date_format) + datetime_start = dt.datetime(year=2025, month=4, day=8, hour=10, minute=30, second=0, tzinfo=dt.timezone.utc) actual_hours = get_hours_since_last_run(datetime_start) expected_hours = dt.datetime.now(dt.timezone.utc) // 3600 assert actual_hours == expected_hours, "The calculated hours since last run does not match the expected value." 
From e77d7b45bc0d95086469921046fcfdef4b0ea4c9 Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:36:16 -0400 Subject: [PATCH 12/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index 0c2abf36d4..c5b82bf5ad 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -141,7 +141,7 @@ def run(self, max_num_cycles: int = None) -> list[PollingStatus]: # Possible frequencies include: "once", "daily", "weekly" (see `ScheduledFrequency` enum) logging.info(f"Evaluating scheduling step '{pipeline_step.name}'.") step_exec_status = self._run_step(pipeline, pipeline_step) - execution_status = PollingStatus(datetime.now(), pipeline.config.name, + execution_status = PollingStatus(dt.datetime.now(dt.timezone.utc), pipeline.config.name, pipeline_step.name, step_exec_status) polling_status.append(execution_status) From 17ff6f9d93c3240c8bedb1d0d5c9c85b52eba1da Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:36:28 -0400 Subject: [PATCH 13/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index c5b82bf5ad..a6f2aeae0d 100644 --- 
a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -78,7 +78,7 @@ def _get_last_run_time(self, pipeline_name: str, step_name: str) -> (datetime | def _record_run_time(self, pipeline_name: str, step_name: str, status: str = StepExecutionStatus.COMPLETE.value): """Records the latest execution time of a pipeline step.""" - now = datetime.now() + now = dt.datetime.now(dt.timezone.utc) full_step_name = f"{pipeline_name}__{step_name}" self.duckdb_connection.execute(query=""" INSERT INTO pipeline_step_state (step_name, pipeline_name, last_run, status) From c1a8e2bdec89d96218ca92e0be0b26194301d407 Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:36:55 -0400 Subject: [PATCH 14/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index a6f2aeae0d..c584556c36 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -19,7 +19,7 @@ def get_hours_since_last_run(last_run_datetime: datetime) -> int: def get_days_since_last_run(last_run_datetime: datetime) -> int: elapsed_time = dt.datetime.now(dt.timezone.utc) - last_run_datetime elapsed_seconds = elapsed_time.total_seconds() - return int(divmod(elapsed_seconds, 86400)[0]) + return elapsed_seconds // 86400 class ScheduledFrequency(StrEnum): From e072ccb017404663b0bc664b4e6326507546c3e9 Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:37:10 -0400 
Subject: [PATCH 15/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index c584556c36..e1ca172242 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -71,8 +71,8 @@ def _get_last_run_time(self, pipeline_name: str, step_name: str) -> (datetime | parameters=[full_step_name] ).fetchone() if result is not None: - last_run_time_str = result[0] - return last_run_time_str, result[1] + last_run, status = result + return last_run, status else: return None, None From db13f9d18d2d0a7c2cb85e6af970a566e74f66cf Mon Sep 17 00:00:00 2001 From: Will Girten <47335283+goodwillpunning@users.noreply.github.com> Date: Sat, 26 Apr 2025 17:37:19 -0400 Subject: [PATCH 16/22] Update src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py Co-authored-by: Andrew Snare --- .../labs/remorph/assessments/scheduler/pipeline_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index e1ca172242..540f10e0d8 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -56,7 +56,7 @@ def _init_db(self): CREATE TABLE IF NOT EXISTS pipeline_step_state ( step_name TEXT, pipeline_name TEXT, - last_run TIMESTAMP, + last_run TIMESTAMPTZ, status TEXT, PRIMARY_KEY(pipeline_name, step_name) ) From f3dc5c303e11106c48a2f79a2d9fbb6ebeefee06 Mon Sep 17 00:00:00 2001 From: Will 
Girten Date: Sat, 26 Apr 2025 23:24:46 -0400 Subject: [PATCH 17/22] Fix linting errors --- .../labs/remorph/assessments/pipeline.py | 6 +- .../scheduler/pipeline_scheduler.py | 80 ++++++++++--------- .../assessments/test_pipeline_scheduler.py | 12 +-- 3 files changed, 51 insertions(+), 47 deletions(-) diff --git a/src/databricks/labs/remorph/assessments/pipeline.py b/src/databricks/labs/remorph/assessments/pipeline.py index f9488ce78c..fab19d3543 100644 --- a/src/databricks/labs/remorph/assessments/pipeline.py +++ b/src/databricks/labs/remorph/assessments/pipeline.py @@ -3,9 +3,9 @@ import json import logging import subprocess +from enum import Enum import yaml import duckdb -from enum import StrEnum from databricks.labs.remorph.connections.credential_manager import cred_file @@ -18,7 +18,7 @@ DB_NAME = "profiler_extract.db" -class StepExecutionStatus(StrEnum): +class StepExecutionStatus(Enum): COMPLETE = "COMPLETE" ERROR = "ERROR" SKIPPED = "SKIPPED" @@ -38,7 +38,7 @@ def execute(self): self.execute_step(step) logging.info("Pipeline execution completed") - def execute_step(self, step: Step) -> StrEnum: + def execute_step(self, step: Step) -> Enum: try: if step.type == "sql": logging.info(f"Executing SQL step {step.name}") diff --git a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py index 540f10e0d8..337e3c8350 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py +++ b/src/databricks/labs/remorph/assessments/scheduler/pipeline_scheduler.py @@ -1,9 +1,10 @@ import time import logging from dataclasses import dataclass +from enum import Enum import datetime as dt -from enum import StrEnum +from datetime import datetime from databricks.labs.remorph.assessments.duckdb_manager import DuckDBManager from databricks.labs.remorph.assessments.pipeline import PipelineClass, StepExecutionStatus @@ -13,16 +14,16 @@ def 
get_hours_since_last_run(last_run_datetime: datetime) -> int: elapsed_time = dt.datetime.now(dt.timezone.utc) - last_run_datetime elapsed_seconds = elapsed_time.total_seconds() - return elapsed_seconds // 3600 + return round(elapsed_seconds // 3600) def get_days_since_last_run(last_run_datetime: datetime) -> int: elapsed_time = dt.datetime.now(dt.timezone.utc) - last_run_datetime elapsed_seconds = elapsed_time.total_seconds() - return elapsed_seconds // 86400 + return round(elapsed_seconds // 86400) -class ScheduledFrequency(StrEnum): +class ScheduledFrequency(Enum): ONCE = "ONCE" DAILY = "DAILY" WEEKLY = "WEEKLY" @@ -39,8 +40,7 @@ class PollingStatus: class PipelineScheduler: """A scheduler that executes Pipeline steps according to predefined schedules.""" - def __init__(self, pipelines: list[PipelineClass], - polling_interval_secs: int = 5): + def __init__(self, pipelines: list[PipelineClass], polling_interval_secs: int = 5): self.duckdb_connection = DuckDBManager().get_connection() self.polling_interval = polling_interval_secs @@ -52,7 +52,8 @@ def __init__(self, pipelines: list[PipelineClass], def _init_db(self): """Initializes a DuckDB database to store pipeline step execution state.""" logging.info("Initializing pipeline state database...") - self.duckdb_connection.execute(query=""" + self.duckdb_connection.execute( + query=""" CREATE TABLE IF NOT EXISTS pipeline_step_state ( step_name TEXT, pipeline_name TEXT, @@ -60,33 +61,36 @@ def _init_db(self): status TEXT, PRIMARY_KEY(pipeline_name, step_name) ) - """) + """ + ) logging.info("DuckDB state table is ready to go!") - def _get_last_run_time(self, pipeline_name: str, step_name: str) -> (datetime | None, str | None): - """Fetches the last execution time of a pipeline step from the database. 
""" + def _get_last_run_time(self, pipeline_name: str, step_name: str) -> tuple[datetime | None, str | None]: + """Fetches the last execution time of a pipeline step from the database.""" full_step_name = f"{pipeline_name}__{step_name}" + last_run = None + status = None result = self.duckdb_connection.execute( - query="SELECT last_run, status FROM pipeline_step_state WHERE step_name = ?", - parameters=[full_step_name] + query="SELECT last_run, status FROM pipeline_step_state WHERE step_name = ?", parameters=[full_step_name] ).fetchone() if result is not None: last_run, status = result - return last_run, status - else: - return None, None + return last_run, status def _record_run_time(self, pipeline_name: str, step_name: str, status: str = StepExecutionStatus.COMPLETE.value): """Records the latest execution time of a pipeline step.""" now = dt.datetime.now(dt.timezone.utc) full_step_name = f"{pipeline_name}__{step_name}" - self.duckdb_connection.execute(query=""" + self.duckdb_connection.execute( + query=""" INSERT INTO pipeline_step_state (step_name, pipeline_name, last_run, status) VALUES (?, ?, ?, ?) ON CONFLICT(step_name) DO UPDATE SET pipeline_name = excluded.pipeline_name, last_run = excluded.last_run, status = excluded.status - """, parameters=[full_step_name, pipeline_name, now, status]) + """, + parameters=[full_step_name, pipeline_name, now, status], + ) def _should_run(self, pipeline_name: str, step: Step) -> bool: """Determines if a pipeline step should run based on its schedule.""" @@ -96,24 +100,21 @@ def _should_run(self, pipeline_name: str, step: Step) -> bool: # First time running the Step should_run = True logging.info(f"First time running the step: '{step.name}'.") + # The Step has been run once already + elif scheduled_frequency == ScheduledFrequency.ONCE.value: + logging.info(f"Step '{step.name}' has already been run once. 
Skipping.") + should_run = False + # Check if it's been >= 24 hours since the last run + elif scheduled_frequency == ScheduledFrequency.DAILY.value and get_hours_since_last_run(last_run_time) >= 24: + should_run = True + logging.info(f"Running daily step '{step.name}' now.") + # Check if it's been >= 7 days since the last run + elif scheduled_frequency == ScheduledFrequency.WEEKLY.value and get_days_since_last_run(last_run_time) >= 7: + should_run = True + logging.info(f"Running weekly step '{step.name}' now.") else: - # The Step has been run once already - if scheduled_frequency == ScheduledFrequency.ONCE.value: - logging.info(f"Step '{step.name}' has already been run once. Skipping.") - should_run = False - # Check if it's been >= 24 hours since the last run - elif (scheduled_frequency == ScheduledFrequency.DAILY.value and - get_hours_since_last_run(last_run_time) >= 24): - should_run = True - logging.info(f"Running daily step '{step.name}' now.") - # Check if it's been >= 7 days since the last run - elif (scheduled_frequency == ScheduledFrequency.WEEKLY.value and - get_days_since_last_run(last_run_time) >= 7): - should_run = True - logging.info(f"Running weekly step '{step.name}' now.") - else: - # None of the triggering frequency conditions have been met - should_run = False + # None of the triggering frequency conditions have been met + should_run = False return should_run def _run_step(self, pipeline: PipelineClass, step: Step) -> str: @@ -125,12 +126,12 @@ def _run_step(self, pipeline: PipelineClass, step: Step) -> str: status = StepExecutionStatus.SKIPPED.value return status - def run(self, max_num_cycles: int = None) -> list[PollingStatus]: - """Create an infinite loop over the pipeline steps""" + def run(self, max_num_cycles: int = 10) -> list[PollingStatus]: + """Create a loop over the pipeline steps""" logging.info("PipelineScheduler has started...") cycle_counter = 0 polling_status = [] - while cycle_counter < max_num_cycles if max_num_cycles is not 
None else True: + while cycle_counter < max_num_cycles: # Loop through the list of scheduled pipelines # TODO: Parallelize this in the future to be more efficient @@ -141,8 +142,9 @@ def run(self, max_num_cycles: int = None) -> list[PollingStatus]: # Possible frequencies include: "once", "daily", "weekly" (see `ScheduledFrequency` enum) logging.info(f"Evaluating scheduling step '{pipeline_step.name}'.") step_exec_status = self._run_step(pipeline, pipeline_step) - execution_status = PollingStatus(dt.datetime.now(dt.timezone.utc), pipeline.config.name, - pipeline_step.name, step_exec_status) + execution_status = PollingStatus( + dt.datetime.now(dt.timezone.utc), pipeline.config.name, pipeline_step.name, step_exec_status + ) polling_status.append(execution_status) # Wait a bit before polling status diff --git a/tests/integration/assessments/test_pipeline_scheduler.py b/tests/integration/assessments/test_pipeline_scheduler.py index 37566d18a8..d7986e1f00 100644 --- a/tests/integration/assessments/test_pipeline_scheduler.py +++ b/tests/integration/assessments/test_pipeline_scheduler.py @@ -1,11 +1,13 @@ +import datetime as dt from pathlib import Path import pytest -from datetime import datetime - from databricks.labs.remorph.assessments.pipeline import PipelineClass -from databricks.labs.remorph.assessments.scheduler.pipeline_scheduler import PipelineScheduler, \ - get_hours_since_last_run, get_days_since_last_run +from databricks.labs.remorph.assessments.scheduler.pipeline_scheduler import ( + PipelineScheduler, + get_hours_since_last_run, + get_days_since_last_run, +) from ..connections.helpers import get_db_manager @@ -35,7 +37,7 @@ def test_hours_since_last_run(): def test_days_since_last_run(): datetime_start = dt.datetime(year=2025, month=4, day=8, hour=10, minute=30, second=0, tzinfo=dt.timezone.utc) actual_days = get_days_since_last_run(datetime_start) - expected_hours = dt.datetime.now(dt.timezone.utc) // 86400 + expected_days = dt.datetime.now(dt.timezone.utc) 
// 86400 assert actual_days == expected_days, "The calculated days since last run does not match the expected value." From 919307ee3b0728be968d47ec36577ee67a5b3393 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Sun, 27 Apr 2025 15:32:35 -0400 Subject: [PATCH 18/22] Fix R0204 pylint error --- src/databricks/labs/remorph/assessments/pipeline.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/remorph/assessments/pipeline.py b/src/databricks/labs/remorph/assessments/pipeline.py index fab19d3543..672c2009f4 100644 --- a/src/databricks/labs/remorph/assessments/pipeline.py +++ b/src/databricks/labs/remorph/assessments/pipeline.py @@ -43,18 +43,16 @@ def execute_step(self, step: Step) -> Enum: if step.type == "sql": logging.info(f"Executing SQL step {step.name}") self._execute_sql_step(step) - status = StepExecutionStatus.COMPLETE elif step.type == "python": logging.info(f"Executing Python step {step.name}") self._execute_python_step(step) - status = StepExecutionStatus.COMPLETE else: logging.error(f"Unsupported step type: {step.type}") raise RuntimeError(f"Unsupported step type: {step.type}") except RuntimeError as e: logging.error(e) - status = StepExecutionStatus.ERROR - return status + return StepExecutionStatus.ERROR + return StepExecutionStatus.COMPLETE def _execute_sql_step(self, step: Step): logging.debug(f"Reading query from file: {step.extract_source}") From a6a6c4fba3473360422829499409a54f08801462 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Mon, 28 Apr 2025 01:49:16 -0400 Subject: [PATCH 19/22] Add installer/uninstaller scripts. 
--- .../assessments/scheduler/install/install.py | 87 +++++++++++++++++++ .../linux/remorph_usage_collection.timer | 2 +- .../macos/com.remorph.usagecollection.plist | 7 +- .../install/macos/install_launch_agent.py | 43 +++++++++ .../install/macos/uninstall_launch_agent.py | 27 ++++++ .../scheduler/install/uninstall.py | 70 +++++++++++++++ 6 files changed, 231 insertions(+), 5 deletions(-) create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/install.py create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/macos/install_launch_agent.py create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/macos/uninstall_launch_agent.py create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/install.py b/src/databricks/labs/remorph/assessments/scheduler/install/install.py new file mode 100644 index 0000000000..3e5cbe3f19 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/install.py @@ -0,0 +1,87 @@ +import platform +from pathlib import Path +from subprocess import run, CalledProcessError + +import venv +import tempfile +import json +import logging + +PROFILER_DEPENDENCIES = [] + + +def launch_installer(): + + logging.info(f"Creating virtual environment.") + + # Build a virtual environment for launching installer + with tempfile.TemporaryDirectory() as temp_dir: + venv_dir = Path(temp_dir) / "profiler_venv" + builder = venv.EnvBuilder(with_pip=True) + builder.create(venv_dir) + venv_python = Path(venv_dir) / "bin" / "python" + venv_pip = Path(venv_dir) / "bin" / "pip" + logging.info("Venv setup complete.") + + # Install dependencies in the virtual environment + if len(PROFILER_DEPENDENCIES) > 0: + logging.info(f"Installing dependencies: {', '.join(PROFILER_DEPENDENCIES)}") + try: + run([str(venv_pip), "install", *PROFILER_DEPENDENCIES], check=True, capture_output=True, text=True) + 
except CalledProcessError as e: + logging.error(f"Failed to install dependencies: {e.stderr}") + raise RuntimeError(f"Failed to install dependencies: {e.stderr}") from e + + # Execute the Python script using the virtual environment's Python interpreter + try: + # Detect the host operating system and execute the appropriate script + system = platform.system() + if system == "Darwin": + installer_path = "macos/install_launch_agent.py" + elif system == "Linux": + installer_path = "linux/install_launch_agent.py" + elif system == "Windows": + installer_path = "windows/install_launch_agent.py" + else: + raise RuntimeError(f"Unsupported operating system: {system}") + + logging.info(f"Running scheduler installer for: {system}") + result = run( + [ + str(venv_python), + str(installer_path) + ], + check=True, + capture_output=True, + text=True, + ) + + try: + output = json.loads(result.stdout) + logging.debug(output) + if output["status"] == "success": + logging.info(f"Python script completed: {output['message']}") + else: + raise RuntimeError(f"Script reported error: {output['message']}") + except json.JSONDecodeError: + logging.info(f"Python script output: {result.stdout}") + + except CalledProcessError as e: + error_msg = e.stderr + logging.error(f"Installation script failed: {error_msg}") + raise RuntimeError(f"Installation script failed: {error_msg}") from e + + +def main(): + # Prerequisites: + # The user has already cloned the remorph project and is executing the `install.py` script + # So we can skip cloning the remorph project + # The user must be using a Mac, Linux, or Windows OS + # Python 3.11+ must be installed + logging.info("Installing the remorph EDW profiler...\n") + launch_installer() + logging.info("Installation complete.") + + +if __name__ == "__main__": + main() diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer 
b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer index da58c12cd5..2be924de8a 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer +++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer @@ -11,4 +11,4 @@ Restart=always User=root [Install] -WantedBy=multi-user.target \ No newline at end of file +WantedBy=multi-user.target diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist b/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist index dfedf1d702..e7a0c4698d 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist +++ b/src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist @@ -8,7 +8,7 @@ ProgramArguments /Library/Frameworks/Python.framework/Versions/3.11/bin/python3 - ~/Downloads/remorph/scheduler/usage_collector.py + $HOME/.databricks/labs/remorph/assessment/scheduler/pipeline_scheduler.py StartInterval @@ -18,10 +18,9 @@ StandardOutPath - ~/Downloads/remorph/scheduler/stdout/usagecollection.log + $HOME/.databricks/labs/remorph/assessment/scheduler/usagecollection.log StandardErrorPath - ~/Downloads/remorph/scheduler/stderr/usagecollection_error.log + $HOME/.databricks/labs/remorph/assessment/scheduler/usagecollection_error.log - diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/macos/install_launch_agent.py b/src/databricks/labs/remorph/assessments/scheduler/install/macos/install_launch_agent.py new file mode 100644 index 0000000000..95269144a0 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/macos/install_launch_agent.py @@ -0,0 +1,43 @@ +import logging +import os +from pathlib import Path +import subprocess +import shutil + + +def install_launch_agent(): + # Ensure that 
the User's `LaunchAgents` dir exists + home_dir = os.getenv("HOME") + launch_agents_dir = home_dir / Path("Library/LaunchAgents") + launch_agents_dir.mkdir(parents=True, exist_ok=True) + plist_path = launch_agents_dir / "com.remorph.usagecollection.plist" + + # Ensure that the scheduler script exits too + scheduler_path = Path(__file__).parent.parent.parent / "pipeline_scheduler.py" + if not scheduler_path.exists(): + raise FileNotFoundError(f"Could not find the profiler scheduler: {scheduler_path}") + + # Check if the profiler scheduler has been previously registered + if plist_path.exists(): + logging.info("Plist exists already. Unloading.") + subprocess.run(["launchctl", "unload", str(plist_path)], check=False) + else: + logging.info("Plist does not exist. Copying template to LaunchAgents dir.") + # copy the plist file to the launch agents dir + shutil.copy2("com.remorph.usagecollection.plist", plist_path) + + # TODO: Update plist file + + # Register the profiler scheduler + subprocess.run(["launchctl", "load", str(plist_path)], check=True) + logging.info(f"Loaded LaunchAgent com.remorph.usagecollection.plist successfully.") + + +def main(): + logging.info("Installing MacOS scheduler components...\n") + install_launch_agent() + logging.info("Installation complete.") + + +if __name__ == "__main__": + main() diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/macos/uninstall_launch_agent.py b/src/databricks/labs/remorph/assessments/scheduler/install/macos/uninstall_launch_agent.py new file mode 100644 index 0000000000..f5ba1f996f --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/macos/uninstall_launch_agent.py @@ -0,0 +1,27 @@ +import logging +import os +from pathlib import Path +import subprocess + + +def uninstall_launch_agent(): + home_dir = os.getenv("HOME") + launch_agents_dir = home_dir / Path("Library/LaunchAgents") + plist_path = launch_agents_dir / "com.remorph.usagecollection.plist" + + # Check if the 
profiler scheduler has been previously registered + if plist_path.exists(): + logging.info("Plist exists. Unloading agent.") + subprocess.run(["launchctl", "unload", str(plist_path)], check=False) + plist_path.unlink() + logging.info("Plist file removed from LaunchAgents dir.") + + +def main(): + logging.info("Uninstalling MacOS scheduler components...\n") + uninstall_launch_agent() + logging.info("Uninstallation complete.") + + +if __name__ == "__main__": + main() diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py b/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py new file mode 100644 index 0000000000..763804b853 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py @@ -0,0 +1,70 @@ +import platform +from pathlib import Path +from subprocess import run, CalledProcessError + +import venv +import tempfile +import json +import logging + + +def launch_uninstaller(): + + logging.info(f"Creating virtual environment.") + + # Create a temporary directory for the virtual environment + with tempfile.TemporaryDirectory() as temp_dir: + venv_dir = Path(temp_dir) / "profiler_venv" + builder = venv.EnvBuilder(with_pip=True) + builder.create(venv_dir) + venv_python = Path(venv_dir) / "bin" / "python" + logging.info("Venv setup complete.") + + # Execute the Python script using the virtual environment's Python interpreter + try: + # Detect the host operating system and execute the appropriate script + system = platform.system() + if system == "Darwin": + installer_path = "macos/uninstall_launch_agent.py" + elif system == "Linux": + installer_path = "linux/uninstall_launch_agent.py" + elif system == "Windows": + installer_path = "windows/uninstall_launch_agent.py" + else: + raise RuntimeError(f"Unsupported operating system: {system}") + + logging.info(f"Running scheduler uninstaller for: {system}") + result = run( + [ + str(venv_python), + str(installer_path) + ], + check=True, + 
capture_output=True, + text=True, + ) + + try: + output = json.loads(result.stdout) + logging.debug(output) + if output["status"] == "success": + logging.info(f"Python script completed: {output['message']}") + else: + raise RuntimeError(f"Script reported error: {output['message']}") + except json.JSONDecodeError: + logging.info(f"Python script output: {result.stdout}") + + except CalledProcessError as e: + error_msg = e.stderr + logging.error(f"Installation script failed: {error_msg}") + raise RuntimeError(f"Installation script failed: {error_msg}") from e + + +def main(): + logging.info("Uninstalling the remorph EDW profiler scheduler...\n") + launch_uninstaller() + logging.info("Uninstallation complete.") + + +if __name__ == "__main__": + main() From f0371c7822ef93fa110c6d031ef1ceaa0102c572 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Mon, 28 Apr 2025 11:53:21 -0400 Subject: [PATCH 20/22] Add installation/uninstallation scripts for Linux --- .../assessments/scheduler/install/install.py | 4 +- .../install/linux/install_service_file.py | 43 +++++++++++++++++++ .../install/linux/uninstall_service_file.py | 31 +++++++++++++ .../scheduler/install/uninstall.py | 4 +- 4 files changed, 78 insertions(+), 4 deletions(-) create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/linux/install_service_file.py create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/linux/uninstall_service_file.py diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/install.py b/src/databricks/labs/remorph/assessments/scheduler/install/install.py index 3e5cbe3f19..2c50a7e67a 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/install.py +++ b/src/databricks/labs/remorph/assessments/scheduler/install/install.py @@ -39,9 +39,9 @@ def launch_installer(): if system == "Darwin": installer_path = "macos/install_launch_agent.py" elif system == "Linux": - installer_path = "linux/install_launch_agent.py" + installer_path = 
"linux/install_service_file.py" elif system == "Windows": - installer_path = "windows/install_launch_agent.py" + installer_path = "windows/install_service_file.py" else: raise RuntimeError(f"Unsupported operating system: {system}") diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/install_service_file.py b/src/databricks/labs/remorph/assessments/scheduler/install/linux/install_service_file.py new file mode 100644 index 0000000000..7b364cb673 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/install_service_file.py @@ -0,0 +1,43 @@ +import logging +from pathlib import Path +import subprocess +import shutil + + +def install_service_file(): + + # Copy the unit file to `/etc/systemd/system/` + system_files_dir = Path("/etc/systemd/system/") + unit_file_path = system_files_dir / "remorph_usage_collection.service" + shutil.copy2("remorph_usage_collection.service", unit_file_path) + + # Copy the timer file to `/etc/systemd/system/` + timer_file_path = system_files_dir / "remorph_usage_collection.timer" + shutil.copy2("remorph_usage_collection.timer", timer_file_path) + + # Ensure that the scheduler script exists too + scheduler_path = Path(__file__).parent.parent.parent / "pipeline_scheduler.py" + if not scheduler_path.exists(): + raise FileNotFoundError(f"Could not find the profiler scheduler: {scheduler_path}") + + # Reload the `systemd` process to pick up the new files + logging.info("Reloading systemd.") + subprocess.run(["sudo", "systemctl", "daemon-reload"], check=False) + + # Enable the system timer + logging.info("Enabling profiler scheduler.") + subprocess.run(["sudo", "systemctl", "enable", "remorph_usage_collection.timer"], check=False) + + # Start the system timer + logging.info("Starting profiler timer.") + subprocess.run(["sudo", "systemctl", "start", "remorph_usage_collection.timer"], check=False) + + +def main(): + logging.info("Installing Linux scheduler components...\n") + install_service_file()
+ logging.info("Installation complete.") + + +if __name__ == "__main__": + main() diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/uninstall_service_file.py b/src/databricks/labs/remorph/assessments/scheduler/install/linux/uninstall_service_file.py new file mode 100644 index 0000000000..2176a5427c --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/uninstall_service_file.py @@ -0,0 +1,31 @@ +import logging +from pathlib import Path +import subprocess + + +def uninstall_service_file(): + + # Stop the system timer + logging.info("Stoping profiler timer.") + subprocess.run(["sudo", "systemctl", "stop", "remorph_usage_collection.timer"], check=False) + + # Disbale the system timer + logging.info("Disabling profiler scheduler.") + subprocess.run(["sudo", "systemctl", "disable", "remorph_usage_collection.timer"], check=False) + + # Remove the unit file and timer file + system_files_dir = Path("/etc/systemd/system/") + unit_file_path = system_files_dir / "remorph_usage_collection.service" + timer_file_path = system_files_dir / "remorph_usage_collection.timer" + unit_file_path.unlink() + timer_file_path.unlink() + + +def main(): + logging.info("Uninstalling Linux scheduler components...\n") + uninstall_service_file() + logging.info("Uninstallation complete.") + + +if __name__ == "__main__": + main() diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py b/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py index 763804b853..4b81d4add9 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py +++ b/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py @@ -27,9 +27,9 @@ def launch_uninstaller(): if system == "Darwin": installer_path = "macos/uninstall_launch_agent.py" elif system == "Linux": - installer_path = "linux/uninstall_launch_agent.py" + installer_path = "linux/uninstall_service_file.py" elif system == "Windows": - 
installer_path = "windows/uninstall_launch_agent.py" + installer_path = "windows/uninstall_service_file.py" else: raise RuntimeError(f"Unsupported operating system: {system}") From ede927295639e888e4eb1dbe4e7aa2193b6f1224 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Mon, 28 Apr 2025 18:27:13 -0400 Subject: [PATCH 21/22] Add Windows installer/uninstaller --- docs/remorph/docs/scheduler/index.mdx | 1 + .../assessments/scheduler/install/install.py | 9 ++--- .../linux/remorph_usage_collection.service | 4 +-- .../linux/remorph_usage_collection.timer | 14 +++----- .../install/macos/install_launch_agent.py | 6 ++-- .../install/macos/uninstall_launch_agent.py | 4 +-- .../scheduler/install/uninstall.py | 9 ++--- .../install/windows/install_task_file.py | 34 ++++++++++++++++++ .../install/windows/uninstall_task_file.py | 27 ++++++++++++++ .../install/windows/usage_collection_task.xml | Bin 0 -> 3080 bytes 10 files changed, 78 insertions(+), 30 deletions(-) create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/windows/install_task_file.py create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/windows/uninstall_task_file.py create mode 100644 src/databricks/labs/remorph/assessments/scheduler/install/windows/usage_collection_task.xml diff --git a/docs/remorph/docs/scheduler/index.mdx b/docs/remorph/docs/scheduler/index.mdx index f76810b290..958f7b39c5 100644 --- a/docs/remorph/docs/scheduler/index.mdx +++ b/docs/remorph/docs/scheduler/index.mdx @@ -14,6 +14,7 @@ The usage collection scheduler can be installed on the following operating syste 1. MacOS 2. Linux +3. Windows ## Local vs. 
Remote Deployments diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/install.py b/src/databricks/labs/remorph/assessments/scheduler/install/install.py index 2c50a7e67a..66914be4b0 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/install.py +++ b/src/databricks/labs/remorph/assessments/scheduler/install/install.py @@ -7,12 +7,12 @@ import json import logging -PROFILER_DEPENDENCIES = [] +PROFILER_DEPENDENCIES: list[str] = [] def launch_installer(): - logging.info(f"Creating virtual environment.") + logging.info("Creating virtual environment.") # Build a virtual environment for launching installer with tempfile.TemporaryDirectory() as temp_dir: @@ -47,10 +47,7 @@ def launch_installer(): logging.info(f"Running scheduler installer for: {system}") result = run( - [ - str(venv_python), - str(installer_path) - ], + [str(venv_python), str(installer_path)], check=True, capture_output=True, text=True, diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service index 4785c4c78b..a947c6d103 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service +++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service @@ -3,8 +3,8 @@ Description=Remorph usage collection service After=network.target [Service] -ExecStart=/usr/bin/python3 ~/Downloads/remorph/scheduler/usage_collector.py -WorkingDirectory=~/Downloads/remorph/scheduler/ +ExecStart=/usr/bin/python3 $HOME/.databricks/remorph/assessments/scheduler/pipeline_scheduler.py +WorkingDirectory=$HOME/.databricks/remorph/assessments/scheduler/ StandardOutput=journal StandardError=journal Restart=always diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer 
b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer index 2be924de8a..1d9660cecc 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer +++ b/src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer @@ -1,14 +1,10 @@ [Unit] -Description=Remorph usage collection every 15 minutes -After=network.target +Description=Timer for remorph usage collection set to execute every 15 minutes -[Service] -ExecStart=/usr/bin/python3 ~/Downloads/remorph/scheduler/usage_collector.py -WorkingDirectory=~/Downloads/remorph/scheduler/ -StandardOutput=journal -StandardError=journal -Restart=always -User=root +[Timer] +OnBootSec=5min +OnUnitActiveSec=15min +Unit=remorph_usage_collection.service [Install] WantedBy=multi-user.target diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/macos/install_launch_agent.py b/src/databricks/labs/remorph/assessments/scheduler/install/macos/install_launch_agent.py index 95269144a0..a0373296ce 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/macos/install_launch_agent.py +++ b/src/databricks/labs/remorph/assessments/scheduler/install/macos/install_launch_agent.py @@ -1,5 +1,4 @@ import logging -import os from pathlib import Path import subprocess import shutil @@ -7,8 +6,7 @@ def install_launch_agent(): # Ensure that the User's `LaunchAgents` dir exists - home_dir = os.getenv("HOME") - launch_agents_dir = home_dir / Path("Library/LaunchAgents") + launch_agents_dir = Path.home() / Path("Library/LaunchAgents") launch_agents_dir.mkdir(parents=True, exist_ok=True) plist_path = launch_agents_dir / "com.remorph.usagecollection.plist" @@ -30,7 +28,7 @@ def install_launch_agent(): # Register the profiler scheduler subprocess.run(["launchctl", "load", str(plist_path)], check=True) - logging.info(f"Loaded LaunchAgent com.remorph.usagecollection.plist successfully.") + logging.info("Loaded 
LaunchAgent com.remorph.usagecollection.plist successfully.") def main(): diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/macos/uninstall_launch_agent.py b/src/databricks/labs/remorph/assessments/scheduler/install/macos/uninstall_launch_agent.py index f5ba1f996f..115689deb5 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/macos/uninstall_launch_agent.py +++ b/src/databricks/labs/remorph/assessments/scheduler/install/macos/uninstall_launch_agent.py @@ -1,12 +1,10 @@ import logging -import os from pathlib import Path import subprocess def uninstall_launch_agent(): - home_dir = os.getenv("HOME") - launch_agents_dir = home_dir / Path("Library/LaunchAgents") + launch_agents_dir = Path.home() / Path("Library/LaunchAgents") plist_path = launch_agents_dir / "com.remorph.usagecollection.plist" # Check if the profiler scheduler has been previously registered diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py b/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py index 4b81d4add9..2f8f2e937e 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py +++ b/src/databricks/labs/remorph/assessments/scheduler/install/uninstall.py @@ -10,7 +10,7 @@ def launch_uninstaller(): - logging.info(f"Creating virtual environment.") + logging.info("Creating virtual environment.") # Create a temporary directory for the virtual environment with tempfile.TemporaryDirectory() as temp_dir: @@ -29,16 +29,13 @@ def launch_uninstaller(): elif system == "Linux": installer_path = "linux/uninstall_service_file.py" elif system == "Windows": - installer_path = "windows/uninstall_service_file.py" + installer_path = "windows/uninstall_task_file.py" else: raise RuntimeError(f"Unsupported operating system: {system}") logging.info(f"Running scheduler uninstaller for: {system}") result = run( - [ - str(venv_python), - str(installer_path) - ], + [str(venv_python), str(installer_path)], 
check=True, capture_output=True, text=True, diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/windows/install_task_file.py b/src/databricks/labs/remorph/assessments/scheduler/install/windows/install_task_file.py new file mode 100644 index 0000000000..fa91b9b5d6 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/windows/install_task_file.py @@ -0,0 +1,34 @@ +import logging +from pathlib import Path +import subprocess +import shutil + + +# Leveraging the Task manager in Windows has the following benefits: +# 1. No dependency on 3P software +# 2. Executes even if user is not logged in +# 3. Can be configured to run on schedule +# 4. Automatic start at boot +# 5. Can handle restart +# 6. Service-level execution w/ system permissions +def install_task_file(): + # Prepare the task file for import + logging.info("Copying task file to Downloads folder.") + downloads_path = Path.home() / "Downloads" + task_def_path = downloads_path / "usage_collection_task.xml" + shutil.copy2("usage_collection_task.xml", downloads_path) + + # TODO: Dynamically update the `REPLACE_ME` values in task template + # Import the task definition, force overwrite if exists + logging.info("Importing task from task file.") + subprocess.run(["schtasks", "/Create", "/TN", "Remorph Usage Collection", "/XML", task_def_path, "/F"], check=False) + + +def main(): + logging.info("Installing Windows scheduler components...\n") + install_task_file() + logging.info("Installation complete.") + + +if __name__ == "__main__": + main() diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/windows/uninstall_task_file.py b/src/databricks/labs/remorph/assessments/scheduler/install/windows/uninstall_task_file.py new file mode 100644 index 0000000000..45b99ad4f8 --- /dev/null +++ b/src/databricks/labs/remorph/assessments/scheduler/install/windows/uninstall_task_file.py @@ -0,0 +1,27 @@ +import logging +from pathlib import Path +import subprocess + + +def 
uninstall_task_file(): + # Delete the registered task + logging.info("Deleting scheduled task from task manager.") + subprocess.run(["schtasks", "/Delete", "/TN", "Remorph Usage Collection", "/F"], check=False) + + # Remove the imported task if still exists + logging.info("Removing task file from the Downloads folder.") + downloads_path = Path.home() / "Downloads" + task_def_path = downloads_path / "usage_collection_task.xml" + if task_def_path.exists(): + logging.info("Task file still exists. Removing.") + task_def_path.unlink() + + +def main(): + logging.info("Uninstalling Windows scheduler components...\n") + uninstall_task_file() + logging.info("Uninstallation complete.") + + +if __name__ == "__main__": + main() diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/windows/usage_collection_task.xml b/src/databricks/labs/remorph/assessments/scheduler/install/windows/usage_collection_task.xml new file mode 100644 index 0000000000000000000000000000000000000000..108bbb8e3be7f51fc0084445873eb7c8014ae78b GIT binary patch literal 3080 zcmbW3?N8cJ6vofz{T0OT!&Ar^xC@i z-9DoA1)W1{+6Vk~t%r}tjs6atKOZwfQ*nki=Qp#jw#zRe(vXM~=c&$p#7<1~gzEv# zy2CO-cj@r-*VY;_X%yIaJwX6-oX39{2GrTt~KY^^^a+ZdUyj zolITmGk57H4|s_;r@J1$6b6Q>%W65BKrM#K}hl3eG?G-~Mf-FXk$o}EZl zz*pfKZ(?x={mthsuT`wY8N3~EMP#vZHE9Mt=z(1Q)XeLm^~gPPRW7qD`o450;tU(v zkHL6~uYsFdnkFswd+dz)|Ks@Wd!us{T4jDEU$1G(v~p=y@dlh0N0pJLb0+ciJa?x| z(wRdeE(&Ll`yG}Ev8PBhXLSNQ_=wnvk^SUay2`xOOPJj;z7xljF@BfeRdmgl4zuDLb05YjOTE`3 zm$lw1dIv4oSFg9T2xDaR{Owp6m2fa0V>^p5hG)I1HHIasp|~-pn_|jiX~cX7-{T=f zpn0ITMHSmUlZEAJGTJt22sY)}8?Y(NcCS>fH8~QGKtQe!rMG~dSMFIcZQystZ zE%ZfS#aJ!Q!+_H~aW$EfM-!W}cr$9J_qAqk#QhfC{FhP8*RF8*xv=gxi!F^}{@yCA Os4An~E}82)o#%hXB=#}@ literal 0 HcmV?d00001 From 87b120d552e02cb054cae60e98b5525b45b33160 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Tue, 29 Apr 2025 10:36:25 -0400 Subject: [PATCH 22/22] Add prerequisite check in installation script. 
--- .../assessments/scheduler/install/install.py  | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/remorph/assessments/scheduler/install/install.py b/src/databricks/labs/remorph/assessments/scheduler/install/install.py index 66914be4b0..0f7f25baa7 100644 --- a/src/databricks/labs/remorph/assessments/scheduler/install/install.py +++ b/src/databricks/labs/remorph/assessments/scheduler/install/install.py @@ -69,13 +69,45 @@ def launch_installer(): raise RuntimeError(f"Installation script failed: {error_msg}") from e +def is_systemd_installed() -> bool: + """ + Validates if systemd is installed on the host. + Returns: + bool - True if systemd is installed + False if systemd not found on the system + """ + try: + run( + ["systemctl", "--version"], + capture_output=True, + check=True + ) + return True + except CalledProcessError as cpe: + logging.error(f"Systemd not found on the system: {cpe}") + return False + + +def validate_prerequisites(): + """ + Validates that the prerequisites for installing the profiler scheduler + are met. + 1. The user must be using a Mac, Linux, or Windows OS + 2. 
Python 3.11+ must be installed + """ + system_info = platform.system().lower() + if platform.system().lower() not in {"darwin", "linux", "windows"}: + raise RuntimeError("Unsupported operating system detected.") + python_version_info = [int(v) for v in platform.python_version().split(".")] + if python_version_info[0] != 3 or python_version_info[1] < 11: + raise RuntimeError("Python version must be >= Python 3.11") + if system_info == "linux" and not is_systemd_installed(): + raise RuntimeError("Systemd is required.") + + +def main(): - # Prerequisites: - # The user has already cloned the remorph project and is executing the `install.py` script - # So we can skip cloning the remorph project - # The user must be using a Mac, Linux, or Windows OS - # Python 3.11+ must be installed - logging.info("Installing the remorph EDW profiler...\n") + logging.info("Installing the remorph profiler scheduler...\n") + validate_prerequisites() launch_installer() logging.info("Installation complete.")