30 changes: 30 additions & 0 deletions docs/remorph/docs/profiler/index.mdx
@@ -0,0 +1,30 @@
---
sidebar_position: 1
---

## What is the Remorph Profiler?

- The profiler collects usage and performance metrics from a source warehouse
- It leverages built-in scheduling components of the host operating system to execute usage collection queries

## Why is the profiler usage collection needed?

- Some data warehouses preserve query history for only a few days, or cap it at a small number of rows. For example, Amazon Redshift retains usage history for only around 2-5 days
- Having adequate usage history is critical in providing an _accurate_ Total Cost of Ownership (TCO) estimate
- Having too little history could produce an inaccurate representation of data warehouse query patterns and system utilization

## Supported operating systems

The profiler usage collection can be installed on the following operating systems:

1. macOS
2. Linux
3. [Windows](./windows/windows_installation.mdx)

## How often does the profiler run?

The profiler system task is scheduled to run every 15 minutes. When running, the system task checks local state to see whether usage collection has already run for the day and whether there are gaps in the usage history that need to be reconciled.

## Automatic restart

The profiler usage collection service utilizes the host operating system to run as a background process. As a result, the process will automatically resume processing after a system restart; no intervention is needed during a host system restart. The profiler will automatically detect usage history gaps and proactively backfill when possible.
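The gap-detection idea can be sketched in a few lines (illustrative only; the function and data below are hypothetical, not the actual implementation):

```python
from datetime import date, timedelta

def find_missing_dates(collected: set[date], start: date, end: date) -> list[date]:
    """Return the dates in [start, end] with no recorded usage collection."""
    all_days = (start + timedelta(days=n) for n in range((end - start).days + 1))
    return [day for day in all_days if day not in collected]

# Dates returned here would be candidates for a backfill run
gaps = find_missing_dates(
    collected={date(2024, 1, 1), date(2024, 1, 3)},
    start=date(2024, 1, 1),
    end=date(2024, 1, 4),
)
# -> [date(2024, 1, 2), date(2024, 1, 4)]
```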
47 changes: 47 additions & 0 deletions docs/remorph/docs/profiler/windows/windows_installation.mdx
@@ -0,0 +1,47 @@
---
sidebar_position: 1
title: Windows Installation
---

# (Windows) Installing the Usage Collection Scheduler

## Prerequisites

The following requirements must be met before running the profiler scheduler installer:

1. Python 3.10 (or higher)
2. Administrator access to the Windows Task Scheduler
3. A cloned copy of the `remorph` project

## Running the installer

The profiler installer will create and register a new Windows system task under the current user's identity:

- **Remorph Usage Collection** - A scheduled task that runs the usage collection pipeline as a background service
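Under the hood, registering a recurring task on Windows boils down to a scheduled-task definition. A minimal sketch of the idea using `schtasks` (the task name comes from above; the command and script name are assumptions, and the actual `install.py` may register the task differently):

```python
import subprocess

# Create a task that triggers the usage collection every 15 minutes.
# The /TR command is illustrative; install.py may point at a different entry point.
subprocess.run(
    [
        "schtasks", "/Create",
        "/TN", "Remorph Usage Collection",
        "/SC", "MINUTE", "/MO", "15",
        "/TR", "python run_usage_collection.py",
    ],
    check=True,
)
```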

### Installation steps:

1. Open a new CMD window with Administrator privileges and navigate to the profiler directory:

```cmd
> cd %USERPROFILE%\.databricks\labs\remorph\assessments\profiler\
```

2. Run the installer script:

```bash
> python install.py
```

### Uninstallation steps:

1. Open a new CMD window with Administrator privileges and navigate to the profiler directory:

```cmd
> cd %USERPROFILE%\.databricks\labs\remorph\assessments\profiler\
```

2. Run the uninstaller script:

```bash
> python uninstall.py
```
27 changes: 27 additions & 0 deletions src/databricks/labs/remorph/assessments/duckdb_manager.py
@@ -0,0 +1,27 @@
import duckdb


class DuckDBManager:
"""Manages a connection to a single DuckDB database instance."""

_instance = None
_connection = None

    def __new__(cls, db_path: str = "remorph_profiler_db.duckdb"):
        # Singleton: only the first call opens a connection; the db_path
        # argument is ignored on subsequent instantiations.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._init_connection(db_path)
        return cls._instance

def _init_connection(self, db_path: str):
self._db_path = db_path
self._connection = duckdb.connect(database=db_path)

def get_connection(self):
return self._connection

    def close_connection(self):
        if self._connection is not None:
            self._connection.close()
            self._connection = None
            # Reset the singleton so the next instantiation opens a fresh connection
            DuckDBManager._instance = None
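A quick usage sketch of the singleton semantics (illustrative; the paths below are arbitrary examples):

```python
from databricks.labs.remorph.assessments.duckdb_manager import DuckDBManager

# The first call opens the connection; later calls return the same instance
# and their db_path argument is ignored.
first = DuckDBManager("state.duckdb")
second = DuckDBManager("ignored.duckdb")
assert first is second
assert first.get_connection() is second.get_connection()

# Closing resets the singleton so the next instantiation opens a fresh connection.
first.close_connection()
```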
12 changes: 10 additions & 2 deletions src/databricks/labs/remorph/assessments/pipeline.py
@@ -3,6 +3,7 @@
import json
import logging
import subprocess
from enum import Enum
import yaml
import duckdb

@@ -17,6 +18,12 @@
DB_NAME = "profiler_extract.db"


class StepExecutionStatus(Enum):
COMPLETE = "COMPLETE"
ERROR = "ERROR"
SKIPPED = "SKIPPED"


class PipelineClass:
def __init__(self, config: PipelineConfig, executor: DatabaseManager):
self.config = config
@@ -28,16 +35,17 @@ def execute(self):
for step in self.config.steps:
if step.flag == "active":
logging.debug(f"Executing step: {step.name}")
self._execute_step(step)
self.execute_step(step)
logging.info("Pipeline execution completed")

def _execute_step(self, step: Step):
    def execute_step(self, step: Step) -> StepExecutionStatus:
if step.type == "sql":
self._execute_sql_step(step)
elif step.type == "python":
self._execute_python_step(step)
        else:
            logging.error(f"Unsupported step type: {step.type}")
            return StepExecutionStatus.ERROR
        return StepExecutionStatus.COMPLETE

def _execute_sql_step(self, step: Step):
logging.debug(f"Reading query from file: {step.extract_source}")
160 changes: 160 additions & 0 deletions src/databricks/labs/remorph/assessments/pipeline_scheduler.py
@@ -0,0 +1,160 @@
import time
import logging
from dataclasses import dataclass
from enum import Enum

from datetime import datetime, timezone
from databricks.labs.remorph.assessments.duckdb_manager import DuckDBManager
from databricks.labs.remorph.assessments.pipeline import PipelineClass, StepExecutionStatus
from databricks.labs.remorph.assessments.profiler_config import Step


def get_hours_since_last_run(last_run_datetime: datetime) -> int:
"""Returns the number of full hours that have elapsed since the given last run."""
elapsed_time = datetime.now(timezone.utc) - last_run_datetime
elapsed_seconds = elapsed_time.total_seconds()
    return int(elapsed_seconds // 3600)


def get_days_since_last_run(last_run_datetime: datetime) -> int:
"""Returns the number of full days that have elapsed since the given last run."""
elapsed_time = datetime.now(timezone.utc) - last_run_datetime
elapsed_seconds = elapsed_time.total_seconds()
    return int(elapsed_seconds // 86400)
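
# Illustrative example: for a step last run 26 hours ago,
# get_hours_since_last_run(last_run) returns 26 and
# get_days_since_last_run(last_run) returns 1, so a DAILY step is due
# to run again while a WEEKLY step is not.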


class ScheduledFrequency(Enum):
ONCE = "ONCE"
DAILY = "DAILY"
WEEKLY = "WEEKLY"


@dataclass
class PollingStatus:
polling_dt: datetime
pipeline_name: str
step_name: str
status: str


class PipelineScheduler:
"""
Executes a Pipeline's steps according to predefined Step schedules.
"""

def __init__(self, pipelines: list[PipelineClass], polling_interval_secs: int = 5):

self.duckdb_connection = DuckDBManager().get_connection()
self.polling_interval = polling_interval_secs
self.pipelines: list[PipelineClass] = pipelines

# Create a table in DuckDB for maintaining pipeline state
self._init_db()

def _init_db(self):
"""Initializes a DuckDB database to store pipeline step execution state."""
logging.info("Initializing pipeline state database...")
self.duckdb_connection.execute(
query="""
CREATE TABLE IF NOT EXISTS pipeline_step_state (
step_name TEXT,
pipeline_name TEXT,
last_run TIMESTAMPTZ,
status TEXT,
PRIMARY KEY (pipeline_name, step_name)
)
"""
)
logging.info("DuckDB state table is initialized.")

def _get_last_run_time(self, pipeline_name: str, step_name: str) -> tuple[datetime | None, str | None]:
"""Fetches the last execution time of a pipeline step from the database."""
full_step_name = f"{pipeline_name}__{step_name}"
last_run = None
status = None
result = self.duckdb_connection.execute(
query="SELECT last_run, status FROM pipeline_step_state WHERE step_name = ?", parameters=[full_step_name]
).fetchone()
if result is not None:
last_run, status = result
return last_run, status

def _record_run_time(self, pipeline_name: str, step_name: str, status: str = StepExecutionStatus.COMPLETE.value):
"""Records the latest execution time of a pipeline step."""
now = datetime.now(timezone.utc)
full_step_name = f"{pipeline_name}__{step_name}"
self.duckdb_connection.execute(
query="""
INSERT INTO pipeline_step_state (step_name, pipeline_name, last_run, status)
VALUES (?, ?, ?, ?)
ON CONFLICT(pipeline_name, step_name)
DO UPDATE SET
last_run = EXCLUDED.last_run,
status = EXCLUDED.status
""",
parameters=[full_step_name, pipeline_name, now, status],
)

def _should_run(self, pipeline_name: str, step: Step) -> bool:
"""Determines if a pipeline step should run based on its schedule."""
scheduled_frequency = step.frequency
last_run_time, status = self._get_last_run_time(pipeline_name, step.name)
if last_run_time is None or status == StepExecutionStatus.ERROR.value:
            # First run of the Step, or a retry after a previous error
            should_run = True
            logging.info(f"Running step '{step.name}' (first run or retry after error).")
# The Step has been run once already
elif scheduled_frequency == ScheduledFrequency.ONCE.value:
logging.info(f"Step '{step.name}' has already been run once. Skipping.")
should_run = False
# Check if it's been >= 24 hours since the last run
elif scheduled_frequency == ScheduledFrequency.DAILY.value and get_hours_since_last_run(last_run_time) >= 24:
should_run = True
logging.info(f"Running daily step '{step.name}' now.")
# Check if it's been >= 7 days since the last run
elif scheduled_frequency == ScheduledFrequency.WEEKLY.value and get_days_since_last_run(last_run_time) >= 7:
should_run = True
logging.info(f"Running weekly step '{step.name}' now.")
else:
# None of the triggering frequency conditions have been met
should_run = False
return should_run

def _run_step(self, pipeline: PipelineClass, step: Step) -> str:
"""Executes a pipeline step if it's time to run."""
if self._should_run(pipeline.config.name, step):
status = str(pipeline.execute_step(step).value)
self._record_run_time(pipeline.config.name, step.name, status)
else:
status = StepExecutionStatus.SKIPPED.value
return status

def run(self, num_polling_cycles: int = 1) -> list[PollingStatus]:
"""
        Loops over each pipeline's steps and runs any step that is due
        according to its scheduled frequency.
"""
logging.info("PipelineScheduler has started...")
cycle_counter = 0
polling_status = []
while cycle_counter < num_polling_cycles:

# Loop through the list of scheduled pipelines
# TODO: Parallelize this in the future to be more efficient
for pipeline in self.pipelines:
pipeline_steps = pipeline.config.steps
for pipeline_step in pipeline_steps:
# Inspect the execution frequency of the Step
# Possible frequencies include: "once", "daily", "weekly" (see `ScheduledFrequency` enum)
logging.info(f"Evaluating scheduling step '{pipeline_step.name}'.")
step_exec_status = self._run_step(pipeline, pipeline_step)
execution_status = PollingStatus(
datetime.now(timezone.utc), pipeline.config.name, pipeline_step.name, step_exec_status
)
polling_status.append(execution_status)

            # Wait before starting the next polling cycle
time.sleep(self.polling_interval)
cycle_counter += 1

return polling_status
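A hedged end-to-end sketch of driving the scheduler (the `load_config` and `make_executor` helpers below are hypothetical stand-ins for however `PipelineConfig` and `DatabaseManager` are actually constructed):

```python
from databricks.labs.remorph.assessments.pipeline import PipelineClass
from databricks.labs.remorph.assessments.pipeline_scheduler import PipelineScheduler

# Hypothetical helpers: substitute the project's real config loading and
# database connection setup here.
config = load_config("pipeline_config.yml")
executor = make_executor(config)
pipeline = PipelineClass(config, executor)

# Poll three times, five seconds apart, executing any steps that are due.
scheduler = PipelineScheduler([pipeline], polling_interval_secs=5)
for status in scheduler.run(num_polling_cycles=3):
    print(status.polling_dt, status.pipeline_name, status.step_name, status.status)
```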