diff --git a/README.md b/README.md
index 7f1e160..c8aaae7 100644
--- a/README.md
+++ b/README.md
@@ -56,33 +56,21 @@ Transforms Sentinel-1/2 satellite data into web-ready visualizations:
 
 - Sentinel-1 GRD (SAR backscatter)
 
-## Requirements & Setup
+## Setup
 
-### Prerequisites
+**Prerequisites:**
+- Kubernetes cluster with [platform-deploy](https://github.com/EOPF-Explorer/platform-deploy) (Argo Workflows, RabbitMQ, STAC API, TiTiler)
+- Python 3.13+ with `uv`
+- `kubectl` configured
 
-- **Kubernetes cluster** with [platform-deploy](https://github.com/EOPF-Explorer/platform-deploy) infrastructure
-  - Argo Workflows (pipeline orchestration)
-  - RabbitMQ (event-driven automation)
-  - STAC API & TiTiler (catalog & visualization)
-- **Python 3.13+** with `uv` package manager
-- **S3 storage** credentials (OVH de region)
-- **Kubeconfig** in `.work/kubeconfig`
+**📖 Complete setup guide:** See [workflows/README.md](workflows/README.md) for:
+- kubectl configuration (OVH Manager kubeconfig download)
+- Required secrets (RabbitMQ, S3, STAC API)
+- Workflow deployment (`kubectl apply -k`)
 
-Verify infrastructure:
+**Quick verification:**
 ```bash
-export KUBECONFIG=$(pwd)/.work/kubeconfig
-kubectl get pods -n core -l app.kubernetes.io/name=argo-workflows
-kubectl get pods -n core -l app.kubernetes.io/name=rabbitmq
-```
-
-### Deploy Workflows
-
-```bash
-# Apply to staging
-kubectl apply -k workflows/overlays/staging
-
-# Apply to production
-kubectl apply -k workflows/overlays/production
+kubectl get wf,sensor,eventsource -n devseed-staging
 ```
 
 ---
@@ -208,60 +196,18 @@ docker/Dockerfile        # Pipeline image
 tools/submit_burst.py    # RabbitMQ burst submission tool
 ```
 
-Tests are available in `tests/` directory (unit and integration tests using pytest).
-
----
-
-## Deploy
-
-```bash
-# Apply to staging
-kubectl apply -k workflows/overlays/staging
-
-# Apply to production
-kubectl apply -k workflows/overlays/production
-```
-
-**Config:** Image version, S3 endpoints, STAC API URLs, RabbitMQ exchanges configured via kustomize overlays.
+Tests are planned for the `tests/` directory (structure exists, test files to be added).
 
 ---
 
 ## Configuration
 
-### S3 Storage
-
-```bash
-kubectl create secret generic geozarr-s3-credentials -n devseed-staging \
-  --from-literal=AWS_ACCESS_KEY_ID="" \
-  --from-literal=AWS_SECRET_ACCESS_KEY=""
-```
-
-| Setting | Value |
-|---------|-------|
-| **Endpoint** | `https://s3.de.io.cloud.ovh.net` |
-| **Bucket** | `esa-zarr-sentinel-explorer-fra` |
-| **Region** | `de` |
+**📖 Full configuration:** See [workflows/README.md](workflows/README.md) for secrets setup and parameters.
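+
+**Trigger message** (the payload published to RabbitMQ; the staging sensor consumes this shape):
+
+```json
+{
+  "source_url": "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/...",
+  "collection": "sentinel-2-l2a-dp-test"
+}
+```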
-### RabbitMQ - -Get password: -```bash -kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d -``` - -| Setting | Value | -|---------|-------| -| **URL** | `amqp://user:PASSWORD@rabbitmq.core.svc.cluster.local:5672/` | -| **Exchange** | `geozarr-staging` | -| **Routing key** | `eopf.items.test` | - -**Message format:** -```json -{ - "source_url": "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/...", - "collection": "sentinel-2-l2a-dp-test" -} -``` +**Quick reference:** +- S3: `s3.de.io.cloud.ovh.net` / `esa-zarr-sentinel-explorer-fra` +- Staging collection: `sentinel-2-l2a-dp-test` +- Production collection: `sentinel-2-l2a` --- diff --git a/pyproject.toml b/pyproject.toml index 5acce80..3633aac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,6 @@ dependencies = [ "click>=8.1.0", "pika>=1.3.0", "tenacity>=8.0.0", - "requests>=2.31.0", "morecantile>=5.0.0", "cf-xarray>=0.9.0", "eopf-geozarr @ git+https://github.com/EOPF-Explorer/data-model.git@fix/s1-encoding-conflict", @@ -146,6 +145,7 @@ warn_no_return = true strict_equality = true exclude = ["examples/"] +# Relax type checking for test files (structure exists, tests to be added) [[tool.mypy.overrides]] module = "tests.*" disallow_untyped_defs = false diff --git a/scripts/augment_stac_item.py b/scripts/augment_stac_item.py index 8b7097c..2bb1cff 100755 --- a/scripts/augment_stac_item.py +++ b/scripts/augment_stac_item.py @@ -66,9 +66,8 @@ def add_visualization(item: Item, raster_base: str, collection_id: str) -> None: _add_tile_links(item, base_url, query, "Sentinel-1 GRD VH") elif coll_lower.startswith(("sentinel-2", "sentinel2")): - # S2: Point to overview level 0 for quicklook TCI - # Use /r10m/0/tci path to access the overview array with spatial_ref - var_path = "/quality/l2a_quicklook/r10m/0/tci" + # S2: Use colon separator for TiTiler variable path + var_path = "/quality/l2a_quicklook/r10m:tci" query = ( f"variables={urllib.parse.quote(var_path, safe='')}&bidx=1&bidx=2&bidx=3&assets=TCI_10m" ) @@ -172,7 +171,7 @@ def main(argv: list[str] | None = None) -> int: return 0 except Exception as e: - logger.error(f"Failed to augment {args.item_id}: {e}") + logger.error(f"Failed to augment {item_url}: {e}") return 1 diff --git a/scripts/convert.py b/scripts/convert.py index 1dc1283..d5d83ed 100755 --- a/scripts/convert.py +++ b/scripts/convert.py @@ -9,9 +9,11 @@ import sys from urllib.parse import urlparse +import boto3 import fsspec import httpx import xarray as xr +from botocore.exceptions import ClientError from eopf_geozarr import create_geozarr_dataset from eopf_geozarr.conversion.fs_utils import get_storage_options from get_conversion_params import get_conversion_params @@ -22,6 +24,28 @@ logger = logging.getLogger(__name__) +def check_output_exists( + s3_output_bucket: str, s3_output_prefix: str, collection: str, item_id: str +) -> bool: + """Check if output already exists in S3.""" + try: + s3 = boto3.client( + "s3", + endpoint_url=os.getenv("AWS_ENDPOINT_URL"), + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + ) + prefix = f"{s3_output_prefix}/{collection}/{item_id}.zarr/" + response = s3.list_objects_v2(Bucket=s3_output_bucket, Prefix=prefix, MaxKeys=1) + exists: bool = response.get("KeyCount", 0) > 0 + if exists: + logger.info(f"Output exists at s3://{s3_output_bucket}/{prefix} - skipping") + return exists + except ClientError as e: + logger.warning(f"Could not check S3: {e}") + return False + 
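+# Example of the key layout probed above (item ID hypothetical): with the
+# staging settings from the README (bucket esa-zarr-sentinel-explorer-fra,
+# prefix tests-output, collection sentinel-2-l2a-dp-test), item "S2B_EXAMPLE"
+# is skipped when at least one object exists under
+# s3://esa-zarr-sentinel-explorer-fra/tests-output/sentinel-2-l2a-dp-test/S2B_EXAMPLE.zarr/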
+ def get_zarr_url(stac_item_url: str) -> str: """Get Zarr asset URL from STAC item.""" r = httpx.get(stac_item_url, timeout=30.0, follow_redirects=True) @@ -46,6 +70,7 @@ def run_conversion( collection: str, s3_output_bucket: str, s3_output_prefix: str, + overwrite: bool = False, ) -> str: """Run GeoZarr conversion workflow. @@ -54,6 +79,7 @@ def run_conversion( collection: Collection ID for parameter lookup s3_output_bucket: S3 bucket for output s3_output_prefix: S3 prefix for output + overwrite: Force conversion even if output exists Returns: Output Zarr URL (s3://...) @@ -65,6 +91,13 @@ def run_conversion( item_id = urlparse(source_url).path.rstrip("/").split("/")[-1] logger.info(f"Starting GeoZarr conversion for {item_id}") + # Idempotency: skip if output exists + if not overwrite and check_output_exists( + s3_output_bucket, s3_output_prefix, collection, item_id + ): + output_url = f"s3://{s3_output_bucket}/{s3_output_prefix}/{collection}/{item_id}.zarr" + return output_url + # Resolve source: STAC item or direct Zarr URL if "/items/" in source_url: logger.info("Extracting Zarr URL from STAC item...") @@ -149,6 +182,11 @@ def main(argv: list[str] | None = None) -> int: parser.add_argument("--collection", required=True, help="Collection ID") parser.add_argument("--s3-output-bucket", required=True, help="S3 output bucket") parser.add_argument("--s3-output-prefix", required=True, help="S3 output prefix") + parser.add_argument( + "--overwrite", + action="store_true", + help="Force conversion even if output exists", + ) parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") args = parser.parse_args(argv) @@ -162,6 +200,7 @@ def main(argv: list[str] | None = None) -> int: args.collection, args.s3_output_bucket, args.s3_output_prefix, + overwrite=args.overwrite, ) logger.info(f"Success: {output_url}") return 0 diff --git a/scripts/create_geozarr_item.py b/scripts/create_geozarr_item.py index 782ca66..8bf95b7 100755 --- a/scripts/create_geozarr_item.py +++ b/scripts/create_geozarr_item.py @@ -5,7 +5,6 @@ import argparse import logging -import re from typing import Any from urllib.parse import urlparse @@ -31,30 +30,6 @@ def s3_to_https(s3_url: str, endpoint: str) -> str: return f"https://{bucket}.{host}/{path}" -def normalize_r60m_href(href: str) -> str: - """Add /0/ subdirectory to r10m/r20m/r60m paths to match GeoZarr output structure. - - GeoZarr conversion creates /0/ subdirectories (overview level 0) for all - resolution bands. This normalizes asset hrefs accordingly. 
-
-    Example: .../r10m/tci → .../r10m/0/tci
-             .../r60m/b09 → .../r60m/0/b09
-    """
-    # Check for any resolution level pattern
-    for res in ["r10m", "r20m", "r60m"]:
-        if f"/{res}/" not in href:
-            continue
-
-        # If already has /0/ or other digit subdirectory, don't modify
-        if re.search(rf"/{res}/\d+/", href):
-            continue
-
-        # Insert /0/ after /{res}/
-        href = re.sub(rf"/({res})/", r"/\1/0/", href)
-
-    return href
-
-
 def find_source_zarr_base(source_item: dict) -> str | None:
     """Find the base Zarr URL from source item assets."""
     for asset in source_item.get("assets", {}).values():
@@ -112,9 +87,6 @@ def create_geozarr_item(
             subpath = old_href[len(source_zarr_base) :]
             new_href = output_zarr_base + subpath
 
-            # Normalize r60m paths to include /0/ subdirectory (GeoZarr structure)
-            new_href = normalize_r60m_href(new_href)
-
             # Convert to https if needed
             if new_href.startswith("s3://"):
                 new_href = s3_to_https(new_href, s3_endpoint)
diff --git a/scripts/get_zarr_url.py b/scripts/get_zarr_url.py
new file mode 100755
index 0000000..04cc4d8
--- /dev/null
+++ b/scripts/get_zarr_url.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+"""Extract Zarr URL from STAC item - standalone script for workflow templates."""
+
+import sys
+
+import httpx
+
+
+def get_zarr_url(stac_item_url: str) -> str:
+    """Get Zarr asset URL from STAC item."""
+    r = httpx.get(stac_item_url, timeout=30.0, follow_redirects=True)
+    r.raise_for_status()
+    assets = r.json().get("assets", {})
+
+    # Priority: product, zarr, then any .zarr asset
+    for key in ["product", "zarr"]:
+        if key in assets and (href := assets[key].get("href")):
+            return str(href)
+
+    # Fallback: any asset with .zarr in href
+    for asset in assets.values():
+        if ".zarr" in asset.get("href", ""):
+            return str(asset["href"])
+
+    raise RuntimeError("No Zarr asset found in STAC item")
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: get_zarr_url.py <stac-item-url>", file=sys.stderr)
+        sys.exit(1)
+
+    try:
+        zarr_url = get_zarr_url(sys.argv[1])
+        print(zarr_url)
+    except Exception as e:
+        print(f"Error: {e}", file=sys.stderr)
+        sys.exit(1)
diff --git a/uv.lock b/uv.lock
index 7948ac3..1774fa9 100644
--- a/uv.lock
+++ b/uv.lock
@@ -410,7 +410,6 @@ dependencies = [
     { name = "pika" },
     { name = "pystac" },
     { name = "pystac-client" },
-    { name = "requests" },
     { name = "s3fs" },
     { name = "tenacity" },
     { name = "xarray" },
@@ -444,7 +443,6 @@ requires-dist = [
     { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0.0" },
     { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.1.0" },
     { name = "pytest-mock", marker = "extra == 'dev'", specifier = ">=3.12.0" },
-    { name = "requests", specifier = ">=2.31.0" },
     { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.0" },
     { name = "s3fs", specifier = ">=2024.0.0" },
     { name = "tenacity", specifier = ">=8.0.0" },
diff --git a/workflows/README.md b/workflows/README.md
index 0495bfc..eaade21 100644
--- a/workflows/README.md
+++ b/workflows/README.md
@@ -1,142 +1,150 @@
 # Workflows
 
-Argo Workflows configuration using Kustomize for environment management.
+Event-driven Argo Workflows for Sentinel-2 GeoZarr conversion and STAC registration.
 
-## Purpose
+**Architecture**: RabbitMQ messages → Sensor → WorkflowTemplate (convert → register) → S3 + STAC API
 
-Event-driven pipeline orchestration for Sentinel-2 GeoZarr conversion and STAC registration. RabbitMQ messages trigger workflows that run a 2-step DAG: **convert → register**.
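+
+For testing, a trigger can also be published by hand. Below is a minimal sketch
+using `pika` (already a project dependency); the broker URL, exchange, and
+routing key are the staging values, and `PASSWORD` stands in for the
+`rabbitmq-password` secret value. `tools/submit_burst.py` wraps the same idea.
+
+```python
+import json
+
+import pika
+
+# Staging broker; requires in-cluster or port-forwarded access.
+params = pika.URLParameters("amqp://user:PASSWORD@rabbitmq.core.svc.cluster.local:5672/")
+with pika.BlockingConnection(params) as conn:
+    conn.channel().basic_publish(
+        exchange="geozarr-staging",
+        routing_key="eopf.items.test",
+        body=json.dumps(
+            {
+                "source_url": "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/...",
+                "collection": "sentinel-2-l2a-dp-test",
+            }
+        ),
+    )
+```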
+---
 
-## Structure
+## Quick Setup
 
-```
-workflows/
-├── base/                      # Core resources (namespace-agnostic)
-│   ├── kustomization.yaml    # References all resources
-│   ├── workflowtemplate.yaml # 2-step pipeline DAG
-│   ├── sensor.yaml           # RabbitMQ → Workflow trigger
-│   ├── eventsource.yaml      # RabbitMQ connection config
-│   └── rbac.yaml             # ServiceAccount + permissions
-└── overlays/
-    ├── staging/
-    │   └── kustomization.yaml # devseed-staging namespace patches
-    └── production/
-        └── kustomization.yaml # devseed namespace patches
-```
+### 1. Configure kubectl
 
-## Apply to Cluster
+Download kubeconfig from [OVH Manager → Kubernetes](https://www.ovh.com/manager/#/public-cloud/pci/projects/bcc5927763514f499be7dff5af781d57/kubernetes/f5f25708-bd15-45b9-864e-602a769a5fcf/service) (**Access and Security** tab).
 
-**Staging (devseed-staging):**
 ```bash
-kubectl apply -k workflows/overlays/staging
+mkdir -p .work  # Local scratch directory for the kubeconfig
+mv ~/Downloads/kubeconfig-*.yml .work/kubeconfig
+export KUBECONFIG=$(pwd)/.work/kubeconfig
+kubectl get nodes  # Verify: should list 3-5 nodes
 ```
 
-**Production (devseed):**
+### 2. Create Required Secrets
+
+The pipeline needs three secrets: **event ingestion** (RabbitMQ), **output storage** (S3), and **STAC registration** (API auth).
+
+**RabbitMQ credentials** (receives workflow trigger events):
 ```bash
-kubectl apply -k workflows/overlays/production
+# Get password from cluster-managed secret
+RABBITMQ_PASS=$(kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d)
+
+kubectl create secret generic rabbitmq-credentials -n devseed-staging \
+  --from-literal=username=user \
+  --from-literal=password="$RABBITMQ_PASS"
 ```
 
-**Verify deployment:**
+**S3 credentials** (writes converted GeoZarr files):
 ```bash
-# Check resources (expected output shows 1 of each)
-kubectl get workflowtemplate,sensor,eventsource,sa -n devseed-staging
+# Get from OVH Manager → Users & Roles → OpenStack credentials
+# https://www.ovh.com/manager/#/public-cloud/pci/projects/bcc5927763514f499be7dff5af781d57/users
 
-# Example output:
-# NAME                                      AGE
-# workflowtemplate.argoproj.io/geozarr-pipeline   5m
-#
-# NAME                         AGE
-# sensor.argoproj.io/geozarr-sensor   5m
-#
-# NAME                                  AGE
-# eventsource.argoproj.io/rabbitmq-geozarr   5m
-#
-# NAME                           SECRETS   AGE
-# serviceaccount/operate-workflow-sa   0         5m
-
-# Watch for workflows (should show Running/Succeeded/Failed)
-kubectl get wf -n devseed-staging --watch
+kubectl create secret generic geozarr-s3-credentials -n devseed-staging \
+  --from-literal=AWS_ACCESS_KEY_ID=<access-key> \
+  --from-literal=AWS_SECRET_ACCESS_KEY=<secret-key>
 ```
 
-## Required Secrets
+**STAC API token** (registers items, optional if API is public):
+```bash
+kubectl create secret generic stac-api-token -n devseed-staging \
+  --from-literal=token=<token>
+```
 
-The pipeline requires these Kubernetes secrets in the target namespace:
+### 3. Deploy Workflows
 
-### 1. `rabbitmq-credentials`
-RabbitMQ authentication for EventSource:
+```bash
+kubectl apply -k workflows/overlays/staging     # Staging (devseed-staging)
+kubectl apply -k workflows/overlays/production  # Production (devseed)
+```
 
+**Verify deployment:**
 ```bash
-kubectl create secret generic rabbitmq-credentials \
-  --from-literal=username=<username> \
-  --from-literal=password=<password> \
-  -n devseed-staging
+kubectl get workflowtemplate,sensor,eventsource,sa -n devseed-staging
+# Expected: 1 WorkflowTemplate, 1 Sensor, 1 EventSource, 1 ServiceAccount
 ```
 
-### 2. 
`geozarr-s3-credentials`
-S3 credentials for GeoZarr output:
+---
+
+## Structure
 
-```bash
-kubectl create secret generic geozarr-s3-credentials \
-  --from-literal=AWS_ACCESS_KEY_ID=<access-key> \
-  --from-literal=AWS_SECRET_ACCESS_KEY=<secret-key> \
-  -n devseed-staging
 ```
+workflows/
+├── base/                      # Core resources (namespace-agnostic)
+│   ├── workflowtemplate.yaml # 2-step DAG: convert → register
+│   ├── sensor.yaml           # RabbitMQ trigger
+│   ├── eventsource.yaml      # RabbitMQ connection
+│   ├── rbac.yaml             # Permissions
+│   └── kustomization.yaml
+└── overlays/
+    ├── staging/              # devseed-staging namespace
+    └── production/           # devseed namespace
+```
+
+---
 
-### 3. `stac-api-token` (optional)
-Bearer token for STAC API authentication (if required):
+## Monitoring
 
+**Watch workflows:**
 ```bash
-kubectl create secret generic stac-api-token \
-  --from-literal=token=<token> \
-  -n devseed-staging
+kubectl get wf -n devseed-staging --watch
+```
+
+**Example output:**
+```
+NAME            STATUS      AGE
+geozarr-79jmg   Running     5m
+geozarr-95rgx   Succeeded   9h
+geozarr-jflnj   Failed      10h
 ```
 
-## WorkflowTemplate Parameters
+---
+
+## Configuration
+
+### S3 Storage
+
+- **Endpoint**: `https://s3.de.io.cloud.ovh.net` (OVH Frankfurt)
+- **Bucket**: `esa-zarr-sentinel-explorer-fra`
+- **Paths**: `tests-output/` (staging), `geozarr/` (production)
 
-See main [README.md](../README.md) for complete parameter reference.
+### Workflow Parameters
 
-| Parameter | Default | Description |
-|-----------|---------|-------------|
-| `source_url` | - | STAC item URL or direct Zarr URL |
-| `register_collection` | sentinel-2-l2a-dp-test | STAC collection ID |
-| `stac_api_url` | https://api... | STAC API endpoint |
-| `raster_api_url` | https://api... | TiTiler endpoint |
-| `s3_output_bucket` | esa-zarr... | S3 output bucket |
-| `pipeline_image_version` | fix-unit-tests | Docker image tag |
+Key parameters (see [../README.md](../README.md) for full reference):
 
-## Resource Configuration
+- `source_url`: STAC item URL or Zarr URL
+- `register_collection`: Target STAC collection (default: `sentinel-2-l2a-dp-test`)
+- `s3_output_bucket`: Output bucket
+- `pipeline_image_version`: Docker image tag
 
-To adjust CPU/memory limits, edit `workflows/base/workflowtemplate.yaml`:
+### Resource Tuning
+
+Edit `workflows/base/workflowtemplate.yaml`:
 
 ```yaml
-- name: convert-geozarr
-  resources:
-    requests:
-      memory: 4Gi  # Increase for larger datasets
-      cpu: '1'
-    limits:
-      memory: 8Gi
-      cpu: '2'
+resources:
+  requests: { memory: 4Gi, cpu: '1' }
+  limits: { memory: 8Gi, cpu: '2' }  # Increase for larger datasets
 ```
 
+---
+
 ## Troubleshooting
 
-**Kustomize build fails:**
+**Workflow not triggered:**
 ```bash
-# Validate structure
-kubectl kustomize workflows/overlays/staging
+kubectl logs -n devseed-staging -l eventsource-name=rabbitmq  # Check RabbitMQ connection
+kubectl get sensor -n devseed-staging geozarr-trigger -o yaml  # Check sensor status
+```
 
-# Check for duplicate resources
-find workflows -name "*.yaml" -not -path "*/base/*" -not -path "*/overlays/*"
+**Workflow fails:**
+```bash
+kubectl logs -n devseed-staging <pod-name>  # View logs
+kubectl get secret -n devseed-staging  # Verify secrets exist
 ```
 
-**Workflow not triggered:**
-- Check EventSource connection: `kubectl logs -n devseed-staging -l eventsource-name=rabbitmq`
-- Check Sensor status: `kubectl get sensor -n devseed-staging geozarr-trigger -o yaml`
-- Verify RabbitMQ port-forward or service access
+**Kustomize validation:**
+```bash
+kubectl kustomize workflows/overlays/staging  # Validate YAML
+```
 
-**Workflow fails:**
-- Check pod logs: 
`kubectl logs -n devseed-staging <pod-name>`
-- Verify secrets exist: `kubectl get secret -n devseed-staging geozarr-s3-credentials stac-api-token`
-- Check RBAC: `kubectl auth can-i create workflows --as=system:serviceaccount:devseed-staging:operate-workflow-sa`
+---
 
-For full pipeline documentation, see [../README.md](../README.md).
+For complete documentation, see [../README.md](../README.md).
diff --git a/workflows/base/workflowtemplate.yaml b/workflows/base/workflowtemplate.yaml
index 95b2a57..5196e5c 100644
--- a/workflows/base/workflowtemplate.yaml
+++ b/workflows/base/workflowtemplate.yaml
@@ -7,6 +7,8 @@ spec:
   serviceAccountName: operate-workflow-sa
   entrypoint: main
   archiveLogs: false
+  parallelism: 10
+  activeDeadlineSeconds: 7200
   ttlStrategy:
     secondsAfterCompletion: 86400
   podGC:
@@ -45,6 +47,9 @@ spec:
     - name: convert-geozarr
       activeDeadlineSeconds: 3600
+      synchronization:
+        mutex:
+          name: "item-{{workflow.parameters.source_url | sha1sum | trunc 8}}"
       script:
         image: ghcr.io/eopf-explorer/data-pipeline:{{workflow.parameters.pipeline_image_version}}
         imagePullPolicy: Always
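+        # The mutex above serializes concurrent runs for the same source_url,
+        # so duplicate queue deliveries don't convert one item twice in parallel.
+        # Assumption: sprig-style pipes ("| sha1sum | trunc 8") resolve in this
+        # Argo version; if the tag stays unresolved, an expression-tag form such
+        # as {{=sprig.trunc(8, sprig.sha1sum(workflow.parameters.source_url))}}
+        # is an alternative worth validating before relying on it.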