Skip to content

Commit 677ec3c

Browse files
committed
cleaned up DAG
1 parent 2c7973a commit 677ec3c

File tree

1 file changed

+8
-11
lines changed

1 file changed

+8
-11
lines changed

airflow/dags/run_ogc_process.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,17 @@
1717
from airflow.utils.trigger_rule import TriggerRule
1818
from kubernetes.client import models as k8s
1919
from unity_sps_utils import (
20-
DEFAULT_LOG_LEVEL,
21-
EC2_TYPES,
22-
NODE_POOL_DEFAULT,
23-
NODE_POOL_HIGH_WORKLOAD,
2420
POD_LABEL,
2521
POD_NAMESPACE,
26-
build_ec2_type_label,
2722
get_affinity,
2823
)
2924

25+
PROCESSES_ENDPOINT = "https://api.dit.maap-project.org/api/ogc/processes"
26+
3027
def fetch_ogc_processes():
3128
"""Fetch available processes from the OGC API and create mapping."""
3229
try:
33-
response = requests.get("https://api.dit.maap-project.org/api/ogc/processes", timeout=30)
30+
response = requests.get(PROCESSES_ENDPOINT, timeout=30)
3431
response.raise_for_status()
3532

3633
processes_data = response.json()
@@ -272,7 +269,7 @@ def setup(ti=None, **context):
272269
setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)
273270

274271
submit_job_task = SPSOGCOperator(
275-
task_id="submit_job_task3",
272+
task_id="submit_job_task",
276273
operation_type="submit",
277274
selected_process="{{ params.selected_process }}",
278275
job_inputs="{{ params.job_inputs }}",
@@ -281,9 +278,9 @@ def setup(ti=None, **context):
281278
)
282279

283280
monitor_job_task = SPSOGCOperator(
284-
task_id="monitor_job_task3",
281+
task_id="monitor_job_task",
285282
operation_type="monitor",
286-
job_id="{{ ti.xcom_pull(task_ids='submit_job_task3', key='return_value')['job_id'] }}",
283+
job_id="{{ ti.xcom_pull(task_ids='submit_job_task', key='return_value')['job_id'] }}",
287284
dag=dag,
288285
)
289286

@@ -292,8 +289,8 @@ def cleanup(**context):
292289
logging.info("Cleanup executed.")
293290

294291
# Log final results if available
295-
submit_result = context['ti'].xcom_pull(task_ids='submit_job_task3', key='return_value')
296-
monitor_result = context['ti'].xcom_pull(task_ids='monitor_job_task3', key='return_value')
292+
submit_result = context['ti'].xcom_pull(task_ids='submit_job_task', key='return_value')
293+
monitor_result = context['ti'].xcom_pull(task_ids='monitor_job_task', key='return_value')
297294

298295
if submit_result:
299296
logging.info(f"Job submission result: {submit_result}")

0 commit comments

Comments
 (0)