Add Pipeline Scheduler #1522
**New file** (`@@ -0,0 +1,29 @@`):

---
sidebar_position: 1
---

# Installing the Usage Collection Scheduler

## Why do we need a usage collection scheduler?

- Some data warehouses do not preserve query history for longer than a few days, or cap it at a small number of rows. For example, Amazon Redshift only preserves usage history for around 2-5 days.
- Adequate usage history is critical for producing an accurate Total Cost of Ownership (TCO) estimate.
- Too little history can produce an inaccurate representation of data warehouse query patterns and system utilization.

## Supported operating systems

The usage collection scheduler can be installed on the following operating systems:

1. macOS
2. Linux
3. Windows

## Local vs. Remote Deployments

The usage collection scheduler can be executed in the following deployments:

1. **Local scheduler** - The scheduler is installed on a local laptop or workstation, provided that the local system can connect to a bastion host and forward connectivity to the data warehouse.
2. **Remote scheduler** - The scheduler is installed directly on a bastion host in a separate subnet, or on a compute instance deployed within the data warehouse's subnet.

## Automatic restart

The usage collection scheduler uses the host operating system to run as a background process. As a result, the process automatically resumes after a system restart; no intervention is needed. The scheduler also detects gaps in collected usage history and proactively backfills them when possible.
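The gap detection and backfill behavior described above can be sketched roughly as follows. This is an illustrative, stdlib-only sketch, not the scheduler's actual implementation; the `find_usage_gaps` function and its window representation are hypothetical:

```python
from datetime import datetime, timedelta


def find_usage_gaps(collected: list[datetime], interval: timedelta) -> list[datetime]:
    """Return the collection windows missing between the earliest and latest
    successfully collected windows, so the scheduler can backfill them."""
    if not collected:
        return []
    have = set(collected)
    gaps = []
    current = min(collected)
    end = max(collected)
    while current <= end:
        if current not in have:
            gaps.append(current)
        current += interval
    return gaps


# Hourly windows were collected at 00:00, 01:00, 03:00, 04:00, and 06:00
windows = [datetime(2024, 1, 1, h) for h in (0, 1, 3, 4, 6)]
missing = find_usage_gaps(windows, timedelta(hours=1))
print([w.hour for w in missing])  # → [2, 5]
```

The key design point is that the scheduler compares what it expected to collect against what it actually has, rather than assuming an unbroken run.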
**New file** (`@@ -0,0 +1,73 @@`):

---
sidebar_position: 2
title: Linux Installation
---

# (Linux) Installing the Usage Collection Scheduler

## Installation Steps

1. Open a new Terminal window and navigate to the remorph repository.

2. Copy the unit file to `/etc/systemd/system/`:

```bash
$ sudo cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service /etc/systemd/system/
```

3. Copy the timer file to `/etc/systemd/system/`:

```bash
$ sudo cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer /etc/systemd/system/
```

4. Reload `systemd` to pick up the new files:

```bash
$ sudo systemctl daemon-reload
```

5. Enable the system timer:

```bash
$ sudo systemctl enable remorph_usage_collection.timer
```

6. Start the system timer:

```bash
$ sudo systemctl start remorph_usage_collection.timer
```

## Description of the Unit File Elements

1. **Description**: A description of the service
2. **After**: Ensures the service starts only after the specified system target is up (the network, in this case)
3. **ExecStart**: Specifies the command and arguments (the python3 interpreter and the script path)
4. **WorkingDirectory**: The directory where the script should run
5. **StandardOutput**: Redirects standard output to the system logs
6. **StandardError**: Redirects standard error to the system logs
7. **Restart**: Restarts the service automatically if it exits unexpectedly
8. **User**: Runs the script as the specified user
9. **WantedBy**: Ensures the service starts when the system reaches multi-user mode

## Description of the Timer File Elements

1. **Description**: A description of the timer
2. **Persistent**: Ensures that a run missed while the machine was off is executed after the next boot
3. **OnBootSec**: Runs the script 1 minute after booting
4. **OnUnitActiveSec**: Runs the script every 15 minutes
5. **WantedBy**: Ensures the timer is activated when the system reaches the timers target
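For reference, the two files copied in the steps above might look roughly like this. This is an illustrative sketch based on the element descriptions; the script path, working directory, and user name are placeholders, and the actual files ship in the repository under `src/databricks/labs/remorph/assessments/scheduler/install/linux/`:

```ini
# remorph_usage_collection.service (illustrative)
[Unit]
Description=Remorph usage collection run
After=network.target

[Service]
ExecStart=/usr/bin/python3 /opt/remorph/pipeline_scheduler.py
WorkingDirectory=/opt/remorph
StandardOutput=journal
StandardError=journal
Restart=on-failure
User=remorph

[Install]
WantedBy=multi-user.target
```

```ini
# remorph_usage_collection.timer (illustrative)
[Unit]
Description=Run the Remorph usage collection every 15 minutes

[Timer]
Persistent=true
OnBootSec=1min
OnUnitActiveSec=15min

[Install]
WantedBy=timers.target
```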
**New file** (`@@ -0,0 +1,42 @@`):

---
sidebar_position: 1
title: MacOS Installation
---

# (MacOS) Installing the Usage Collection Scheduler

## Installation Steps

1. Open a new Terminal window and navigate to the remorph repository.

2. Copy the plist file to `~/Library/LaunchAgents/` for a user-specific task, or `/Library/LaunchDaemons/` for a system-wide task:

```bash
$ cp ./src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist ~/Library/LaunchAgents/
```

> **Reviewer:** Most users won't know the difference, I don't think? My advice here is to advise one of these, or help the user choose. (My preference: user-specific, given the rest of the instructions assume this.)
>
> **Author:** Yep, totally agree. I'll update the PR to install only a user-specific agent.

3. Next, load the process by executing the following command:

```bash
$ launchctl load ~/Library/LaunchAgents/com.remorph.usagecollection.plist
```

> **Reviewer:** Does this work in Sequoia and above?
>
> **Author:** Seems to work. I've been running this process on my Mac for over a month now. Happy to update to another approach if you feel it's more appropriate.

4. Grant the usage collection script execution permissions:

```bash
$ chmod +x ./src/databricks/labs/remorph/assessments/scheduler/usage_collector.py
```

> **Reviewer (on lines +24 to +28):** I think this should be earlier... step 2? Otherwise I think the first run will already be triggered (and fail).
>
> **Author:** Great catch. Thanks.

## Description of the `plist` Elements

1. **Label**: A unique identifier for the job
2. **ProgramArguments**: Specifies the command and arguments (the python3 interpreter and the script path)
3. **StartInterval**: Runs the script every 900 seconds (15 minutes)
4. **RunAtLoad**: Ensures the script runs immediately after loading
5. **StandardOutPath**: Logs output to `~/Downloads/remorph/scheduler/stdout` for debugging
6. **StandardErrorPath**: Logs errors to `~/Downloads/remorph/scheduler/stderr` for debugging
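For reference, a plist containing the elements described above might look like the following. This is an illustrative sketch; the interpreter path, script path, and home directory are placeholders, and the actual file ships in the repository under `src/databricks/labs/remorph/assessments/scheduler/install/macos/`:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN"
  "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.remorph.usagecollection</string>
    <key>ProgramArguments</key>
    <array>
        <string>/usr/bin/python3</string>
        <string>/path/to/remorph/assessments/scheduler/usage_collector.py</string>
    </array>
    <key>StartInterval</key>
    <integer>900</integer>
    <key>RunAtLoad</key>
    <true/>
    <key>StandardOutPath</key>
    <string>/Users/you/Downloads/remorph/scheduler/stdout</string>
    <key>StandardErrorPath</key>
    <string>/Users/you/Downloads/remorph/scheduler/stderr</string>
</dict>
</plist>
```

Note that launchd does not expand `~` in plist paths, so the log paths are written out fully here.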
**New file** (`@@ -0,0 +1,27 @@`):

```python
import duckdb


class DuckDBManager:
    """Manages a connection to a single DuckDB database instance."""

    _instance = None
    _connection = None

    def __new__(cls, db_path: str = "remorph_profiler_db.duckdb"):
        if cls._instance is None:
            cls._instance = super(DuckDBManager, cls).__new__(cls)
            cls._instance._init_connection(db_path)
        return cls._instance

    def _init_connection(self, db_path: str):
        self._db_path = db_path
        self._connection = duckdb.connect(database=db_path)

    def get_connection(self):
        return self._connection

    def close_connection(self):
        if self._connection is not None:
            self._connection.close()
            DuckDBManager._instance = None
            DuckDBManager._connection = None
```
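The singleton behavior of this pattern, where every instantiation returns the same object sharing one connection, can be illustrated with an analogous stdlib-only sketch (using `sqlite3` in place of `duckdb`, so it runs anywhere; `ConnectionManager` is a hypothetical stand-in):

```python
import sqlite3


class ConnectionManager:
    """Minimal singleton connection manager, mirroring DuckDBManager."""

    _instance = None
    _connection = None

    def __new__(cls, db_path: str = ":memory:"):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._connection = sqlite3.connect(db_path)
        return cls._instance

    def get_connection(self):
        return self._connection


a = ConnectionManager()
b = ConnectionManager("ignored.db")  # second call returns the same instance
print(a is b)                                     # → True
print(a.get_connection() is b.get_connection())   # → True
```

One caveat of this pattern, visible above: arguments passed to any instantiation after the first are silently ignored, since `__new__` short-circuits to the existing instance.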
**Changes to the pipeline module:**

```diff
@@ -3,6 +3,7 @@
 import json
 import logging
 import subprocess
+from enum import Enum
 import yaml
 import duckdb
 
@@ -17,6 +18,12 @@
 DB_NAME = "profiler_extract.db"
 
 
+class StepExecutionStatus(Enum):
+    COMPLETE = "COMPLETE"
+    ERROR = "ERROR"
+    SKIPPED = "SKIPPED"
+
+
 class PipelineClass:
     def __init__(self, config: PipelineConfig, executor: DatabaseManager):
         self.config = config
@@ -28,16 +35,24 @@ def execute(self):
         for step in self.config.steps:
             if step.flag == "active":
                 logging.debug(f"Executing step: {step.name}")
-                self._execute_step(step)
+                self.execute_step(step)
         logging.info("Pipeline execution completed")
 
-    def _execute_step(self, step: Step):
-        if step.type == "sql":
-            self._execute_sql_step(step)
-        elif step.type == "python":
-            self._execute_python_step(step)
-        else:
-            logging.error(f"Unsupported step type: {step.type}")
+    def execute_step(self, step: Step) -> Enum:
+        try:
+            if step.type == "sql":
+                logging.info(f"Executing SQL step {step.name}")
+                self._execute_sql_step(step)
+            elif step.type == "python":
+                logging.info(f"Executing Python step {step.name}")
+                self._execute_python_step(step)
+            else:
+                logging.error(f"Unsupported step type: {step.type}")
+                raise RuntimeError(f"Unsupported step type: {step.type}")
+        except RuntimeError as e:
+            logging.error(e)
+            return StepExecutionStatus.ERROR
+        return StepExecutionStatus.COMPLETE
 
     def _execute_sql_step(self, step: Step):
         logging.debug(f"Reading query from file: {step.extract_source}")
@@ -86,7 +101,8 @@ def _save_to_db(self, result, step_name: str, mode: str, batch_size: int = 1000)
         self._create_dir(self.db_path_prefix)
         conn = duckdb.connect(str(self.db_path_prefix) + '/' + DB_NAME)
         columns = result.keys()
-        # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
+        # TODO: Add support for figuring out data types from SQLALCHEMY
+        # result object result.cursor.description is not reliable
         schema = ' STRING, '.join(columns) + ' STRING'
 
         # Handle write modes
```

> **Reviewer:** Do you expect people to run `execute_step` directly? Otherwise it could stay private.
>
> **Author:** Sorry, I meant to sync with you. I had to make this a public function so that the scheduler or another process can parse the pipeline config and execute each step based upon its defined frequency. What do you think? I know you made this private so that the pipeline class executes as a whole, but there needs to be a hook for the scheduler class, or the pipeline class needs to parse the scheduled frequency info itself.
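The new `execute_step` contract, returning a status enum rather than letting exceptions escape, can be sketched in isolation. The `Step` dataclass below is a hypothetical stand-in for the real pipeline config class, and the SQL/Python execution is stubbed out:

```python
import logging
from dataclasses import dataclass
from enum import Enum


class StepExecutionStatus(Enum):
    COMPLETE = "COMPLETE"
    ERROR = "ERROR"
    SKIPPED = "SKIPPED"


@dataclass
class Step:  # hypothetical stand-in for the pipeline's Step class
    name: str
    type: str


def execute_step(step: Step) -> StepExecutionStatus:
    """Dispatch on step type; report failure as a status instead of raising."""
    try:
        if step.type == "sql":
            logging.info(f"Executing SQL step {step.name}")   # real code runs the query here
        elif step.type == "python":
            logging.info(f"Executing Python step {step.name}")  # real code runs the script here
        else:
            raise RuntimeError(f"Unsupported step type: {step.type}")
    except RuntimeError as e:
        logging.error(e)
        return StepExecutionStatus.ERROR
    return StepExecutionStatus.COMPLETE


print(execute_step(Step("usage", "sql")))    # → StepExecutionStatus.COMPLETE
print(execute_step(Step("bogus", "shell")))  # → StepExecutionStatus.ERROR
```

Returning a status gives the scheduler a hook to record per-step outcomes without aborting the whole pipeline on one failed step.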
**New file** (`@@ -0,0 +1,116 @@`):

```python
import platform
from pathlib import Path
from subprocess import run, CalledProcessError

import venv
import tempfile
import json
import logging

PROFILER_DEPENDENCIES: list[str] = []


def launch_installer():
    logging.info("Creating virtual environment.")

    # Build a virtual environment for launching the installer
    with tempfile.TemporaryDirectory() as temp_dir:
        venv_dir = Path(temp_dir) / "profiler_venv"
        builder = venv.EnvBuilder(with_pip=True)
        builder.create(venv_dir)
        venv_python = Path(venv_dir) / "bin" / "python"
        venv_pip = Path(venv_dir) / "bin" / "pip"
        logging.info("Venv setup complete.")

        # Install dependencies in the virtual environment
        if len(PROFILER_DEPENDENCIES) > 0:
            logging.info(f"Installing dependencies: {', '.join(PROFILER_DEPENDENCIES)}")
            try:
                run([str(venv_pip), "install", *PROFILER_DEPENDENCIES], check=True, capture_output=True, text=True)
            except CalledProcessError as e:
                logging.error(f"Failed to install dependencies: {e.stderr}")
                raise RuntimeError(f"Failed to install dependencies: {e.stderr}") from e

        # Execute the installer script using the virtual environment's Python interpreter
        try:
            # Detect the host operating system and execute the appropriate script
            system = platform.system()
            if system == "Darwin":
                installer_path = "macos/install_launch_agent.py"
            elif system == "Linux":
                installer_path = "linux/install_service_file.py"
            elif system == "Windows":
                installer_path = "windows/install_service_file.py"
            else:
                raise RuntimeError(f"Unsupported operating system: {system}")

            logging.info(f"Running scheduler installer for: {system}")
            result = run(
                [str(venv_python), str(installer_path)],
                check=True,
                capture_output=True,
                text=True,
            )

            try:
                output = json.loads(result.stdout)
                logging.debug(output)
                if output["status"] == "success":
                    logging.info(f"Python script completed: {output['message']}")
                else:
                    raise RuntimeError(f"Script reported error: {output['message']}")
            except json.JSONDecodeError:
                logging.info(f"Python script output: {result.stdout}")

        except CalledProcessError as e:
            error_msg = e.stderr
            logging.error(f"Installation script failed: {error_msg}")
            raise RuntimeError(f"Installation script failed: {error_msg}") from e


def is_systemd_installed() -> bool:
    """
    Validates whether systemd is installed on the host.
    Returns:
        bool - True if systemd is installed,
               False if systemd is not found on the system
    """
    try:
        # FileNotFoundError is raised when the systemctl binary is absent entirely
        run(
            ["systemctl", "--version"],
            capture_output=True,
            check=True,
        )
        return True
    except (CalledProcessError, FileNotFoundError) as e:
        logging.error(f"Systemd not found on the system: {e}")
        return False


def validate_prerequisites():
    """
    Validates that the prerequisites for installing the profiler scheduler
    are met.
    1. The user must be using a Mac, Linux, or Windows OS
    2. Python 3.11+ must be installed
    3. On Linux, systemd must be available
    """
    system_info = platform.system().lower()
    if system_info not in {"darwin", "linux", "windows"}:
        raise RuntimeError("Unsupported operating system detected.")
    python_version_info = [int(v) for v in platform.python_version().split(".")]
    if python_version_info[0] != 3 or python_version_info[1] < 11:
        raise RuntimeError("Python version must be >= Python 3.11")
    # systemd is a Linux requirement, not a macOS ("darwin") one
    if system_info == "linux" and not is_systemd_installed():
        raise RuntimeError("Systemd is required.")


def main():
    logging.info("Installing the remorph profiler scheduler...\n")
    validate_prerequisites()
    launch_installer()
    logging.info("Installation complete.")


if __name__ == "__main__":
    main()
```
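`launch_installer()` above parses the child script's stdout as JSON with `status` and `message` keys. A per-OS installer script conforming to that protocol (inferred from the parsing logic; the helper names here are hypothetical) could report its result like this:

```python
import json
import sys


def report(status: str, message: str) -> None:
    """Print the JSON status object that launch_installer() expects on stdout."""
    json.dump({"status": status, "message": message}, sys.stdout)
    sys.stdout.write("\n")


def parse_report(stdout: str) -> str:
    """Mirror of the parent's handling: raise on a non-success status."""
    output = json.loads(stdout)
    if output["status"] != "success":
        raise RuntimeError(f"Script reported error: {output['message']}")
    return output["message"]


print(parse_report('{"status": "success", "message": "LaunchAgent installed"}'))
# → LaunchAgent installed
```

Keeping the child-to-parent contract as a single JSON line on stdout means the parent can fall back to logging raw output (the `JSONDecodeError` branch) when a script prints something unexpected.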
**New file** (`@@ -0,0 +1,43 @@`):

```python
import logging
from pathlib import Path
import subprocess
import shutil


def install_service_file():
    # Ensure that the scheduler script exists before touching /etc/systemd/system/
    scheduler_path = Path(__file__).parent.parent.parent / "pipeline_scheduler.py"
    if not scheduler_path.exists():
        raise FileNotFoundError(f"Could not find the profiler scheduler: {scheduler_path}")

    # Copy the unit file to `/etc/systemd/system/`
    system_files_dir = Path("/etc/systemd/system/")
    unit_file_path = system_files_dir / "remorph_usage_collection.service"
    shutil.copy2("remorph_usage_collection.service", unit_file_path)

    # Copy the timer file to `/etc/systemd/system/`
    timer_file_path = system_files_dir / "remorph_usage_collection.timer"
    shutil.copy2("remorph_usage_collection.timer", timer_file_path)

    # Reload the `systemd` process to pick up the new files
    logging.info("Reloading systemd.")
    subprocess.run(["sudo", "systemctl", "daemon-reload"], check=False)

    # Enable the system timer
    logging.info("Enabling profiler scheduler.")
    subprocess.run(["sudo", "systemctl", "enable", "remorph_usage_collection.timer"], check=False)

    # Start the system timer
    logging.info("Starting profiler timer.")
    subprocess.run(["sudo", "systemctl", "start", "remorph_usage_collection.timer"], check=False)


def main():
    logging.info("Installing Linux scheduler components...\n")
    install_service_file()
    logging.info("Installation complete.")


if __name__ == "__main__":
    main()
```
> **Reviewer:** These instructions are clear, although I think that we also need to describe how to remove it and clean up afterwards.
>
> **Author:** Great point. I didn't even think about uninstallation.