Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/wipac-cicd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ jobs:
with:
token: ${{ secrets.PERSONAL_ACCESS_TOKEN }}
ref: ${{ github.ref }} # don't lock to sha (action needs to push)
- uses: WIPACrepo/wipac-dev-py-setup-action@v5.6
- uses: WIPACrepo/wipac-dev-py-setup-action@v5.8
with:
mode: PACKAGING
python_min: 3.13
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ _Launch a new scan of an event_
| Argument | Type | Required/Default | Description |
|-----------------------------------|-----------------------------------------------------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `"docker_tag"` | str | *[REQUIRED]* | the docker tag of the Skymap Scanner image (must be in CVMFS). Ex: `v3.1.4`, `v3.5`, `v3`, `latest`, `eqscan-6207146` (branch-based tag)
| `"cluster"` | dict or list | *[REQUIRED]* | the worker cluster(s) to use *along with the number of workers for each:* Example: `{"sub-2": 1234}`. NOTE: To request a schedd more than once, provide a list of 2-lists instead (Ex: `[ ["sub-2", 56], ["sub-2", 1234] ]`)
| `"cluster"` | dict or list | *[REQUIRED]* | the worker cluster(s) to use *along with the number of workers for each:* Example: `{"osg": 1234}`. NOTE: To request a cluster more than once, provide a list of 2-lists instead (Ex: `[ ["osg", 56], ["osg", 1234] ]`)
| `"reco_algo"` | str | *[REQUIRED]* | which reco algorithm to use (see [Skymap Scanner](https://github.com/icecube/skymap_scanner/tree/main/skymap_scanner/recos))
| `"event_i3live_json"` | dict or str | *[REQUIRED]* | Realtime's JSON event format
| `"nsides"` | dict | *[REQUIRED]* | the nside progression to use (see [Skymap Scanner](https://github.com/icecube/skymap_scanner))
Expand Down
2 changes: 1 addition & 1 deletion resources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This is a module that runs the Skymap Scanner CI tests within a production instance (`skydriver` or `skydriver-dev`).

```
python -m prod_tester --skydriver (dev|prod) --cluster sub-2 --n-workers 100 --priority 100 [--one]
python -m prod_tester --skydriver (dev|prod) --cluster osg --n-workers 100 --priority 100 [--one]
```

## SkyDriver Client Request Helper Scripts
Expand Down
21 changes: 17 additions & 4 deletions resources/prod_tester/test_suit_prod.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import shutil
import subprocess
import tarfile
from datetime import datetime
from datetime import date, datetime
from pathlib import Path

import texttable # type: ignore
Expand Down Expand Up @@ -63,6 +63,15 @@ def compare_results(
diffs_dir = config.SANDBOX_DIR / "result_diffs"
diffs_dir.mkdir(parents=True, exist_ok=True)

def _check_time_bomb() -> bool:
# see https://github.com/icecube/skymap_scanner/blob/cb422e412d1607ce1e0ea2db4402a4e3461908ed/.github/workflows/tests.yml#L539-L560
if date.today() > date(2026, 3, 18):
raise RuntimeError(
"************************* HEY LOOK AT THIS FAILURE ************************* "
"we need to attend to # see https://github.com/icecube/skymap_scanner/blob/cb422e412d1607ce1e0ea2db4402a4e3461908ed/.github/workflows/tests.yml#L539-L560"
)
return True

result = subprocess.run(
[
"python",
Expand All @@ -74,8 +83,12 @@ def compare_results(
"--diff-out-dir",
str(diffs_dir),
"--assert",
"--compare-different-versions-ok",
],
]
+ ( # see https://github.com/icecube/skymap_scanner/blob/cb422e412d1607ce1e0ea2db4402a4e3461908ed/.github/workflows/tests.yml#L539-L560
["--compare-different-versions-ok"]
if test.reco_algo == "splinempe" and _check_time_bomb()
else []
),
capture_output=True,
text=True,
)
Expand Down Expand Up @@ -377,7 +390,7 @@ async def main():
parser.add_argument(
"--cluster",
required=True,
help="the cluster to use for running workers. Ex: sub-2",
help="the cluster to use for running workers. Ex: osg",
)
parser.add_argument(
"--skyscan-docker-tag",
Expand Down
3 changes: 2 additions & 1 deletion resources/pyrequest/add_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ async def main():
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"scan_ids",
"--scan-ids",
nargs="+",
required=True,
help="one or more scan IDs to rescan",
)
parser.add_argument(
Expand Down
78 changes: 52 additions & 26 deletions resources/pyrequest/get_ewms_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@
import tempfile
from pathlib import Path
from typing import AsyncIterable
from urllib.parse import urlparse

import requests
from rest_tools.client import RestClient
from wipac_dev_tools import logging_tools

from _connect import get_rest_client # type: ignore[import-not-found]

EWMS_HELPER_SCRIPT_URLS = [
"https://raw.githubusercontent.com/Observation-Management-Service/ewms-workflow-management-service/refs/heads/main/resources/get_all_ids.py",
"https://raw.githubusercontent.com/Observation-Management-Service/ewms-workflow-management-service/refs/heads/main/resources/utils.py",
]
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

EWMS_HELPER_SCRIPT_URL = "https://raw.githubusercontent.com/Observation-Management-Service/ewms-workflow-management-service/refs/heads/main/resources/get_all_ids.py"


def print_scan_id(scan_id: str) -> None:
"""Print a scan ID in a formatted way."""
Expand Down Expand Up @@ -47,36 +51,57 @@ async def grab_scan_info(rc: RestClient, scan_ids: list[str]) -> AsyncIterable[d
}


def run_ewms_helper_script(cl_args: str) -> list[dict]:
def run_ewms_helper_script(ewms_args: str) -> list[dict]:
"""Download and run the EWMS helper script with the given CLI args, returning parsed JSON output."""
with tempfile.NamedTemporaryFile(suffix=".py") as f:
subprocess.run(
["curl", "-sSfL", EWMS_HELPER_SCRIPT_URL, "-o", f.name],
check=True,
)

with tempfile.TemporaryDirectory() as tmpdir:
tmpdir_path = Path(tmpdir)
downloaded = []
for url in EWMS_HELPER_SCRIPT_URLS:
name = Path(urlparse(url).path).name # keep the file's original name
dest = tmpdir_path / name
subprocess.run(
["curl", "-sSfL", url, "-o", str(dest)],
check=True,
)
downloaded.append(dest)

script = [p for p in downloaded if p.name == "get_all_ids.py"][0]

# check if the token file exists -- this is hard to do with subproc w/ captured stdout
if (
not Path(
f"~/device-refresh-token-ewms-{'prod' if 'prod' in cl_args else 'dev'}"
f"~/device-refresh-token-ewms-{'prod' if 'prod' in ewms_args else 'dev'}"
)
.expanduser()
.exists()
):
LOGGER.warning("you need a device client token first, so running 2x...")
subprocess.run(["python3", f.name, *cl_args.split()], check=True)
subprocess.run(["python3", script, *ewms_args.split()], check=True)

# get ids from ewms
proc = subprocess.run(
["python3", f.name, *cl_args.split()],
stdout=subprocess.PIPE,
stderr=sys.stderr,
check=True,
text=True,
)
print("ewms ids ::")
print(proc.stdout.strip())
return json.loads(proc.stdout)
try:
proc = subprocess.run(
["python3", script, *ewms_args.split()],
stdout=subprocess.PIPE,
stderr=sys.stderr,
check=True,
text=True,
)
print("ewms ids ::")
print(proc.stdout.strip())
except subprocess.CalledProcessError as e:
print(e.stdout.strip(), file=sys.stderr)
raise RuntimeError(
"issue running ewms helper script — see above error message "
"(if this is an issue with '--ewms-args', run with '--ewms-args -h')"
) from None # suppress 'CalledProcessError' stack trace

# done here
if " -h" in ewms_args:
sys.exit(2)
else:
return json.loads(proc.stdout)


def print_banner(char: str = "-", width: int = 40) -> None:
Expand All @@ -91,7 +116,7 @@ async def main():
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"scan_ids",
"--scan-ids",
nargs="*",
help="One or more SkyDriver scan IDs",
)
Expand All @@ -106,14 +131,15 @@ async def main():
),
)
parser.add_argument(
"--ewms-ids",
"--ewms-args",
default="",
help="Command-line args to forward to the EWMS helper script (use only if not providing scan_ids)",
help="Command-line args to forward to the EWMS helper script (use only if not providing --scan-ids)",
)
args = parser.parse_args()
logging_tools.log_argparse_args(args, LOGGER)

if bool(args.scan_ids) == bool(args.ewms_ids):
raise ValueError("Specify exactly one of: SCAN_IDS or --ewms-ids")
if bool(args.scan_ids) == bool(args.ewms_args):
raise ValueError("Specify exactly one of: --scan-ids or --ewms-args")

rc = get_rest_client(args.skydriver_type)

Expand All @@ -132,7 +158,7 @@ async def main():
# EWMS -> SkyDriver
else:
print_banner()
for ids in run_ewms_helper_script(args.ewms_ids):
for ids in run_ewms_helper_script(args.ewms_args):
resp = await rc.request(
"POST",
"/scans/find",
Expand Down
3 changes: 2 additions & 1 deletion resources/pyrequest/get_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ async def main():
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"scan_id",
"--scan-id",
required=True,
help="the scan's id",
)
parser.add_argument(
Expand Down
3 changes: 2 additions & 1 deletion resources/pyrequest/get_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ async def main() -> None:
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"scan_ids",
"--scan-ids",
nargs="+",
required=True,
help="One or more scan IDs",
)
parser.add_argument(
Expand Down
49 changes: 49 additions & 0 deletions resources/pyrequest/get_scan_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Helper to GET one or more stored scan-requests and print them.

Usage:
python get_scan_request.py --scan-ids <scan_id> [<scan_id> ...] --skydriver {dev,prod}
"""

import argparse
import asyncio
import json
import logging
from typing import Any

from _connect import get_rest_client # type: ignore[import-not-found]

logging.getLogger().setLevel(logging.INFO)


async def main() -> None:
    """Fetch the stored scan-request for each given scan ID and print it as JSON.

    Command-line arguments:
        --scan-ids: one or more scan IDs (required)
        --skydriver: which SkyDriver environment to talk to, 'dev' or 'prod' (required)

    For every scan ID, a GET is issued to `/scan-request/<scan_id>` and the
    response is printed, indented, under a `=== scan <scan_id> ===` header.
    """
    arg_parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="GET one or more stored scan-requests and print them",
    )
    arg_parser.add_argument(
        "--scan-ids",
        required=True,
        nargs="+",
        help="one or more scan IDs to retrieve",
    )
    arg_parser.add_argument(
        "--skydriver",
        required=True,
        choices=["dev", "prod"],
        dest="skydriver_type",
        help="SkyDriver environment",
    )
    cli = arg_parser.parse_args()

    client = get_rest_client(cli.skydriver_type)

    for scan_id in cli.scan_ids:
        logging.info(f"Fetching stored request for scan_id={scan_id}")
        endpoint = f"/scan-request/{scan_id}"
        stored_request: dict[str, Any] = await client.request("GET", endpoint)
        # flush each write so output stays ordered relative to the log lines
        print(f"\n=== scan {scan_id} ===", flush=True)
        print(json.dumps(stored_request, indent=4), flush=True)


if __name__ == "__main__":
asyncio.run(main())
logging.info("Done.")
3 changes: 2 additions & 1 deletion resources/pyrequest/remix_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ async def main():
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"scan_id",
"--scan-id",
required=True,
help="existing scan_id whose stored scan-request will be used as a template",
)
parser.add_argument(
Expand Down
3 changes: 2 additions & 1 deletion resources/pyrequest/rescan.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ async def main():
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"scan_ids",
"--scan-ids",
nargs="+",
required=True,
help="one or more scan IDs to rescan",
)
parser.add_argument(
Expand Down
3 changes: 2 additions & 1 deletion resources/pyrequest/stop_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ async def main():
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"scan_ids",
"--scan-ids",
nargs="+",
required=True,
help="one or more scan IDs to stop",
)
parser.add_argument(
Expand Down
7 changes: 5 additions & 2 deletions skydriver/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ class EnvConfig:

# cache durations
CACHE_DURATION_EWMS: int = 1 * 60
CACHE_DURATION_DOCKER_HUB: int = 5 * 60
CACHE_DURATION_PROMETHEUS: int = 15 * 60

CVMFS_SKYSCAN_SINGULARITY_IMAGES_DIR: Path = Path(
Expand Down Expand Up @@ -183,7 +182,11 @@ def __post_init__(self) -> None:

# known cluster locations
KNOWN_CLUSTERS: dict[str, dict[str, Any]] = {
"sub-2": {
# NOTE -- even if the cluster does not have special settings, it must be present in dict
"osg": {
"max_n_clients_during_debug_mode": 100,
},
"sub-2": { # Nov 2025: sub-2 was replaced with osg — ewms will relabel any 'sub-2' requests as 'osg'
"max_n_clients_during_debug_mode": 100,
},
LOCAL_K8S_HOST: {
Expand Down
20 changes: 15 additions & 5 deletions skydriver/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path

import aiocache # type: ignore[import-untyped]
from async_lru import alru_cache
import async_lru
from wipac_dev_tools.container_registry_tools import (
CVMFSRegistryTools,
DockerHubRegistryTools,
Expand Down Expand Up @@ -57,25 +57,35 @@ def get_skyscan_docker_image(tag: str) -> str:
# utils


@alru_cache # cache it forever
@async_lru.alru_cache # cache forever
async def min_skyscan_tag_ts() -> float:
"""Get the timestamp for when the `MIN_SKYMAP_SCANNER_TAG` image was created."""
info, _ = await get_info_from_docker_hub(ENV.MIN_SKYMAP_SCANNER_TAG)
return DockerHubRegistryTools.parse_image_ts(info)


@aiocache.cached(ttl=ENV.CACHE_DURATION_DOCKER_HUB) # fyi: tags can be overwritten
@async_lru.alru_cache # cache forever -- errors aren't cached (i.e. a missing image tag)
async def get_info_from_docker_hub(docker_tag: str) -> tuple[dict, str]:
"""Cache docker hub api call."""
"""Cache docker hub api call.

Raises:
ImageNotFoundException
"""
docker_hub = DockerHubRegistryTools(_SKYSCAN_DOCKER_IMAGE_NAMESPACE, _IMAGE_NAME)
ret = docker_hub.request_info(docker_tag)

await asyncio.sleep(0) # let pending async tasks do things after http request
return ret


@aiocache.cached(ttl=5) # very short ttl just to stop stampedes
async def resolve_docker_tag(docker_tag: str) -> str:
"""Check if the docker tag exists, then resolve 'latest' if needed."""
"""Check if the docker tag exists, then resolve 'latest' if needed.

Raises:
ImageNotFoundException
ImageTooOldException
"""
LOGGER.info(f"checking docker tag: {docker_tag}")

# cvmfs is the source of truth
Expand Down
Loading
Loading