Merged
22 commits
c2e0f7b
initial attempt of DAG
grallewellyn Jul 14, 2025
c8848ae
updating DAG and dockerfile to run process
grallewellyn Jul 15, 2025
d15cbe1
Merge branch 'develop' into run-ogc-process
grallewellyn Jul 15, 2025
eb7b628
successfully submitting a job but not monitoring yet
grallewellyn Jul 16, 2025
f260bca
can successfully run and monitor job now
grallewellyn Jul 16, 2025
eeec99f
cleaned up comments and print statements
grallewellyn Jul 17, 2025
84d3d6a
removed Unity specific params
grallewellyn Jul 21, 2025
4f6a4d5
idea of custom operators i want to follow
grallewellyn Jul 21, 2025
e1f9bd7
still working on dropdown progress, also attempting approach to get k…
grallewellyn Jul 22, 2025
ee785a4
tried another way for the custom operators approach
grallewellyn Jul 22, 2025
da99125
AI attempts of a dynamic UI to load inputs for processes
grallewellyn Aug 1, 2025
2957854
deleted dynamic attempts
grallewellyn Aug 1, 2025
2c7973a
deleted other dynamic UI functions
grallewellyn Aug 1, 2025
677ec3c
cleaned up DAG
grallewellyn Aug 4, 2025
e840e15
ran precommit
grallewellyn Aug 4, 2025
93cb0e2
corrected failing test github that runs locally
grallewellyn Aug 4, 2025
2f05e8f
added to post deployment script
grallewellyn Aug 6, 2025
4746f75
ran precommit
grallewellyn Aug 6, 2025
c850ca3
Merge branch 'develop' into 438-run-ogc-process
grallewellyn Aug 6, 2025
02266b0
updated ssm to point to new mdps dev account pgt token
grallewellyn Aug 7, 2025
4382513
Merge branch '438-run-ogc-process' of github.com:unity-sds/unity-sps …
grallewellyn Aug 7, 2025
7e9d815
updated default queue to be maap-dps-worker-cardamom since that is th…
grallewellyn Aug 11, 2025
314 changes: 314 additions & 0 deletions airflow/dags/run_ogc_process.py
@@ -0,0 +1,314 @@
"""
DAG with custom SPSOGCOperator that subclasses KubernetesPodOperator
for OGC process execution with SPS-specific functionality.
"""

import json
import logging
import re
from datetime import datetime

import requests
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret as AirflowK8sSecret
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s
from unity_sps_utils import POD_LABEL, POD_NAMESPACE, get_affinity

PROCESSES_ENDPOINT = "https://api.dit.maap-project.org/api/ogc/processes"


def fetch_ogc_processes():
"""Fetch available processes from the OGC API and create mapping."""
try:
response = requests.get(PROCESSES_ENDPOINT, timeout=30)
response.raise_for_status()

processes_data = response.json()
process_mapping = {}
dropdown_options = []

for process in processes_data.get("processes", []):
process_id = process.get("id")
process_version = process.get("version")

# Extract numerical ID from links
numerical_id = None
for link in process.get("links", []):
if link.get("rel") == "self":
href = link.get("href", "")
# Extract number from href like "/ogc/processes/7"
match = re.search(r"/processes/(\d+)$", href)
if match:
numerical_id = int(match.group(1))
break

if process_id and numerical_id:
display_name = f"{process_id}:{process_version}" if process_version else process_id
dropdown_options.append(display_name)
process_mapping[display_name] = numerical_id

return process_mapping, dropdown_options

except requests.RequestException as e:
logging.error(f"Failed to fetch processes: {e}")
# Return fallback mapping
return {"example-process:1.0": 1}, ["example-process:1.0"]
except Exception as e:
logging.error(f"Error processing OGC processes: {e}")
return {"example-process:1.0": 1}, ["example-process:1.0"]


# Constants
K8S_SECRET_NAME = "sps-app-credentials"
DOCKER_IMAGE = "jplmdps/ogc-job-runner:latest"
PROCESS_MAPPING, DROPDOWN_OPTIONS = fetch_ogc_processes()

# SPS-specific secrets
secret_env_vars = [
    AirflowK8sSecret(
        deploy_type="env",
        deploy_target="MAAP_PGT",
        secret=K8S_SECRET_NAME,
        key="MAAP_PGT",
    )
]
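# The pods read the MAAP proxy-granting ticket from a pre-existing Kubernetes
# secret (K8S_SECRET_NAME, key MAAP_PGT) in POD_NAMESPACE; it is injected into
# each pod as the MAAP_PGT environment variable.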


class SPSOGCOperator(KubernetesPodOperator):
"""
Custom operator for SPS OGC process execution that subclasses KubernetesPodOperator.

This operator encapsulates all SPS-specific configuration and provides a clean
interface for OGC process submission and monitoring.
"""

def __init__(
self,
operation_type: str,
selected_process: str = None,
job_inputs: str = None,
job_queue: str = None,
job_id: str = None,
**kwargs,
):
"""
Initialize the SPSOGCOperator.

Args:
operation_type: Either "submit" or "monitor"
selected_process: Process selection for submit operations
job_inputs: JSON string of job inputs for submit operations
job_queue: Queue name for submit operations
job_id: Job ID for monitor operations
"""
self.operation_type = operation_type
self.selected_process = selected_process
self.job_inputs = job_inputs
self.job_queue = job_queue
self.job_id = job_id

# Set SPS-specific defaults
kwargs.setdefault("namespace", POD_NAMESPACE)
kwargs.setdefault("image", DOCKER_IMAGE)
kwargs.setdefault("service_account_name", "airflow-worker")
kwargs.setdefault("secrets", secret_env_vars)
kwargs.setdefault("in_cluster", True)
kwargs.setdefault("get_logs", True)
kwargs.setdefault("startup_timeout_seconds", 600)
kwargs.setdefault("container_security_context", {"privileged": True})
kwargs.setdefault("container_logs", True)
kwargs.setdefault("labels", {"pod": POD_LABEL})
kwargs.setdefault("annotations", {"karpenter.sh/do-not-disrupt": "true"})
kwargs.setdefault(
"affinity",
get_affinity(
capacity_type=["spot"],
anti_affinity_label=POD_LABEL,
),
)
kwargs.setdefault("on_finish_action", "keep_pod")
kwargs.setdefault("is_delete_operator_pod", False)

# Build operation-specific environment variables
if operation_type == "submit":
kwargs["env_vars"] = self._build_submit_env_vars()
kwargs["name"] = f"ogc-submit-pod-{kwargs.get('task_id', 'unknown')}"
kwargs.setdefault("do_xcom_push", True) # Submit tasks need to return job ID
elif operation_type == "monitor":
kwargs["env_vars"] = self._build_monitor_env_vars()
kwargs["name"] = f"ogc-monitor-pod-{kwargs.get('task_id', 'unknown')}"
else:
raise ValueError(f"Invalid operation_type: {operation_type}. Must be 'submit' or 'monitor'")

super().__init__(**kwargs)

def _build_submit_env_vars(self):
"""Build environment variables for job submission."""
# Resolve numerical process ID from selected process
numerical_process_id = self._resolve_process_id()

return [
k8s.V1EnvVar(
name="SUBMIT_JOB_URL",
value="https://api.dit.maap-project.org/api/ogc/processes/{process_id}/execution",
),
k8s.V1EnvVar(name="PROCESS_ID", value=str(numerical_process_id)),
k8s.V1EnvVar(name="JOB_INPUTS", value=self.job_inputs or "{}"),
k8s.V1EnvVar(name="QUEUE", value=self.job_queue or "maap-dps-sandbox"),
k8s.V1EnvVar(name="SUBMIT_JOB", value="true"),
]

def _build_monitor_env_vars(self):
"""Build environment variables for job monitoring."""
return [
k8s.V1EnvVar(
name="MONITOR_JOB_URL",
value="https://api.dit.maap-project.org/api/ogc/jobs/{job_id}",
),
k8s.V1EnvVar(name="JOB_ID", value=self.job_id),
k8s.V1EnvVar(name="SUBMIT_JOB", value="false"),
]

def _resolve_process_id(self):
"""Resolve the selected process to a numerical process ID."""
if not self.selected_process:
raise ValueError("selected_process is required for submit operations")

# Handle templated values - they won't be resolved yet during __init__
if "{{" in str(self.selected_process):
# Return a template that will be resolved at runtime
return "{{ ti.xcom_pull(task_ids='Setup', key='return_value')['numerical_process_id'] }}"

# Direct lookup for non-templated values
numerical_id = PROCESS_MAPPING.get(self.selected_process)
if numerical_id is None:
self.log.warning(f"Process '{self.selected_process}' not found in mapping, defaulting to ID 1")
return 1

return numerical_id

def execute(self, context):
"""Execute the operator with additional SPS-specific logging."""
self.log.info(f"Starting SPS OGC {self.operation_type} operation")

if self.operation_type == "submit":
self.log.info(f"Selected process: {self.selected_process}")
self.log.info(f"Job queue: {self.job_queue}")
self.log.info(f"Job inputs: {self.job_inputs}")
elif self.operation_type == "monitor":
self.log.info(f"Monitoring job ID: {self.job_id}")

# Call parent execute method
result = super().execute(context)

self.log.info(f"SPS OGC {self.operation_type} operation completed")
return result


dag_default_args = {
"owner": "unity-sps",
"depends_on_past": False,
"start_date": datetime.utcfromtimestamp(0),
}

# --- DAG Definition ---

dag = DAG(
dag_id="run_ogc_process3",
description="Submits a job to an OGC process and monitors (using custom SPSOGCOperator)",
dag_display_name="Run an OGC Process (Custom Operator from KubernetesPodOperator)",
tags=["ogc", "job", "custom-operator"],
is_paused_upon_creation=False,
catchup=False,
schedule=None,
max_active_runs=10,
default_args=dag_default_args,
params={
"selected_process": Param(
default=DROPDOWN_OPTIONS[0] if DROPDOWN_OPTIONS else "Error loading dropdown",
enum=DROPDOWN_OPTIONS,
title="Process Selection",
description="Select a process to execute.",
),
"queue": Param(
"maap-dps-sandbox",
type="string",
title="Queue",
description="The MAAP queue to submit the job to",
),
"job_inputs": Param(
"{}",
type="string",
title="Job Inputs",
description="A JSON string representing the inputs payload for the job.",
),
},
)
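# Note: fetch_ogc_processes() runs at DAG parse time, so newly registered OGC
# processes appear in the dropdown only after the scheduler re-parses this file.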

# --- Task Definitions ---


def setup(ti=None, **context):
"""Task that logs DAG parameters and process mapping information."""

logging.info("Starting OGC job submission and monitoring DAG (Custom Operator Version).")
logging.info(f"Parameters received: {context['params']}")
logging.info(f"Available processes: {len(DROPDOWN_OPTIONS)}")
logging.info(f"Process mapping: {json.dumps(PROCESS_MAPPING, indent=2)}")

context = get_current_context()
logging.info(f"DAG Run parameters: {json.dumps(context['params'], sort_keys=True, indent=4)}")

selected_process = context["params"].get("selected_process")
if selected_process in PROCESS_MAPPING:
numerical_id = PROCESS_MAPPING[selected_process]
logging.info(f"Selected process '{selected_process}' maps to numerical ID: {numerical_id}")
return {"numerical_process_id": numerical_id}
else:
logging.warning(f"Selected process '{selected_process}' not found in mapping")
return {"numerical_process_id": 1}


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)

submit_job_task = SPSOGCOperator(
task_id="submit_job_task",
operation_type="submit",
selected_process="{{ params.selected_process }}",
job_inputs="{{ params.job_inputs }}",
job_queue="{{ params.queue }}",
dag=dag,
)

monitor_job_task = SPSOGCOperator(
task_id="monitor_job_task",
operation_type="monitor",
job_id="{{ ti.xcom_pull(task_ids='submit_job_task', key='return_value')['job_id'] }}",
dag=dag,
)
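# The job_id template above is rendered from the JSON that the submit pod
# writes to /airflow/xcom/return.json; KubernetesPodOperator exposes that file
# as the task's XCom return_value because do_xcom_push is enabled for submit
# operations.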


def cleanup(**context):
"""A placeholder cleanup task"""
logging.info("Cleanup executed.")

# Log final results if available
submit_result = context["ti"].xcom_pull(task_ids="submit_job_task", key="return_value")
monitor_result = context["ti"].xcom_pull(task_ids="monitor_job_task", key="return_value")

if submit_result:
logging.info(f"Job submission result: {submit_result}")
if monitor_result:
logging.info(f"Job monitoring result: {monitor_result}")


cleanup_task = PythonOperator(
task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE
)

chain(setup_task, submit_job_task, monitor_job_task, cleanup_task)
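
For reference, a minimal, self-contained sketch of the /processes payload shape that fetch_ogc_processes expects; the process id, version, and numeric href suffix are hypothetical, but the link parsing mirrors the function above:

import re

sample_payload = {
    "processes": [
        {
            "id": "example-process",
            "version": "1.0",
            "links": [
                {"rel": "self", "href": "https://api.dit.maap-project.org/api/ogc/processes/7"}
            ],
        }
    ]
}

process_mapping, dropdown_options = {}, []
for process in sample_payload["processes"]:
    for link in process["links"]:
        if link["rel"] == "self":
            match = re.search(r"/processes/(\d+)$", link["href"])
            if match:
                display_name = f"{process['id']}:{process['version']}"
                dropdown_options.append(display_name)
                process_mapping[display_name] = int(match.group(1))
            break

assert process_mapping == {"example-process:1.0": 7}
assert dropdown_options == ["example-process:1.0"]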
8 changes: 8 additions & 0 deletions airflow/docker/run_ogc_process/Dockerfile
@@ -0,0 +1,8 @@
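# Minimal image for the SPSOGCOperator pods: the entrypoint only needs a POSIX
# shell plus curl and jq.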
FROM alpine:3.18

RUN apk add --no-cache curl jq

COPY run_ogc_process_entrypoint.sh /usr/share/ogc/run_ogc_process_entrypoint.sh
WORKDIR /usr/share/ogc
RUN chmod +x /usr/share/ogc/run_ogc_process_entrypoint.sh
ENTRYPOINT ["/usr/share/ogc/run_ogc_process_entrypoint.sh"]
70 changes: 70 additions & 0 deletions airflow/docker/run_ogc_process/run_ogc_process_entrypoint.sh
@@ -0,0 +1,70 @@
#!/bin/sh

set -e
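# Fail fast: if any command below exits non-zero, the pod fails and so does the Airflow task.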

if [ "$SUBMIT_JOB" = "true" ] || [ "$SUBMIT_JOB" = "True" ]; then
echo "Submitting job"

SUBMIT_JOB_URL=$(echo "$SUBMIT_JOB_URL" | sed "s/{process_id}/$PROCESS_ID/")
SUBMIT_JOB_ARGUMENTS=$(jq -n \
--arg queue "$QUEUE" \
--argjson inputs "$JOB_INPUTS" \
'{queue: $queue, inputs: $inputs}')

echo "Submitting the job to ${SUBMIT_JOB_URL}"

response=$(curl --location ${SUBMIT_JOB_URL} \
--header "proxy-ticket: ${MAAP_PGT}" \
--header "Content-Type: application/json" \
--data "${SUBMIT_JOB_ARGUMENTS}")

echo "API Response: $response"
job_id=$(echo "$response" | jq -r .id)

if [ "$job_id" = "null" ] || [ -z "$job_id" ]; then
echo "Failed to get jobID from response."
exit 1
fi

echo "Job submitted successfully. Job ID: ${job_id}"

# Write the job_id to the XCom return file for the next task
mkdir -p /airflow/xcom/
printf '{"job_id": "%s"}' "$job_id" > /airflow/xcom/return.json
elif [ "$SUBMIT_JOB" = "false" ] || [ "$SUBMIT_JOB" = "False" ]; then
echo "Monitoring job status"

MONITOR_JOB_URL=$(echo "$MONITOR_JOB_URL" | sed "s/{job_id}/$JOB_ID/")

TIMEOUT=3600
POLL_INTERVAL=30
SECONDS=0

while [ $SECONDS -lt $TIMEOUT ]; do
echo "Checking status..."
response=$(curl --location ${MONITOR_JOB_URL} \
--header "proxy-ticket: ${MAAP_PGT}" \
--header "Content-Type: application/json")

status=$(echo "$response" | jq -r .status)

echo "Current status is: $status"

if [ "$status" = "successful" ]; then
echo "Job completed successfully!"
exit 0
elif [ "$status" = "failed" ]; then
echo "Job failed!"
echo "Error details: $(echo "$response" | jq .)"
exit 1
fi

sleep $POLL_INTERVAL
SECONDS=$((SECONDS + POLL_INTERVAL))
done

echo "Job monitoring timed out after $TIMEOUT seconds."
exit 1
else
echo "SUBMIT_JOB variable must be specified and set to true or false"
fi
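
For debugging the same flow outside the container, a rough Python equivalent of the entrypoint's submit-and-poll logic; the proxy-ticket value and the numeric process ID are placeholders, while the endpoints and payload shape match those configured in the DAG:

import time

import requests

API = "https://api.dit.maap-project.org/api/ogc"
HEADERS = {"proxy-ticket": "<MAAP_PGT>", "Content-Type": "application/json"}

# Submit: POST /processes/{process_id}/execution with a queue and inputs payload
response = requests.post(
    f"{API}/processes/7/execution",
    headers=HEADERS,
    json={"queue": "maap-dps-sandbox", "inputs": {}},
    timeout=30,
)
response.raise_for_status()
job_id = response.json()["id"]
print(f"Job submitted. Job ID: {job_id}")

# Monitor: poll GET /jobs/{job_id} until a terminal status, as the script does
for _ in range(3600 // 30):
    status = requests.get(f"{API}/jobs/{job_id}", headers=HEADERS, timeout=30).json()["status"]
    print(f"Current status is: {status}")
    if status in ("successful", "failed"):
        break
    time.sleep(30)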