Skip to content

Commit 18f3807

Browse files
committed
ARGO-5355 Check readiness automation component
1 parent abb92bf commit 18f3807

File tree

8 files changed

+240
-13
lines changed

8 files changed

+240
-13
lines changed

automation/argo_automator

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import requests
1313
from argo_ams_library import ArgoMessagingService
1414

1515
from argo_config import ArgoConfig
16+
from check_readiness import check_readiness
1617
from init_ams import init_ams
1718
from init_compute_engine import init_compute_engine
1819
from init_mongo import init_mongo
@@ -31,6 +32,7 @@ class JobName(Enum):
3132
INIT_MONGO = "INIT_MONGO"
3233
INIT_AMS = "INIT_AMS"
3334
INIT_COMPUTE_ENGINE = "INIT_COMPUTE_ENGINE"
35+
CHECK_READINESS = "CHECK_READINESS"
3436

3537

3638
class JobStatus(Enum):
@@ -178,7 +180,9 @@ class ArgoAutomator:
178180
f"ams job {job_name} event for tenant: {tenant_name} ({tenant_id}) discarded"
179181
)
180182
return
181-
183+
if job_name == JobName.CHECK_READINESS.value:
184+
self.executor.submit(self.job_check_readiness, tenant_id, tenant_name)
185+
return
182186
if job_name == JobName.INIT_MONGO.value:
183187
self.executor.submit(self.job_init_mongo, tenant_id, tenant_name)
184188
return
@@ -194,6 +198,54 @@ class ArgoAutomator:
194198
except Exception as e:
195199
logger.exception(f"Failed to process event: {e}")
196200

201+
def job_check_readiness(self, tenant_id: str, tenant_name: str):
    """Run the readiness checks for a tenant and report the job status.

    The CHECK_READINESS job is first marked IN_PROGRESS through
    ``self.mon_api``. ``check_readiness`` is then called with the automator
    config and the job is marked COMPLETED when it returns a truthy value,
    FAILED otherwise. Exceptions are logged and not re-raised; the job is
    reported as FAILED in that case as well.

    Args:
        tenant_id: id of the tenant to check.
        tenant_name: name of the tenant to check.
    """
    try:
        logger.info(
            f"job started: initialising check readiness tenant {tenant_name} with id: {tenant_id}"
        )
        self.mon_api.update_status(
            tenant_id,
            tenant_name,
            JobName.CHECK_READINESS.value,
            JobStatus.IN_PROGRESS.value,
            "Running checks for tenant readiness",
        )

        # run the checks and push the collected state to web-api
        job_done = check_readiness(
            self.config,
            tenant_id,
            tenant_name,
        )

        if job_done:
            self.mon_api.update_status(
                tenant_id,
                tenant_name,
                JobName.CHECK_READINESS.value,
                JobStatus.COMPLETED.value,
                "Check readiness completed succesfully",
            )
            logger.info(
                f"job completed: check_readiness for tenant {tenant_name}"
            )
        else:
            self.mon_api.update_status(
                tenant_id,
                tenant_name,
                JobName.CHECK_READINESS.value,
                JobStatus.FAILED.value,
                "Check readiness failed to complete!",
            )
            logger.error(f"job failed: check readiness for tenant {tenant_name}")
    except Exception as e:
        logger.exception(f"job failed for tenant {tenant_name}: {e}")
        # Without this the job would stay IN_PROGRESS after an unexpected
        # error. Reporting is best-effort: the status update itself may be
        # what failed, so a second failure is only logged.
        try:
            self.mon_api.update_status(
                tenant_id,
                tenant_name,
                JobName.CHECK_READINESS.value,
                JobStatus.FAILED.value,
                "Check readiness failed to complete!",
            )
        except Exception:
            logger.exception(
                f"could not report failed check readiness status for tenant {tenant_name}"
            )
247+
248+
197249
def job_init_mongo(self, tenant_id: str, tenant_name: str):
198250
"""Job placeholder to init mongo"""
199251
try:

automation/argo_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def __init__(self, path: str):
3838
self.web_api_token = automation.get("web_api_token")
3939
self.default_ops_profile_file = automation.get("default_ops_profile_file")
4040
self.hdfs_path = run.get("hdfs_path")
41+
self.hdfs_check_path = run.get("hdfs_check_path")
4142
self.flink_path = run.get("flink_path")
4243
self.batch_jar_path = run.get("batch_jar_path")
4344
self.ingest_jar_path = run.get("ingest_jar_path")

automation/argo_web_api.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from enum import Enum
23
from typing import Dict, Optional
34

45
import requests
@@ -10,6 +11,12 @@
1011
REQUEST_TIMEOUT = 30
1112

1213

14+
class TopoItem(Enum):
    """Kinds of topology items that can be requested from web-api.

    Each value is used as the last path segment of the
    ``/api/v2/topology/`` url.
    """

    ENDPOINTS = "endpoints"
    GROUPS = "groups"
    SERVICE_TYPES = "service-types"
18+
19+
1320
class ArgoWebApi:
1421

1522
def __init__(self, config: ArgoConfig):
@@ -90,6 +97,28 @@ def update_tenant_db_info(
9097
f"tenant: {tenant_name} ({tenant_id}) - web-api updating db conf updated"
9198
)
9299

100+
def get_topology(
    self,
    tenant_id: str,
    tenant_name: str,
    tenant_access_token: str,
    topology_item: TopoItem,
):
    """Retrieve the topology items of one kind for a specific tenant.

    Args:
        tenant_id: id of the tenant (used for logging only).
        tenant_name: name of the tenant (used for logging only).
        tenant_access_token: token sent as the ``x-api-key`` header.
        topology_item: which kind of topology items to request.

    Returns:
        The content of the ``data`` field of the json response, or an empty
        list when the field is missing or null, so callers can safely test
        the result for emptiness.

    Raises:
        requests.exceptions.HTTPError: when web-api answers with an error
            status code.
    """
    logger.debug(
        f"tenant: {tenant_name} ({tenant_id}) - retrieving topology {topology_item.value} from web-api..."
    )
    url = f"https://{self.config.web_api_endpoint}/api/v2/topology/{topology_item.value}"
    headers = {
        "x-api-key": tenant_access_token,
        "Accept": "application/json",
    }

    response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    return response.json().get("data") or []
121+
93122
def get_reports(
94123
self,
95124
tenant_id: str,
@@ -109,8 +138,14 @@ def get_reports(
109138
response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
110139
response.raise_for_status()
111140

112-
results = response.json().get("data")
113-
return {item["info"]["name"]: item["id"] for item in results}
141+
return response.json().get("data")
142+
143+
def get_report_ids(
    self, tenant_id: str, tenant_name: str, tenant_access_token: str
):
    """Retrieve a mapping of report name to report id for a specific tenant.

    Args:
        tenant_id: id of the tenant.
        tenant_name: name of the tenant.
        tenant_access_token: tenant token used to query web-api.

    Returns:
        A dict ``{report name: report id}``; empty when the tenant has no
        reports or web-api returned no data.
    """
    reports = self.get_reports(tenant_id, tenant_name, tenant_access_token)
    # get_reports hands back the raw "data" field, which is None when the
    # field is absent from the response - treat that as "no reports"
    return {item["info"]["name"]: item["id"] for item in reports or []}
114149

115150
def create_ops_profile(
116151
self,
@@ -147,6 +182,30 @@ def create_ops_profile(
147182
f"tenant: {tenant_name} ({tenant_id}) - web-api ops profile created"
148183
)
149184

185+
def update_ready_state(
    self,
    tenant_id: str,
    tenant_name: str,
    payload: object,
):
    """Send the readiness state of a tenant to web-api with an http PUT.

    The payload is sent as the json body, authenticated with the web-api
    token taken from the config. An error status code raises
    ``requests.exceptions.HTTPError``; nothing is returned.
    """
    logger.debug(
        f"tenant: {tenant_name} ({tenant_id}) - web-api update readiness state..."
    )

    endpoint = self.config.web_api_endpoint
    response = requests.put(
        f"https://{endpoint}/api/v2/admin/tenants/{tenant_id}/ready",
        json=payload,
        headers={
            "x-api-key": self.config.web_api_token,
            "Accept": "application/json",
        },
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()

    logger.info(f"tenant: {tenant_name} ({tenant_id}) - web-api readiness updated")
208+
150209
def get_component_user(
151210
self, tenant_id: str, tenant_name: str, component: str
152211
) -> Optional[Dict]:

automation/check_readiness.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import datetime
2+
import logging
3+
4+
import requests
5+
6+
from argo_config import ArgoConfig
7+
from argo_web_api import ArgoWebApi, TopoItem
8+
9+
REQUEST_TIMEOUT = 30
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
def check_hdfs(config: ArgoConfig, tenant_id: str, tenant_name: str) -> bool:
    """Check whether today's metric data folder of a tenant has content in hdfs.

    Issues a ``LISTSTATUS`` http request against
    ``{config.hdfs_check_path}/{tenant_name}/mdata/{today}``.

    Args:
        config: automator configuration providing ``hdfs_check_path``.
        tenant_id: id of the tenant (used for logging only).
        tenant_name: name of the tenant, also the tenant folder name.

    Returns:
        True when the listing contains at least one entry; False when the
        listing is empty or missing, or when the path does not exist (404).

    Raises:
        requests.exceptions.HTTPError: for any error status other than 404.
    """
    # NOTE(review): date.today() uses the local timezone of this host -
    # confirm it matches the timezone used for the hdfs day folders
    today = datetime.date.today().strftime("%Y-%m-%d")
    url = f"{config.hdfs_check_path}/{tenant_name}/mdata/{today}?op=LISTSTATUS"
    headers = {
        "Accept": "application/json",
    }

    logger.debug(
        f"tenant: {tenant_name} ({tenant_id}) - checking hdfs for metric data of {today}..."
    )

    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # a missing folder simply means "no data yet", not a failure
        if e.response is not None and e.response.status_code == 404:
            logger.warning(
                f"tenant: {tenant_name} ({tenant_id}) - tenant path not found in hdfs"
            )
            return False
        raise

    # tolerate a missing or null "FileStatuses"/"FileStatus" field instead of
    # failing with an AttributeError/TypeError
    file_statuses = (response.json().get("FileStatuses") or {}).get("FileStatus")
    return bool(file_statuses)
43+
44+
45+
def check_readiness(config: ArgoConfig, tenant_id: str, tenant_name: str) -> bool:
    """Check the readiness of a tenant and push the result to web-api.

    Three things are checked: that every kind of topology item (endpoints,
    groups, service-types) is defined in web-api, that the tenant has at
    least one report, and that metric data for today exist in hdfs. The
    collected state is then sent to web-api with ``update_ready_state``.

    Args:
        config: automator configuration (web-api access, tenants, hdfs).
        tenant_id: id of the tenant to check.
        tenant_name: name of the tenant to check.

    Returns:
        True once all checks ran and the state was stored in web-api. The
        value does not say whether the tenant is ready - that is part of the
        stored state.

    Raises:
        requests.exceptions.HTTPError: when one of the web-api or hdfs calls
            answers with an unexpected error status.
    """
    web_api = ArgoWebApi(config)

    # get access token from config file
    tenant_token = config.tenants.get(tenant_name, {}).get("web_api_token")

    # check if topology exists - every kind of item must be present
    topology_ready = True
    topology_msg = []
    for topo_item in (TopoItem.ENDPOINTS, TopoItem.GROUPS, TopoItem.SERVICE_TYPES):
        items = web_api.get_topology(tenant_id, tenant_name, tenant_token, topo_item)
        # truthiness also covers a None result (no "data" in the response)
        if items:
            topology_msg.append(f"Topology {topo_item.value} are set.")
        else:
            topology_msg.append(f"Topology {topo_item.value} are missing!")
            topology_ready = False

    # check reports
    reports = web_api.get_reports(tenant_id, tenant_name, tenant_token)
    reports_ready = bool(reports)
    if reports_ready:
        reports_msg = "Tenant has at least one report"
    else:
        reports_msg = "Tenant has no reports!"

    # check metric data in hdfs
    hdfs_ready = bool(check_hdfs(config, tenant_id, tenant_name))
    if hdfs_ready:
        hdfs_msg = "Tenant has metric data in HDFS for today"
    else:
        hdfs_msg = "Tenant doesn't have metric data in HDFS for today!"

    # NOTE(review): the date part of last_check is separated with ':' - this
    # looks like a typo for "%Y-%m-%dT%H:%M:%SZ"; confirm which format
    # web-api expects before changing it
    last_check = datetime.datetime.now(datetime.timezone.utc).strftime(
        "%Y:%m:%dT%H:%M:%SZ"
    )
    payload = {
        "data": {"ready": hdfs_ready, "message": hdfs_msg},
        "topology": {"ready": topology_ready, "message": " ".join(topology_msg)},
        "reports": {"ready": reports_ready, "message": reports_msg},
        "last_check": last_check,
    }

    # update_ready_state has no return value and raises on an http error, so
    # getting past this call means the state was stored
    web_api.update_ready_state(tenant_id, tenant_name, payload)
    return True

automation/config.yml.example

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ automation:
2424

2525
run:
2626
hdfs_path: hdfs://localhost:9000/user/argo/test/tenants/
27+
hdfs_check_path: http://localhost:50070/user/argo/test/tenants
2728
flink_path: /opt/flink/bin/flink
28-
batch_jar_path: /opt/argo/jars/multijob.jar
29+
batch_jar_path: /opt/argo/jars/multijob.jar
30+
ingest_jar_path: /opt/argo/jars/ingest.jar
2931

3032
# The tenants section is automatically configured by engine - don't edit manually
3133
#

automation/init_ams.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
import logging
22

3-
from argo_ams_library import (
4-
AmsServiceException,
5-
AmsUser,
6-
AmsUserProject,
7-
ArgoMessagingService,
8-
)
3+
from argo_ams_library import (AmsServiceException, AmsUser, AmsUserProject,
4+
ArgoMessagingService)
95

106
from argo_config import ArgoConfig
117

automation/run_batch

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def run_batch(
6161

6262

6363
def main():
64-
parser = argparse.ArgumentParser(description="Argo Automator")
64+
parser = argparse.ArgumentParser(description="Argo Run Batch")
6565
parser.add_argument(
6666
"-c",
6767
"--config",
@@ -141,7 +141,7 @@ def main():
141141
)
142142
if not report_id:
143143
# try to update the tenant report configuration
144-
reports = web_api.get_reports(
144+
reports = web_api.get_report_ids(
145145
tenant["id"], args.tenant, tenant["web_api_token"]
146146
)
147147
config.set_tenant_reports(tenant["id"], args.tenant, reports)

automation/run_ingest

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def run_ingest(
6464

6565

6666
def main():
67-
parser = argparse.ArgumentParser(description="Argo Automator")
67+
parser = argparse.ArgumentParser(description="Argo Run Ingest")
6868
parser.add_argument(
6969
"-c",
7070
"--config",

0 commit comments

Comments
 (0)