Skip to content

Commit 0443e4a

Browse files
committed
feat: workflow parallelism and idempotency
Add workflow-level controls: - parallelism: 10 (limit concurrent workflows) - activeDeadlineSeconds: 7200 (2h timeout) - mutex synchronization (prevent duplicate processing) Add idempotency to convert.py: - check_output_exists() function (S3 check before conversion) - --overwrite flag to force re-conversion - Skip conversion if output already exists
1 parent cbb4a6d commit 0443e4a

File tree

2 files changed

+44
-0
lines changed

2 files changed

+44
-0
lines changed

scripts/convert.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
import sys
1010
from urllib.parse import urlparse
1111

12+
import boto3
1213
import fsspec
1314
import httpx
1415
import xarray as xr
16+
from botocore.exceptions import ClientError
1517
from eopf_geozarr import create_geozarr_dataset
1618
from eopf_geozarr.conversion.fs_utils import get_storage_options
1719
from get_conversion_params import get_conversion_params
@@ -22,6 +24,28 @@
2224
logger = logging.getLogger(__name__)
2325

2426

27+
def check_output_exists(
28+
s3_output_bucket: str, s3_output_prefix: str, collection: str, item_id: str
29+
) -> bool:
30+
"""Check if output already exists in S3."""
31+
try:
32+
s3 = boto3.client(
33+
"s3",
34+
endpoint_url=os.getenv("AWS_ENDPOINT_URL"),
35+
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
36+
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
37+
)
38+
prefix = f"{s3_output_prefix}/{collection}/{item_id}.zarr/"
39+
response = s3.list_objects_v2(Bucket=s3_output_bucket, Prefix=prefix, MaxKeys=1)
40+
exists: bool = response.get("KeyCount", 0) > 0
41+
if exists:
42+
logger.info(f"Output exists at s3://{s3_output_bucket}/{prefix} - skipping")
43+
return exists
44+
except ClientError as e:
45+
logger.warning(f"Could not check S3: {e}")
46+
return False
47+
48+
2549
def get_zarr_url(stac_item_url: str) -> str:
2650
"""Get Zarr asset URL from STAC item."""
2751
r = httpx.get(stac_item_url, timeout=30.0, follow_redirects=True)
@@ -46,6 +70,7 @@ def run_conversion(
4670
collection: str,
4771
s3_output_bucket: str,
4872
s3_output_prefix: str,
73+
overwrite: bool = False,
4974
) -> str:
5075
"""Run GeoZarr conversion workflow.
5176
@@ -54,6 +79,7 @@ def run_conversion(
5479
collection: Collection ID for parameter lookup
5580
s3_output_bucket: S3 bucket for output
5681
s3_output_prefix: S3 prefix for output
82+
overwrite: Force conversion even if output exists
5783
5884
Returns:
5985
Output Zarr URL (s3://...)
@@ -65,6 +91,13 @@ def run_conversion(
6591
item_id = urlparse(source_url).path.rstrip("/").split("/")[-1]
6692
logger.info(f"Starting GeoZarr conversion for {item_id}")
6793

94+
# Idempotency: skip if output exists
95+
if not overwrite and check_output_exists(
96+
s3_output_bucket, s3_output_prefix, collection, item_id
97+
):
98+
output_url = f"s3://{s3_output_bucket}/{s3_output_prefix}/{collection}/{item_id}.zarr"
99+
return output_url
100+
68101
# Resolve source: STAC item or direct Zarr URL
69102
if "/items/" in source_url:
70103
logger.info("Extracting Zarr URL from STAC item...")
@@ -149,6 +182,11 @@ def main(argv: list[str] | None = None) -> int:
149182
parser.add_argument("--collection", required=True, help="Collection ID")
150183
parser.add_argument("--s3-output-bucket", required=True, help="S3 output bucket")
151184
parser.add_argument("--s3-output-prefix", required=True, help="S3 output prefix")
185+
parser.add_argument(
186+
"--overwrite",
187+
action="store_true",
188+
help="Force conversion even if output exists",
189+
)
152190
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
153191

154192
args = parser.parse_args(argv)
@@ -162,6 +200,7 @@ def main(argv: list[str] | None = None) -> int:
162200
args.collection,
163201
args.s3_output_bucket,
164202
args.s3_output_prefix,
203+
overwrite=args.overwrite,
165204
)
166205
logger.info(f"Success: {output_url}")
167206
return 0

workflows/base/workflowtemplate.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ spec:
77
serviceAccountName: operate-workflow-sa
88
entrypoint: main
99
archiveLogs: false
10+
parallelism: 10
11+
activeDeadlineSeconds: 7200
1012
ttlStrategy:
1113
secondsAfterCompletion: 86400
1214
podGC:
@@ -45,6 +47,9 @@ spec:
4547

4648
- name: convert-geozarr
4749
activeDeadlineSeconds: 3600
50+
synchronization:
51+
mutex:
52+
name: "item-{{workflow.parameters.source_url | sha1sum | trunc 8}}"
4853
script:
4954
image: ghcr.io/eopf-explorer/data-pipeline:{{workflow.parameters.pipeline_image_version}}
5055
imagePullPolicy: Always

0 commit comments

Comments
 (0)