Skip to content

Commit f555587

Browse files
committed
ARGO-5351 Computation submit scripts that use the new config.yml
1 parent 4ad5cf7 commit f555587

File tree

10 files changed

+420
-17
lines changed

10 files changed

+420
-17
lines changed

automation/README.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
## Automation folder
2+
3+
This folder contains automation components for managing automation jobs and submitting computation jobs in Argo Monitoring. The main component is `argo_automator`, a daemon that listens for events on AMS and executes triggered jobs.
4+
5+
### Getting started
6+
7+
First, set up a Python virtual environment:
8+
```bash
9+
python -m venv ./argo-venv
10+
source ./argo-venv/bin/activate
11+
pip install -r requirements.txt
12+
```
13+
14+
**Requirements:**
15+
- Python 3.9+
16+
- Dependencies: requests, argo-ams-library, pyyaml, pymongo
17+
18+
### Running the automator daemon
19+
20+
Start the automator with:
21+
```bash
22+
./argo_automator
23+
```
24+
25+
By default it looks for `.config.yml` in the current directory. To specify a different config file:
26+
```bash
27+
./argo_automator -c /path/to/config.yml
28+
```
29+
30+
See `config.yml.example` for configuration details.
31+
32+
### Job submission scripts
33+
34+
#### Ingest job
35+
Submit an ingestion job for a tenant:
36+
```bash
37+
./run_ingest -t TENANTFOO
38+
```
39+
40+
**Options:**
41+
- `-c /path/to/config.yml` - Specify config file (default: `.config.yml`)
42+
- `--no-verify` - Skip verification of remote endpoints like AMS
43+
- `--dry-run` - Preview what would be submitted without executing
44+
- `--log-level DEBUG` - Adjust logging verbosity
45+
46+
#### Batch job
47+
Submit a batch computation job (calculates AR, status, and trends):
48+
```bash
49+
./run_batch -t TENANTFOO -r Default
50+
```
51+
52+
**Options:**
53+
- `-d 2025-05-05` - Specify date (default: current day)
54+
- `-c /path/to/config.yml` - Specify config file (default: `config.yml`)
55+
- `--dry-run` - Preview submission
56+
- `--log-level DEBUG` - Adjust logging verbosity

automation/argo_config.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ def __init__(self, path: str):
1616
exit(1)
1717
automation = config_data.get("automation", {})
1818
tenants = config_data.get("tenants", {})
19+
run = config_data.get("run", {})
1920

2021
self.path = path
2122
self.automation = automation
2223
self.tenants = tenants
24+
self.run = run
2325
self.ams_endpoint = automation.get("ams_endpoint")
2426
self.ams_event_token = automation.get("ams_event_token")
2527
self.ams_event_project = automation.get("ams_event_project")
@@ -35,11 +37,19 @@ def __init__(self, path: str):
3537
self.web_api_endpoint = automation.get("web_api_endpoint")
3638
self.web_api_token = automation.get("web_api_token")
3739
self.default_ops_profile_file = automation.get("default_ops_profile_file")
40+
self.hdfs_path = run.get("hdfs_path")
41+
self.flink_path = run.get("flink_path")
42+
self.batch_jar_path = run.get("batch_jar_path")
43+
self.ingest_jar_path = run.get("ingest_jar_path")
3844

3945
def save(self) -> None:
4046
"""Save current configuration back to yaml file"""
4147
with open(self.path, "w") as f:
42-
data = {"automation": self.automation, "tenants": self.tenants}
48+
data = {
49+
"automation": self.automation,
50+
"run": self.run,
51+
"tenants": self.tenants,
52+
}
4353
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
4454
logger.info("engine config - saved to disk")
4555

@@ -49,6 +59,12 @@ def set_tenant_web_api_access(self, tenant_id, tenant_name, web_api_token):
4959
logger.info(f"engine config - tenant {tenant_name} web_api_token prop set")
5060
self.save()
5161

62+
def set_tenant_reports(self, tenant_id: str, tenant_name: str, reports: dict):
    """Attach the given reports dict to the tenant's entry and save the config"""
    # make sure a definition for this tenant exists before writing into it
    self.ensure_tenant(tenant_id, tenant_name)
    tenant_entry = self.tenants.get(tenant_name)
    tenant_entry["reports"] = reports
    logger.info(f"engine config - tenant {tenant_name} reports prop set")
    # persist the updated tenants section to the yaml file
    self.save()
67+
5268
def set_tenant_ams_access(self, tenant_id, tenant_name, ams_token):
5369
self.ensure_tenant(tenant_id, tenant_name)
5470
self.tenants.get(tenant_name)["ams_token"] = ams_token
@@ -58,10 +74,10 @@ def set_tenant_ams_access(self, tenant_id, tenant_name, ams_token):
5874
def ensure_tenant(self, tenant_id, tenant_name):
5975
cur_tenant = self.tenants.get(tenant_name)
6076
if not cur_tenant:
61-
self.tenants[tenant_name] = {"tenant_id": tenant_id}
77+
self.tenants[tenant_name] = {"id": tenant_id}
6278
logger.info(f"engine config - tenant {tenant_name} definition created")
6379
return
64-
cur_tenant_id = self.tenants.get("tenant_id")
80+
cur_tenant_id = cur_tenant.get("id")
6581
if not cur_tenant_id or cur_tenant_id != tenant_id:
6682
cur_tenant["tenant_id"] = tenant_id
6783
logger.info(f"engine config - tenant {tenant_name} tenant_id prop set")

automation/argo_web_api.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,7 @@ def __init__(self, config: ArgoConfig):
1616
self.config = config
1717

1818
def create_user(
19-
self,
20-
tenant_id: str,
21-
tenant_name: str,
22-
username: str,
23-
role: str,
24-
component: str
19+
self, tenant_id: str, tenant_name: str, username: str, role: str, component: str
2520
):
2621
"""Http call to web-api to create a user"""
2722
logger.debug(
@@ -32,7 +27,7 @@ def create_user(
3227
"name": username,
3328
"email": self.config.argo_ops_email,
3429
"roles": [role],
35-
"component": component
30+
"component": component,
3631
}
3732

3833
url = f"https://{self.config.web_api_endpoint}/api/v2/admin/tenants/{tenant_id}/users"
@@ -95,6 +90,28 @@ def update_tenant_db_info(
9590
f"tenant: {tenant_name} ({tenant_id}) - web-api updating db conf updated"
9691
)
9792

93+
def get_reports(
    self,
    tenant_id: str,
    tenant_name: str,
    tenant_access_token: str,
) -> dict:
    """Retrieve report names and report ids for specific tenant

    Args:
        tenant_id: Tenant id, used here only in the debug log line.
        tenant_name: Tenant name, used here only in the debug log line.
        tenant_access_token: Token sent as the x-api-key header of the request.

    Returns:
        Dict mapping each report's info.name to its id; empty when the
        response carries no "data" items.

    Raises:
        requests.HTTPError: when web-api answers with an error status
            (via raise_for_status).
    """
    logger.debug(
        f"tenant: {tenant_name} ({tenant_id}) - retrieving report information from web-api..."
    )
    url = f"https://{self.config.web_api_endpoint}/api/v2/reports"
    headers = {
        "x-api-key": tenant_access_token,
        "Accept": "application/json",
    }

    response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    # "data" may be missing or null in the payload; treat that as "no reports"
    # instead of failing while iterating over None
    results = response.json().get("data") or []
    return {item["info"]["name"]: item["id"] for item in results}
114+
98115
def create_ops_profile(
99116
self,
100117
tenant_id: str,
@@ -148,7 +165,9 @@ def get_component_user(
148165

149166
users = response.json().get("data")
150167
if users:
151-
return next((user for user in users if user.get("component") == component), None)
168+
return next(
169+
(user for user in users if user.get("component") == component), None
170+
)
152171
return None
153172

154173
def get_user(self, tenant_id: str, tenant_name: str, user_id: str):

automation/config.yml.example

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ automation:
2222
argo_ops_email: argo_ops@localhost
2323
default_ops_profile_file: ./default.ops.json
2424

25+
run:
26+
hdfs_path: hdfs://localhost:9000/user/argo/test/tenants/
27+
flink_path: /opt/flink/bin/flink
28+
batch_jar_path: /opt/argo/jars/multijob.jar
29+
2530
# The tenants section is automatically configured by engine - don't edit manually
2631
#
2732
# tenants:

automation/init_ams.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import logging
22

3-
from argo_ams_library import (AmsServiceException, AmsUser, AmsUserProject,
4-
ArgoMessagingService)
3+
from argo_ams_library import (
4+
AmsServiceException,
5+
AmsUser,
6+
AmsUserProject,
7+
ArgoMessagingService,
8+
)
59

610
from argo_config import ArgoConfig
711

automation/init_compute_engine.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ def init_compute_engine(
4848

4949
else:
5050
# create the user
51-
user = web_api.create_user(tenant_id, tenant_name, username, role, component)
51+
user = web_api.create_user(
52+
tenant_id, tenant_name, username, role, component
53+
)
5254
if user and username == engine_username:
5355
engine_user_key = user.get("api_key")
5456

automation/init_mongo.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ def init_mongo(
4646
("status_services", index_report_dateint),
4747
("threshold_profiles", index_desc_dateint_id),
4848
("weights", index_desc_dateint_id),
49-
("topology_endpoints",index_desc_dateint_id),
50-
("topology_groups",index_desc_dateint_id),
51-
("topology_service_types",index_desc_dateint_name)
49+
("topology_endpoints", index_desc_dateint_id),
50+
("topology_groups", index_desc_dateint_id),
51+
("topology_service_types", index_desc_dateint_name),
5252
]
5353

5454
for collection_name, index_type in indexes:

automation/run_batch

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
#!/usr/bin/env python3
2+
3+
import argparse
4+
import logging
5+
import subprocess
6+
import sys
7+
from datetime import datetime, timedelta
8+
9+
from argo_config import ArgoConfig
10+
from argo_web_api import ArgoWebApi
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
def run_batch(
    config: "ArgoConfig",
    tenant_name: str,
    report_id: str,
    cur_date_str: str,
    prev_date_str: str,
    tenant_web_api_token: str,
    dry_run: bool,
):
    """Compose the flink cli command for a tenant's batch job and submit it.

    Args:
        config: Engine configuration; flink_path, batch_jar_path, hdfs_path,
            mongodb_url, tenant_db_prefix and web_api_endpoint are read from it.
        tenant_name: Tenant name, used in the mongo database name and hdfs paths.
        report_id: Value passed to the job as --report.id.
        cur_date_str: Computation date, passed as --run.date and used to build
            the --mdata path.
        prev_date_str: Previous date, used to build the --pdata path.
        tenant_web_api_token: Value passed to the job as --api.token.
        dry_run: When true the command is only printed, not executed.

    Returns:
        0 when the command was printed (dry run) or the submitted process
        exited successfully, 1 when running the command failed.
    """
    # Strip trailing slashes so a root configured as ".../tenants/" does not
    # produce ".../tenants//TENANT/..." paths.
    hdfs_root = str(config.hdfs_path).rstrip("/")
    tenant_mdata = f"{hdfs_root}/{tenant_name}/mdata"

    cmd = [
        config.flink_path,
        "run",
        "-c",
        "argo.batch.ArgoMultiJob",
        config.batch_jar_path,
        "--run.date",
        cur_date_str,
        "--mongo.url",
        f"{config.mongodb_url}/{config.tenant_db_prefix}{tenant_name}",
        "--pdata",
        f"{tenant_mdata}/{prev_date_str}",
        "--mdata",
        f"{tenant_mdata}/{cur_date_str}",
        "--api.endpoint",
        config.web_api_endpoint,
        "--mongo.method",
        "insert",
        "--clear.mongo",
        "true",
        "--api.token",
        tenant_web_api_token,
        "--report.id",
        report_id,
    ]

    if dry_run:
        # Printed in green; note that the full command includes the api token.
        print(("\033[92m" + " ".join(str(x) for x in cmd) + "\033[0m"))
        return 0

    try:
        # check=True turns a non-zero flink exit status into an exception
        subprocess.run(cmd, check=True)
    except Exception as e:
        logger.error(f"Batch Job Error: {e}")
        return 1
    # explicit success code so callers never receive None
    return 0
61+
62+
63+
def main():
    """Parse the cli arguments and submit a batch job for one tenant report.

    The report id is looked up in the tenant section of the configuration.
    When the report name is not found there, the tenant's reports are fetched
    from web-api, stored back in the configuration and the lookup is retried.

    Returns:
        1 on an invalid date, incomplete run configuration, unconfigured
        tenant, web-api failure or unknown report; otherwise the value
        returned by run_batch.
    """
    parser = argparse.ArgumentParser(description="Argo batch job submission")
    parser.add_argument(
        "-c",
        "--config",
        default="config.yml",
        help="Path to configuration file (default: config.yml)",
    )
    parser.add_argument(
        "-l",
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set logging level (default: INFO)",
    )
    parser.add_argument(
        "-d",
        "--date",
        type=str,
        help="set computation date as YYYY-MM-DD",
    )
    parser.add_argument(
        "-t",
        "--tenant",
        required=True,
        type=str,
        help="select tenant name for computation",
    )
    parser.add_argument(
        "-r",
        "--report",
        required=True,
        type=str,
        help="select report name for computation",
    )
    parser.add_argument(
        "--dry-run",
        help="Runs in test mode without actually submitting the job",
        action="store_true",
        dest="dry_run",
    )

    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

    # Validate the date first: it needs neither the config file nor the
    # network, and a malformed value should give a clear error, not a traceback.
    try:
        ref_date = (
            datetime.strptime(args.date, "%Y-%m-%d") if args.date else datetime.now()
        )
    except ValueError:
        logging.error(f"Invalid date {args.date} - expected format YYYY-MM-DD")
        return 1
    prev_date_str = (ref_date - timedelta(days=1)).strftime("%Y-%m-%d")
    cur_date_str = ref_date.strftime("%Y-%m-%d")

    config = ArgoConfig(args.config)

    # every run.* setting that the job command is built from must be present
    for param in ("flink_path", "batch_jar_path", "hdfs_path"):
        if not getattr(config, param, None):
            logging.error(
                f"You need to set-up the run.{param} parameter in your configuration file {args.config}"
            )
            return 1

    web_api = ArgoWebApi(config)

    # check if tenant exists in config (tenants may be None for an empty section)
    tenant = (config.tenants or {}).get(args.tenant) or {}
    if not tenant.get("id") or not tenant.get("web_api_token"):
        logging.error(f"Tenant {args.tenant} not configured in compute engine")
        return 1

    def lookup_report_id():
        """Return the configured id for the requested report name, or None."""
        tenant_conf = (config.tenants or {}).get(args.tenant) or {}
        return (tenant_conf.get("reports") or {}).get(args.report)

    # check if report exists
    report_id = lookup_report_id()
    if not report_id:
        # try to update the tenant report configuration
        try:
            reports = web_api.get_reports(
                tenant["id"], args.tenant, tenant["web_api_token"]
            )
        except Exception as e:
            logging.error(
                f"Tenant {args.tenant} - unable to retrieve reports from web-api: {e}"
            )
            return 1
        config.set_tenant_reports(tenant["id"], args.tenant, reports)

        # check again if report is included in the updated web-api result
        report_id = lookup_report_id()
        if not report_id:
            logging.error(
                f"Tenant {args.tenant} does not include a report named: {args.report}"
            )
            return 1

    return run_batch(
        config,
        args.tenant,
        report_id,
        cur_date_str,
        prev_date_str,
        tenant.get("web_api_token"),
        args.dry_run,
    )
162+
163+
164+
if __name__ == "__main__":
165+
sys.exit(main())

0 commit comments

Comments
 (0)