Skip to content

Commit e0f78c0

Browse files
authored
add action to submit HySDS job for NISAR ECMWF task (#5)
* initial implementation of HySDS submit job action * fix schema * finalize implementation; add unit and regression tests
1 parent e84d016 commit e0f78c0

File tree

5 files changed

+148
-7
lines changed

5 files changed

+148
-7
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import json
2+
import uuid
3+
4+
import httpx
5+
6+
from ..utils.logger import logger
7+
from .base import Action
8+
9+
__all__ = ["SubmitHysdsJob"]
10+
11+
12+
class SubmitHysdsJob(Action):
13+
def __init__(self, payload, payload_info, params):
14+
super().__init__(payload, payload_info, params)
15+
logger.info("instantiated %s", __class__.__name__)
16+
17+
def execute(self):
18+
"""Submit job to mozart via REST API."""
19+
20+
# build job params
21+
job_params = {
22+
"payload": self._payload,
23+
"payload_info": self._payload_info,
24+
"on_success": self._params["on_success"],
25+
}
26+
27+
# setup url and request body
28+
url = self._params["mozart_base_api_endpoint"]
29+
body = {
30+
"queue": self._params["queue"],
31+
"priority": self._params.get("priority", 0),
32+
"tags": json.dumps(self._params.get("tags", [])),
33+
"type": self._params["job_spec"],
34+
"params": json.dumps(job_params),
35+
"name": f"{self._params['job_spec'].split(':')[0]}-{str(uuid.uuid4())}",
36+
}
37+
38+
# submit job
39+
logger.info("job URL: %s", url)
40+
logger.info("job params: %s", json.dumps(body, indent=2, sort_keys=True))
41+
response = httpx.post(url, data=body, verify=False) # nosec
42+
if response.status_code in (200, 201):
43+
success = True
44+
resp = response.json()
45+
logger.info(
46+
"Successfully submitted HySDS job %s: %s",
47+
self._params["job_spec"],
48+
resp,
49+
)
50+
else:
51+
success = False
52+
resp = response.text
53+
logger.info(
54+
"Failed to submit HySDS job %s: %s", self._params["job_spec"], resp
55+
)
56+
return {"success": success, "response": resp}

src/unity_initiator/resources/routers_schema.yaml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ evaluator_config:
3636
# Currently only 2 types of actions are supported:
3737
# 1. submit payload to an SNS topic
3838
# 2. submit payload to an airflow DAG
39-
action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action"))
39+
action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action"), include("submit_hysds_job_action"))
4040

4141
# Configuration for submitting a payload to an airflow DAG.
4242
submit_dag_by_id_action:
@@ -69,6 +69,17 @@ submit_ogc_process_execution_action:
6969
execution_subscriber: map(required=False)
7070
on_success: include("on_success_actions", required=False)
7171

72+
# Configuration for submitting a HySDS job
73+
submit_hysds_job_action:
74+
name: str(equals="submit_hysds_job")
75+
params:
76+
mozart_base_api_endpoint: str(required=True)
77+
job_spec: str(required=True)
78+
queue: str(required=True)
79+
priority: int(min=0, max=10, required=False)
80+
tags: list(str(), required=False)
81+
on_success: include("on_success_actions", required=False)
82+
7283
# Configuration to pass onto the evaluator to use when evaluation is a success.
7384
on_success_actions:
7485
actions: list(include("action_config"), required=True, min=1, max=1)

tests/resources/test_router.yaml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,15 @@ initiator_config:
148148
topic_arn: arn:aws:sns:hilo-hawaii-1:123456789012:eval_airs_ingest
149149
on_success:
150150
actions:
151-
- name: submit_dag_by_id
151+
- name: submit_hysds_job
152152
params:
153-
dag_id: submit_airs_ingest
154-
airflow_base_api_endpoint: xxx
155-
airflow_username: <SSM parameter, e.g. /unity/airflow/username> <ARN to username entry in AWS Secrets Manager>
156-
airflow_password: <SSM parameter, e.g. /unity/airflow/password> <ARN to password entry in Secrets Manager>
157-
153+
mozart_base_api_endpoint: https://example.com/api/v0.1/job/submit
154+
job_spec: submit_airs_ingest:v1
155+
queue: ingest_queue
156+
priority: 0
157+
tags:
158+
- airs
159+
- hysds
158160

159161
- regexes:
160162
- '(?<=/)(?P<filename>hello_world\.txt)$'

tests/test_lambda.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ def setup_mock_resources():
5959
sns_client.create_topic(
6060
Name="eval_nisar_ingest", Attributes={"TracingConfig": "Active"}
6161
)
62+
sns_client.create_topic(
63+
Name="eval_airs_ingest", Attributes={"TracingConfig": "Active"}
64+
)
6265

6366
# mock airflow REST API
6467
respx.post("https://example.com/api/v1/dags/eval_nisar_l0a_readiness/dagRuns").mock(
@@ -83,6 +86,19 @@ def setup_mock_resources():
8386
)
8487
)
8588

89+
# mock mozart REST API
90+
respx.post("https://example.com/api/v0.1/job/submit").mock(
91+
return_value=Response(
92+
200,
93+
json={
94+
"success": True,
95+
"message": "",
96+
"result": "fda11fad-35f0-466e-a785-4678a0e662de",
97+
"tags": ["airs", "hysds"],
98+
},
99+
)
100+
)
101+
86102

87103
@respx.mock
88104
@mock_aws
@@ -257,6 +273,21 @@ def test_invoke_function_all_test_cases(self):
257273
for res in results:
258274
assert res["success"]
259275

276+
# AIRS RetStd use case
277+
in_data = {
278+
"payload": "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf"
279+
}
280+
invoke_res = self.client.invoke(
281+
FunctionName=self.function_name,
282+
InvocationType="Event",
283+
Payload=json.dumps(in_data),
284+
)
285+
logger.info("invoke_res: %s", invoke_res)
286+
results = json.loads(invoke_res["Payload"].read().decode("utf-8"))
287+
logger.info("results: %s", results)
288+
for res in results:
289+
assert res["success"]
290+
260291
def test_invoke_function_unrecognized(self):
261292
"""Test invocations of the router lambda using an unrecognized url."""
262293

@@ -450,3 +481,12 @@ def test_initiator_nisar_ldf(self):
450481
self.bucket_name,
451482
"prefix/NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_18_03_05_087077000.ldf",
452483
)
484+
485+
def test_initiator_airs_retstd(self):
486+
"""Test invocations of the initiator lambda via S3 event using AIRS RetStd test case: submit_hysds_job"""
487+
488+
# Upload file to trigger notification
489+
assert self.invoke_initiator_via_s3_event(
490+
self.bucket_name,
491+
"prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf",
492+
)

tests/test_router.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,38 @@ def test_execute_actions_for_nisar_ldf_url():
178178
assert res["success"]
179179

180180

181+
@respx.mock
182+
@mock_aws
183+
def test_execute_actions_for_airs_retstd_url():
184+
"""Test routing a url payload and executing actions: AIRS RetStd example"""
185+
186+
# mock mozart REST API
187+
respx.post("https://example.com/api/v0.1/job/submit").mock(
188+
return_value=Response(
189+
200,
190+
json={
191+
"success": True,
192+
"message": "",
193+
"result": "fda11fad-35f0-466e-a785-4678a0e662de",
194+
"tags": ["airs", "hysds"],
195+
},
196+
)
197+
)
198+
199+
url = "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf"
200+
client = boto3.client("sns")
201+
router_file = files("tests.resources").joinpath("test_router.yaml")
202+
router = Router(router_file)
203+
client.create_topic(
204+
Name=list(router.get_evaluators_by_url(url))[0].name,
205+
Attributes={"TracingConfig": "Active"},
206+
)
207+
results = router.execute_actions(url)
208+
logger.info("results: %s", results)
209+
for res in results:
210+
assert res["success"]
211+
212+
181213
@mock_aws
182214
def test_unrecognized_url():
183215
"""Test routing a url payload that is unrecognized: NISAR L0B example"""

0 commit comments

Comments
 (0)