Skip to content
154 changes: 67 additions & 87 deletions airflow/dags/appgen_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,116 +7,42 @@
import os
from datetime import datetime

import boto3
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.python import PythonOperator, get_current_context
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s
from unity_sps_utils import ( # DEFAULT_LOG_LEVEL,; EC2_TYPES,; POD_LABEL,; build_ec2_type_label,
from unity_sps_utils import (
DEFAULT_LOG_LEVEL,
EC2_TYPES,
NODE_POOL_DEFAULT,
NODE_POOL_HIGH_WORKLOAD,
POD_LABEL,
POD_NAMESPACE,
build_ec2_type_label,
get_affinity,
)

from airflow import DAG

# Pod label: the fixed prefix "appgen_pod" followed by a timestamp with
# microsecond resolution.
# NOTE(review): datetime.now() is evaluated when this module-level statement
# runs, not per task run — confirm this yields the per-job uniqueness
# described in the trailing comment.
POD_LABEL = "appgen_pod" + datetime.now().strftime(
"%Y%m%d_%H%M%S_%f"
) # unique pod label to assure each job runs on its own pod

# Resource requirements that request only ephemeral storage. The amount is a
# "{{ ... }}" template expression over params.request_storage (presumably
# rendered by Airflow templating before the pod is created — TODO confirm).
# NOTE(review): the template string ends with a trailing space inside the
# quotes — confirm it is intentional.
CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
requests={
"ephemeral-storage": "{{ params.request_storage }} ",
}
)

# >>> This part will be removed once the parameters can be imported from unity_sps_plugins.py
# Default value of the "log_level" DAG param; 20 maps to "INFO" in LOG_LEVEL_TYPE.
DEFAULT_LOG_LEVEL = 20
# Catalogue of selectable EC2 instance types, keyed by instance type name.
# Each entry holds:
#   "desc"   - instance family description ("General Purpose", ...)
#   "cpu"    - rendered as "<cpu>vCPU" by build_ec2_type_label
#   "memory" - rendered as "<memory>GiB" by build_ec2_type_label
EC2_TYPES = {
"t3.micro": {
"desc": "General Purpose",
"cpu": 1,
"memory": 1,
},
"t3.small": {
"desc": "General Purpose",
"cpu": 2,
"memory": 2,
},
"t3.medium": {
"desc": "General Purpose",
"cpu": 2,
"memory": 4,
},
"t3.large": {
"desc": "General Purpose",
"cpu": 2,
"memory": 8,
},
"t3.xlarge": {
"desc": "General Purpose",
"cpu": 4,
"memory": 16,
},
"t3.2xlarge": {
"desc": "General Purpose",
"cpu": 8,
"memory": 32,
},
"r7i.xlarge": {
"desc": "Memory Optimized",
"cpu": 4,
"memory": 32,
},
"r7i.2xlarge": {
"desc": "Memory Optimized",
"cpu": 8,
"memory": 64,
},
"r7i.4xlarge": {
"desc": "Memory Optimized",
"cpu": 16,
"memory": 128,
},
"r7i.8xlarge": {
"desc": "Memory Optimized",
"cpu": 32,
"memory": 256,
},
"c6i.xlarge": {
"desc": "Compute Optimized",
"cpu": 4,
"memory": 8,
},
"c6i.2xlarge": {
"desc": "Compute Optimized",
"cpu": 8,
"memory": 16,
},
"c6i.4xlarge": {
"desc": "Compute Optimized",
"cpu": 16,
"memory": 32,
},
"c6i.8xlarge": {
"desc": "Compute Optimized",
"cpu": 32,
"memory": 64,
},
}


def build_ec2_type_label(key):
    """Return a display label for the EC2 instance type named *key*.

    The label has the form ``"<key> (<desc>: <cpu>vCPU, <memory>GiB)"``, with
    the fields read from the ``EC2_TYPES`` entry for *key*. A key that is not
    in ``EC2_TYPES`` raises ``TypeError``, because the lookup yields ``None``.
    """
    spec = EC2_TYPES.get(key)
    desc, cpu, memory = spec["desc"], spec["cpu"], spec["memory"]
    return f"{key} ({desc}: {cpu}vCPU, {memory}GiB)"

# AWS SSM parameter names holding the DockerHub and Dockstore credentials.
# setup() fetches all three in one get_parameters call with WithDecryption=True.
# NOTE(review): the paths are hard-coded to the "development" prefix — confirm
# this is intended for every deployment.
DOCKERHUB_USERNAME = "/unity/ads/app_gen/development/dockerhub_username"
DOCKERHUB_TOKEN = "/unity/ads/app_gen/development/dockerhub_api_key"
DOCKSTORE_TOKEN = "/unity/ads/app_gen/development/dockstore_token"

# <<<
# Display names for the numeric log levels; the numbers match the standard
# library's logging.DEBUG (10) and logging.INFO (20).
LOG_LEVEL_TYPE = {10: "DEBUG", 20: "INFO"}

# Docker image that contains the Application Package Generator; change this to
# use a different image.
# NOTE(review): DOCKER_IMAGE is assigned twice and the second assignment wins,
# so the busybox value is never used — confirm the first line is a leftover.
DOCKER_IMAGE = "docker.io/busybox"
DOCKER_IMAGE = "jplmdps/unity-app-gen:v1.1.1"

# Default DAG configuration
dag_default_args = {
Expand All @@ -137,7 +63,12 @@ def build_ec2_type_label(key):
max_active_tasks=30,
default_args=dag_default_args,
params={
"message": Param("Hello World", type="string", title="Message", description="The greeting message"),
"repository": Param(
"https://github.com/unity-sds/unity-example-application",
type="string",
title="Repository",
description="Git URL of application source files",
),
"log_level": Param(
DEFAULT_LOG_LEVEL,
type="integer",
Expand All @@ -160,11 +91,52 @@ def build_ec2_type_label(key):
},
)

# Environment variables handed to the app-gen container.
# The three credential values are template expressions that pull XCom keys from
# the task with id 'Setup' (setup() pushes keys with these names). The
# repository URL is read from the "repository" DAG param.
app_gen_env_vars = [
    k8s.V1EnvVar(name=var_name, value=var_value)
    for var_name, var_value in (
        ("DOCKERHUB_USERNAME", "{{ ti.xcom_pull(task_ids='Setup', key='dockerhub_username') }}"),
        ("DOCKERHUB_TOKEN", "{{ ti.xcom_pull(task_ids='Setup', key='dockerhub_token') }}"),
        ("DOCKSTORE_TOKEN", "{{ ti.xcom_pull(task_ids='Setup', key='dockstore_token') }}"),
        (
            "DOCKSTORE_API_URL",
            "http://awslbdockstorestack-lb-1429770210.us-west-2.elb.amazonaws.com:9998/api",
        ),
        ("GITHUB_REPO", "{{ params.repository }}"),
    )
]


def setup(ti=None, **context):
"""
Task that selects the proper Karpenter Node Pool depending on the user requested resources.
"""

## Retrieve the docker credentials and DockStore token
ssm_client = boto3.client("ssm", region_name="us-west-2")
ssm_response = ssm_client.get_parameters(
Names=[DOCKERHUB_USERNAME, DOCKERHUB_TOKEN, DOCKSTORE_TOKEN], WithDecryption=True
)
logging.info(ssm_response)

# Map each returned SSM parameter to its credential key by matching on the parameter name
credentials_dict = {}
for param in ssm_response["Parameters"]:
if param["Name"] == DOCKERHUB_USERNAME:
credentials_dict["dockerhub_username"] = param["Value"]
elif param["Name"] == DOCKERHUB_TOKEN:
credentials_dict["dockerhub_token"] = param["Value"]
elif param["Name"] == DOCKSTORE_TOKEN:
credentials_dict["dockstore_token"] = param["Value"]

required_credentials = ["dockerhub_username", "dockerhub_token", "dockstore_token"]
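# Check that every required credential was returned by SSM.
# NOTE(review): a missing credential is only logged here; the xcom_push calls
# below would then raise KeyError — confirm whether this should raise instead.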
if not set(required_credentials).issubset(list(credentials_dict.keys())):
logging.error(f"Expected all of credentials to run mdps app generator {required_credentials}")

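# Hand the credentials to the pod task via XCom instead of writing them to the logs.
# NOTE(review): logging.info(ssm_response) above already logs the decrypted
# parameter values — confirm and remove that call.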
ti.xcom_push(key="dockerhub_username", value=credentials_dict["dockerhub_username"])
ti.xcom_push(key="dockerhub_token", value=credentials_dict["dockerhub_token"])
ti.xcom_push(key="dockstore_token", value=credentials_dict["dockstore_token"])

context = get_current_context()
logging.info(f"DAG Run parameters: {json.dumps(context['params'], sort_keys=True, indent=4)}")

Expand Down Expand Up @@ -204,13 +176,21 @@ def setup(ti=None, **context):
retries=1,
task_id="appgen_task",
namespace=POD_NAMESPACE,
env_vars=app_gen_env_vars,
name="appgen-task-pod",
image=DOCKER_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=600,
arguments=["echo", "{{ti.xcom_pull(task_ids='Setup', key='message')}}"],
arguments=[
"-r",
"{{ params.repository }}",
"-l",
"{{ params.log_level }}",
"-e",
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
],
container_security_context={"privileged": True},
container_resources=k8s.V1ResourceRequirements(
requests={
Expand Down