Commit 2f04b33

Merge pull request #1 from unity-sds/cmr-trigger
research and prototype CMR trigger options: scheduled CMR queries
2 parents c277af6 + 7d2e539 commit 2f04b33

22 files changed: +743 -45 lines changed

README.md

Lines changed: 96 additions & 10 deletions
@@ -46,7 +46,7 @@ Trigger events by themselves don't automatically mean that SDS processing is rea
 As described by [Hua et al. [2022]](#1):
 > A fundamental capability of an SDS is to systematically process science data through a series of data transformations from raw instrument data to geophysical measurements. Data are first made available to the SDS from GDS to be processed to higher level data products. The data transformation steps may utilize ancillary and auxiliary files as well as production rules that stipulate conditions for when each step should be executed.

-In an SDS, evaluators are functions (irrespective of how they are deployed and called) that perform adaptation-specific evaluation to determine if the next step in the processing pipeline is ready for execution.
+In an SDS, evaluators are functions (irrespective of how they are deployed and called) that perform adaptation-specific evaluation to determine if the next step in the processing pipeline is ready for execution.

 As an example, the following shows the input-output diagram for the NISAR L-SAR L0B PGE (a.k.a. science algorithm):
@@ -113,7 +113,7 @@ and a trigger event payload for a new file that was triggered:

 The router will iterate over the set of url configs and attempt to match the URL against its set of regexes. If a match is successful, the router will iterate over the configured evaluator configs and perform the configured action to submit the URL payload to the evaluator interface (either SNS topic or DAG submission). In this case, the router sees that the action is `submit_to_sns_topic` and thus publishes the URL payload (and the regular expression captured groups as `payload_info`) to the SNS topic (`topic_arn`) configured in the action's parameters. In addition to the payload URL and the payload info, the router also includes the `on_success` parameters configured for the action. This will propagate pertinent info to the underlying evaluator code which would be used if evaluation is successful. In this case, if the evaluator successfully evaluates that everything is ready for this input file, it can proceed to submit a DAG run for the `submit_nisar_tlm_ingest` DAG in the underlying SPS.

-Let's consider another example but this time the configured action is to submit a DAG run instead of publishing to an evaluator's SNS topic:
+Let's consider another example but this time the configured action is to submit a DAG run instead of publishing to an evaluator's SNS topic:
 ```
 initiator_config:
   name: minimal config example
@@ -171,11 +171,13 @@ In this case, the router sees that the action is `submit_dag_by_id` and thus mak
 * [Quick Start](#quick-start)
 * [Setting Up the End-to-End Demo](#setting-up-the-end-to-end-demo)
 * [Deploying the Initiator](#deploying-the-initiator)
-* [Deploying an Example Evaluator](#deploying-an-example-evaluator-sns-topic-sqs-queue-lambda)
+* [Deploying Example Evaluators](#deploying-example-evaluators-sns-topic-sqs-queue-lambda)
 * [Deploying an S3 Event Notification Trigger](#deploying-an-s3-event-notification-trigger)
 * [Verify End-to-End Functionality (part 1)](#verify-end-to-end-functionality-part-1)
 * [Deploying an EventBridge Scheduler Trigger](#deploying-an-eventbridge-scheduler-trigger)
 * [Verify End-to-End Functionality (part 2)](#verify-end-to-end-functionality-part-2)
+* [Deploying an EventBridge Scheduler Trigger for Periodic CMR Queries](#deploying-an-eventbridge-scheduler-trigger-for-periodic-cmr-queries)
+* [Verify End-to-End Functionality (part 3)](#verify-end-to-end-functionality-part-3)
 * [Tear Down](#tear-down)
 * [Setup Instructions for Development](#setup-instructions-for-development)
 * [Build Instructions](#build-instructions)
@@ -212,13 +214,14 @@ This guide provides a quick way to get started with our project. Please see our
 ```
 cd unity-initiator/terraform-unity/initiator/
 ```
-1. Copy a sample router configuration YAML file to use for deployment and update the AWS region and AWS account ID to match your AWS environment. We will be using the NISAR TLM test case for this demo so we also rename the SNS topic ARN for it accordingly:
+1. Copy a sample router configuration YAML file to use for deployment and update the AWS region and AWS account ID to match your AWS environment. We will be using the NISAR TLM and AIRS RetStd test cases for this demo so we also rename the SNS topic ARNs for them accordingly:
 ```
 cp ../../tests/resources/test_router.yaml .
 export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --output text | awk '{print $1}')
 export AWS_REGION=$(aws configure get region)
 sed -i "s/hilo-hawaii-1/${AWS_REGION}/g" test_router.yaml
 sed -i "s/123456789012:eval_nisar_ingest/${AWS_ACCOUNT_ID}:uod-dev-eval_nisar_ingest-evaluator_topic/g" test_router.yaml
+sed -i "s/123456789012:eval_airs_ingest/${AWS_ACCOUNT_ID}:uod-dev-eval_airs_ingest-evaluator_topic/g" test_router.yaml
 ```
 1. You will need an S3 bucket for terraform to stage the router Lambda zip file during deployment. Create one or reuse an existing one and set an environment variable for it:
 ```
@@ -247,10 +250,19 @@ This guide provides a quick way to get started with our project. Please see our
 ```
 **Take note of the `initiator_topic_arn` that is output by terraform. It will be used when setting up any triggers.**

-#### Deploying an Example Evaluator (SNS topic->SQS queue->Lambda)
-1. Change directory to the location of the sns_sqs_lambda evaluator terraform:
+#### Deploying Example Evaluators (SNS topic->SQS queue->Lambda)
+##### Evaluator Deployment for NISAR TLM (via staged data to the ISL)
+1. Change directory to the location of the evaluators terraform:
 ```
-cd ../evaluators/sns_sqs_lambda/
+cd ../evaluators
+```
+1. Make a copy of the `sns_sqs_lambda` directory for the NISAR TLM evaluator:
+```
+cp -rp sns_sqs_lambda sns_sqs_lambda-nisar_tlm
+```
+1. Change directory into the NISAR TLM evaluator terraform:
+```
+cd sns_sqs_lambda-nisar_tlm/
 ```
 1. Set the name of the evaluator to our NISAR example:
 ```
@@ -270,8 +282,41 @@ This guide provides a quick way to get started with our project. Please see our
 --var evaluator_name=${EVALUATOR_NAME} \
 -auto-approve
 ```
-**Take note of the `evaluator_topic_arn` that is output by terraform. It should match the topic ARN in the test_router.yaml file you used during the initiator deployment. If they match then the router Lambda is now able to submit payloads to this evaluator SNS topic.**
-
+**Take note of the `evaluator_topic_arn` that is output by terraform. It should match the respective topic ARN in the test_router.yaml file you used during the initiator deployment. If they match then the router Lambda is now able to submit payloads to this evaluator SNS topic.**
+
+##### Evaluator Deployment for AIRS RetStd (via scheduled CMR query)
+1. Change directory to the location of the evaluators terraform:
+```
+cd ..
+```
+1. Make a copy of the `sns_sqs_lambda` directory for the AIRS RetStd evaluator:
+```
+cp -rp sns_sqs_lambda sns_sqs_lambda-airs_retstd
+```
+1. Change directory into the AIRS RetStd evaluator terraform:
+```
+cd sns_sqs_lambda-airs_retstd/
+```
+1. Set the name of the evaluator to our AIRS example:
+```
+export EVALUATOR_NAME=eval_airs_ingest
+```
+1. Note the implementation of the evaluator code. It currently doesn't do any real evaluation but simply returns that evaluation was successful:
+```
+cat data.tf
+```
+1. Initialize terraform:
+```
+terraform init
+```
+1. Run terraform apply:
+```
+terraform apply \
+--var evaluator_name=${EVALUATOR_NAME} \
+-auto-approve
+```
+**Take note of the `evaluator_topic_arn` that is output by terraform. It should match the respective topic ARN in the test_router.yaml file you used during the initiator deployment. If they match then the router Lambda is now able to submit payloads to this evaluator SNS topic.**
+
 #### Deploying an S3 Event Notification Trigger
 1. Change directory to the location of the s3_bucket_notification trigger terraform:
 ```
@@ -341,9 +386,50 @@ This guide provides a quick way to get started with our project. Please see our
 1. The deployed EventBridge scheduler runs the trigger Lambda function with schedule expression of `rate(1 minute)`. After a minute, verify that the `eval_nisar_ingest` evaluator Lambda function was called successfully for each of those scheduled invocations by looking at its CloudWatch logs for entries similar to this:
 ![eval_log_2](https://github.com/unity-sds/unity-initiator/assets/387300/cae82e10-a736-43b7-8957-790fc29b5fea)

+#### Deploying an EventBridge Scheduler Trigger for Periodic CMR Queries
+1. Change directory to the location of the cmr_query trigger terraform:
+```
+cd ../cmr_query/
+```
+1. Note the implementation of the trigger lambda code. It will query CMR for granules of a particular collection within a timeframe, check its DynamoDB table to see whether they already exist, and if not, submit them as payload URLs to the initiator SNS topic and save them into the DynamoDB table:
+```
+cat lambda_handler.py
+```
+1. Set the CMR provider ID for the AIRS RetStd collection:
+```
+export PROVIDER_ID=GES_DISC
+```
+1. Set the CMR concept ID for the AIRS RetStd collection:
+```
+export CONCEPT_ID=C1701805619-GES_DISC
+```
+1. Set the number of seconds to look back from the current time for granules in the collection. For example, we will set this value to 2 days (172800 seconds) so that when the CMR query lambda kicks off, it will query for all AIRS RetStd granules using a temporal search of `now - 172800 seconds` to `now`:
+```
+export SECONDS_BACK=172800
+```
+1. Initialize terraform:
+```
+terraform init
+```
+1. Run terraform apply. Note the DEPLOYMENT_NAME, CODE_BUCKET and INITIATOR_TOPIC_ARN environment variables should have been set in the previous steps. If not, set them again:
+```
+terraform apply \
+--var deployment_name=${DEPLOYMENT_NAME} \
+--var code_bucket=${CODE_BUCKET} \
+--var initiator_topic_arn=${INITIATOR_TOPIC_ARN} \
+--var provider_id=${PROVIDER_ID} \
+--var concept_id=${CONCEPT_ID} \
+--var seconds_back=${SECONDS_BACK} \
+-auto-approve
+```
+
+#### Verify End-to-End Functionality (part 3)
+1. The deployed EventBridge scheduler runs the trigger CMR query Lambda function with schedule expression of `rate(1 minute)`. After a minute, verify that the `eval_airs_ingest` evaluator Lambda function was called successfully for each of those scheduled invocations by looking at its CloudWatch logs for entries similar to this:
+![eval_log_3](https://github.com/user-attachments/assets/54b26349-91b2-4958-9082-47613da6c675)
+
 #### Tear Down
 1. Simply go back into each of the terraform directories for which `terraform apply` was run and run `terraform destroy`.
-
+
 ### Setup Instructions for Development

 1. Clone repo:
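
The README changes above describe the CMR query trigger as: search a time window, skip granules already recorded, then submit and record the new ones. The following is a minimal sketch of that flow, not the repository's `lambda_handler.py`. The function names are hypothetical, a plain `set` stands in for the DynamoDB table, the URLs are made up, and the `{"payload": ...}` shape is an assumption borrowed from how the initiator handler wraps S3 URLs.

```python
from datetime import datetime, timedelta, timezone


def temporal_window(seconds_back, now=None):
    """Return (start, end) covering the last `seconds_back` seconds."""
    end = now or datetime.now(timezone.utc)
    return end - timedelta(seconds=seconds_back), end


def select_new_granules(granule_urls, seen):
    """Return URLs not yet in `seen` (in order) and record them in `seen`.

    `seen` is an in-memory stand-in for the trigger's DynamoDB table.
    """
    new_urls = []
    for url in granule_urls:
        if url not in seen:
            seen.add(url)
            new_urls.append(url)
    return new_urls


def build_payloads(urls):
    """Wrap each URL in the assumed {"payload": url} message shape."""
    return [{"payload": url} for url in urls]


if __name__ == "__main__":
    start, end = temporal_window(172800)  # 2 days, as in SECONDS_BACK above
    print(f"temporal search: {start.isoformat()} to {end.isoformat()}")

    seen = set()
    found = ["https://example.invalid/g1.hdf", "https://example.invalid/g2.hdf"]
    print(build_payloads(select_new_granules(found, seen)))
    # A second scheduled run over the same results submits nothing new.
    print(build_payloads(select_new_granules(found, seen)))
```

Recording each URL as it is selected is what keeps a `rate(1 minute)` schedule from resubmitting the same granules on every invocation while the look-back window still covers them.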
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+#!/bin/bash
+BASE_PATH=$(dirname "${BASH_SOURCE}")
+BASE_PATH=$(cd "${BASE_PATH}/.."; pwd)
+DIST_DIR=${BASE_PATH}/dist
+PKG_DIR=${DIST_DIR}/lambda_packages
+CMR_QUERY_DIR=${BASE_PATH}/terraform-unity/triggers/cmr_query
+
+set -ex
+
+rm -rf $DIST_DIR
+pip install hatch
+hatch clean
+hatch build
+VERSION=$(hatch run python -c 'from importlib.metadata import version; print(version("unity_initiator"))')
+echo "{\"version\": \"$VERSION\"}" > ${DIST_DIR}/version.json
+mkdir -p $PKG_DIR
+pip install -t $PKG_DIR ${DIST_DIR}/unity_initiator-*.whl
+pip install -t $PKG_DIR python_cmr
+cp ${CMR_QUERY_DIR}/lambda_handler.py $PKG_DIR/
+cd $PKG_DIR
+zip -rq ${DIST_DIR}/cmr_query-${VERSION}-lambda.zip .

scripts/build_lambda_package.sh

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@ BASE_PATH=$(dirname "${BASH_SOURCE}")
 BASE_PATH=$(cd "${BASE_PATH}/.."; pwd)
 DIST_DIR=${BASE_PATH}/dist
 PKG_DIR=${DIST_DIR}/lambda_packages
-TEST_DIR=${BASE_PATH}/tests

 set -ex

src/unity_initiator/cloud/lambda_handler.py

Lines changed: 19 additions & 19 deletions
@@ -57,25 +57,25 @@ def lambda_handler_initiator(event, context):
         # TODO: Find cleaner way of parsing payloads from a variety of sources (S3 event notification, EventBridge->Lambda).
         # For now we use brittle assumptions on the payload structure for each of the supported sources.

-        # skip S3 test event
-        if message.get("Event", None) == "s3:TestEvent":
-            logger.info("Skipped s3:TestEvent")
-            continue
-
-        # payload comes from S3 notification
-        if "Records" in message:
-            for rec in message["Records"]:
-                s3_info = rec["s3"]
-                payloads.append(
-                    {
-                        "payload": f"s3://{s3_info['bucket']['name']}/{s3_info['object']['key']}"
-                    }
-                )
-        # payload comes from EventBridge scheduled task
+        if isinstance(message, dict):
+            # skip S3 test event
+            if message.get("Event", None) == "s3:TestEvent":
+                logger.info("Skipped s3:TestEvent")
+                continue
+
+            # payload comes from S3 notification
+            if "Records" in message:
+                for rec in message["Records"]:
+                    s3_info = rec["s3"]
+                    payloads.append(
+                        {
+                            "payload": f"s3://{s3_info['bucket']['name']}/{s3_info['object']['key']}"
+                        }
+                    )
+        elif isinstance(message, list):
+            # payload comes from EventBridge scheduled task
+            payloads.extend(message)
         else:
-            if isinstance(message, list):
-                payloads.extend(message)
-            else:
-                payloads.append(message)
+            payloads.append(message)

     return lambda_handler_multiple_payloads(payloads, context)
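
To make the new branching easier to follow, here is a standalone sketch of the payload extraction as this hunk appears to restructure it. It is an illustration under assumptions, not the repository's function: `extract_payloads` is a hypothetical name, it takes already-decoded messages rather than a Lambda event, and the nesting is inferred from the diff, whose indentation was lost in extraction.

```python
import logging

logger = logging.getLogger(__name__)


def extract_payloads(messages):
    """Collect payloads from decoded messages of mixed shape.

    Dicts are treated as S3 notifications, lists as batches from a
    scheduled trigger, and anything else is passed through unchanged.
    """
    payloads = []
    for message in messages:
        if isinstance(message, dict):
            # S3 sends a test event when a notification is first configured.
            if message.get("Event") == "s3:TestEvent":
                logger.info("Skipped s3:TestEvent")
                continue
            # One payload per object record in an S3 notification.
            if "Records" in message:
                for rec in message["Records"]:
                    s3_info = rec["s3"]
                    bucket = s3_info["bucket"]["name"]
                    key = s3_info["object"]["key"]
                    payloads.append({"payload": f"s3://{bucket}/{key}"})
        elif isinstance(message, list):
            # A scheduled trigger may submit several payloads at once.
            payloads.extend(message)
        else:
            payloads.append(message)
    return payloads
```

Under this reading, a dict that is neither a test event nor a `Records` notification contributes no payload, whereas the removed code would have appended it as-is. Whether that difference is intended cannot be determined from the diff alone.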

terraform-unity/evaluators/sns_sqs_lambda/README.md

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ No modules.

 | Name | Type |
 |------|------|
+| [aws_cloudwatch_log_group.evaluator_lambda_log_group](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
 | [aws_iam_policy.evaluator_lambda_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
 | [aws_iam_role.evaluator_lambda_iam_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource |
 | [aws_iam_role_policy_attachment.lambda_base_policy_attachment](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |

terraform-unity/evaluators/sns_sqs_lambda/locals.tf

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 locals {
+  function_name = "${var.project}-${var.venue}-${var.evaluator_name}-evaluator"
   tags = {
     Venue = "dev"
     ServiceArea = "cs"

terraform-unity/evaluators/sns_sqs_lambda/main.tf

Lines changed: 11 additions & 6 deletions
@@ -8,7 +8,7 @@ resource "aws_lambda_function" "evaluator_lambda" {
     ]
   }

-  function_name = "${var.project}-${var.venue}-${var.evaluator_name}-evaluator"
+  function_name = local.function_name
   role = aws_iam_role.evaluator_lambda_iam_role.arn
   handler = "lambda_function.lambda_handler"
   runtime = "python3.11"
@@ -18,8 +18,13 @@ resource "aws_lambda_function" "evaluator_lambda" {
   tags = local.tags
 }

+resource "aws_cloudwatch_log_group" "evaluator_lambda_log_group" {
+  name = "/aws/lambda/${local.function_name}"
+  retention_in_days = 14
+}
+
 resource "aws_iam_role" "evaluator_lambda_iam_role" {
-  name = "${var.project}-${var.venue}-${var.evaluator_name}-evaluator_lambda_iam_role"
+  name = "${local.function_name}_lambda_iam_role"

   assume_role_policy = jsonencode({
     Version = "2012-10-17",
@@ -38,7 +43,7 @@ resource "aws_iam_role" "evaluator_lambda_iam_role" {
 }

 resource "aws_iam_policy" "evaluator_lambda_policy" {
-  name = "${var.project}-${var.venue}-${var.evaluator_name}-evaluator_lambda_policy"
+  name = "${local.function_name}_lambda_policy"
   description = "A policy for the evaluator lambda function to access S3 and SQS"

   policy = jsonencode({
@@ -89,7 +94,7 @@ resource "aws_ssm_parameter" "evaluator_lambda_function_name" {


 resource "aws_sqs_queue" "evaluator_dead_letter_queue" {
-  name = "${var.project}-${var.venue}-${var.evaluator_name}-evaluator_dead_letter_queue"
+  name = "${local.function_name}_dead_letter_queue"
   delay_seconds = 0
   max_message_size = 2048
   message_retention_seconds = 1209600
@@ -99,7 +104,7 @@ resource "aws_sqs_queue" "evaluator_dead_letter_queue" {
 }

 resource "aws_sqs_queue" "evaluator_queue" {
-  name = "${var.project}-${var.venue}-${var.evaluator_name}-evaluator_queue"
+  name = "${local.function_name}_queue"
   delay_seconds = 0
   max_message_size = 2048
   message_retention_seconds = 1209600
@@ -113,7 +118,7 @@ resource "aws_sqs_queue" "evaluator_queue" {
 }

 resource "aws_sns_topic" "evaluator_topic" {
-  name = "${var.project}-${var.venue}-${var.evaluator_name}-evaluator_topic"
+  name = "${local.function_name}_topic"
 }

 resource "aws_sns_topic_policy" "evaluator_topic_policy" {
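
The `main.tf` changes above derive every resource name from the single `local.function_name` value. As a rough check that the refactor leaves names unchanged, the sketch below mirrors those interpolations in Python. The helper name is hypothetical, and the `uod` and `dev` values are assumptions inferred from the topic name used in the README's `sed` commands, not values read from the Terraform variables.

```python
def evaluator_resource_names(project, venue, evaluator_name):
    """Mirror the Terraform name interpolations shown in the diff above."""
    function_name = f"{project}-{venue}-{evaluator_name}-evaluator"
    return {
        "function_name": function_name,
        "log_group": f"/aws/lambda/{function_name}",
        "iam_role": f"{function_name}_lambda_iam_role",
        "iam_policy": f"{function_name}_lambda_policy",
        "dead_letter_queue": f"{function_name}_dead_letter_queue",
        "queue": f"{function_name}_queue",
        "topic": f"{function_name}_topic",
    }


if __name__ == "__main__":
    # "uod" and "dev" are assumed values for var.project and var.venue.
    for kind, name in evaluator_resource_names("uod", "dev", "eval_nisar_ingest").items():
        print(f"{kind}: {name}")
```

With these assumed values the topic name comes out as `uod-dev-eval_nisar_ingest-evaluator_topic`, which is the name the README substitutes into `test_router.yaml`. The log group name follows the `/aws/lambda/<function name>` pattern that Lambda writes to by default, so declaring it in Terraform lets the 14-day retention apply to the function's own logs.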

terraform-unity/initiator/README.md

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ No modules.

 | Name | Type |
 |------|------|
+| [aws_cloudwatch_log_group.initiator_lambda_log_group](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
 | [aws_iam_policy.initiator_lambda_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
 | [aws_iam_role.initiator_lambda_iam_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource |
 | [aws_iam_role_policy_attachment.lambda_base_policy_attachment](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |

terraform-unity/initiator/locals.tf

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 locals {
+  function_name = "${var.project}-${var.venue}-${var.deployment_name}-inititator"
   tags = {
     Venue = "dev"
     ServiceArea = "cs"
