66import logging
77import os
88from datetime import datetime
9+ import boto3
910
1011from airflow .models .baseoperator import chain
1112from airflow .models .param import Param
3233 }
3334)
3435
36+ # AWS SSM parameter paths for credentials
37+ DOCKERHUB_USERNAME = "/unity/ads/app_gen/development/dockerhub_username"
38+ DOCKERHUB_TOKEN = "/unity/ads/app_gen/development/dockerhub_api_key"
39+ DOCKSTORE_TOKEN = "/unity/ads/app_gen/development/dockstore_token"
40+
3541# >>> This part will be removed once the parameters can be imported from unity_sps_plugins.py
3642DEFAULT_LOG_LEVEL = 20
3743EC2_TYPES = {
@@ -116,7 +122,7 @@ def build_ec2_type_label(key):
116122LOG_LEVEL_TYPE = {10 : "DEBUG" , 20 : "INFO" }
117123
118124# Change this to the Docker image that contains the Application Package Generator
119- DOCKER_IMAGE = "docker.io/busybox "
125+ DOCKER_IMAGE = "jplmdps/unity-app-gen:v1.0.0 "
120126
121127# Default DAG configuration
122128dag_default_args = {
@@ -137,7 +143,7 @@ def build_ec2_type_label(key):
137143 max_active_tasks = 30 ,
138144 default_args = dag_default_args ,
139145 params = {
140- "message " : Param ("Hello World " , type = "string" , title = "Message " , description = "The greeting message " ),
146+ "repository " : Param ("https://github.com/unity-sds/unity-example-application " , type = "string" , title = "Repository " , description = "Repository to build from " ),
141147 "log_level" : Param (
142148 DEFAULT_LOG_LEVEL ,
143149 type = "integer" ,
@@ -160,11 +166,45 @@ def build_ec2_type_label(key):
160166 },
161167)
162168
169+ app_gen_env_vars = [
170+ k8s .V1EnvVar (name = "DOCKERHUB_USERNAME" , value = "{{ ti.xcom_pull(task_ids='Setup', key='dockerhub_username') }}" ),
171+ k8s .V1EnvVar (name = "DOCKERHUB_TOKEN" , value = "{{ ti.xcom_pull(task_ids='Setup', key='dockerhub_token') }}" ),
172+ k8s .V1EnvVar (name = "DOCKSTORE_TOKEN" , value = "{{ ti.xcom_pull(task_ids='Setup', key='dockstore_token') }}" ),
173+ k8s .V1EnvVar (name = "GITHUB_REPO" , value = "{{ params.repository }}" ),
174+ ]
163175
164176def setup (ti = None , ** context ):
165177 """
166178 Task that selects the proper Karpenter Node Pool depending on the user requested resources.
167179 """
180+
181+ ## Retrieve the docker credentials and DockStore token
182+ ssm_client = boto3 .client ("ssm" , region_name = "us-west-2" )
183+ ssm_response = ssm_client .get_parameters (
184+ Names = [DOCKERHUB_USERNAME , DOCKERHUB_TOKEN , DOCKSTORE_TOKEN ], WithDecryption = True
185+ )
186+ logging .info (ssm_response )
187+
188+ # Somehow get the correct variables from SSM here
189+ credentials_dict = {}
190+ for param in ssm_response ["Parameters" ]:
191+ if param ["Name" ] == DOCKERHUB_USERNAME :
192+ credentials_dict ["dockerhub_username" ] = param ["Value" ]
193+ elif param ["Name" ] == DOCKERHUB_TOKEN :
194+ credentials_dict ["dockerhub_token" ] = param ["Value" ]
195+ elif param ["Name" ] == DOCKSTORE_TOKEN :
196+ credentials_dict ["dockstore_token" ] = param ["Value" ]
197+
198+ required_credentials = ["dockerhub_username" , "dockerhub_token" , "dockstore_token" ]
199+ # make sure all required credentials are provided
200+ if (not set (required_credentials ).issubset (list (credentials_dict .keys ()))):
201+ logging .error (f"Expected all of credentials to run mdps app generator { required_credentials } " )
202+
203+ # use xcom to push to avoid putting credentials to the logs
204+ ti .xcom_push (key = "dockerhub_username" , value = credentials_dict ["dockerhub_username" ])
205+ ti .xcom_push (key = "dockerhub_token" , value = credentials_dict ["dockerhub_token" ])
206+ ti .xcom_push (key = "dockstore_token" , value = credentials_dict ["dockstore_token" ])
207+
168208 context = get_current_context ()
169209 logging .info (f"DAG Run parameters: { json .dumps (context ['params' ], sort_keys = True , indent = 4 )} " )
170210
@@ -204,13 +244,21 @@ def setup(ti=None, **context):
204244 retries = 1 ,
205245 task_id = "appgen_task" ,
206246 namespace = POD_NAMESPACE ,
247+ env_vars = app_gen_env_vars ,
207248 name = "appgen-task-pod" ,
208249 image = DOCKER_IMAGE ,
209250 service_account_name = "airflow-worker" ,
210251 in_cluster = True ,
211252 get_logs = True ,
212253 startup_timeout_seconds = 600 ,
213- arguments = ["echo" , "{{ti.xcom_pull(task_ids='Setup', key='message')}}" ],
254+ arguments = [
255+ "-r" ,
256+ "{{ params.repository }}" ,
257+ "-l" ,
258+ "{{ params.log_level }}" ,
259+ "-e" ,
260+ "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}" ,
261+ ],
214262 container_security_context = {"privileged" : True },
215263 container_resources = k8s .V1ResourceRequirements (
216264 requests = {
0 commit comments