88 changes: 17 additions & 71 deletions README.md
@@ -56,33 +56,21 @@ Transforms Sentinel-1/2 satellite data into web-ready visualizations:
- Sentinel-1 GRD (SAR backscatter)


## Requirements & Setup
## Setup

### Prerequisites
**Prerequisites:**
- Kubernetes cluster with [platform-deploy](https://github.com/EOPF-Explorer/platform-deploy) (Argo Workflows, RabbitMQ, STAC API, TiTiler)
- Python 3.13+ with `uv`
- `kubectl` configured

- **Kubernetes cluster** with [platform-deploy](https://github.com/EOPF-Explorer/platform-deploy) infrastructure
  - Argo Workflows (pipeline orchestration)
  - RabbitMQ (event-driven automation)
  - STAC API & TiTiler (catalog & visualization)
- **Python 3.13+** with `uv` package manager
- **S3 storage** credentials (OVH de region)
- **Kubeconfig** in `.work/kubeconfig`
**📖 Complete setup guide:** See [workflows/README.md](workflows/README.md) for:
- kubectl configuration (OVH Manager kubeconfig download)
- Required secrets (RabbitMQ, S3, STAC API)
- Workflow deployment (`kubectl apply -k`)

Verify infrastructure:
**Quick verification:**
```bash
export KUBECONFIG=$(pwd)/.work/kubeconfig
kubectl get pods -n core -l app.kubernetes.io/name=argo-workflows
kubectl get pods -n core -l app.kubernetes.io/name=rabbitmq
```

### Deploy Workflows

```bash
# Apply to staging
kubectl apply -k workflows/overlays/staging

# Apply to production
kubectl apply -k workflows/overlays/production
kubectl get wf,sensor,eventsource -n devseed-staging
```

---
@@ -208,60 +196,18 @@ docker/Dockerfile # Pipeline image
tools/submit_burst.py # RabbitMQ burst submission tool
```

Tests are available in `tests/` directory (unit and integration tests using pytest).

---

## Deploy

```bash
# Apply to staging
kubectl apply -k workflows/overlays/staging

# Apply to production
kubectl apply -k workflows/overlays/production
```

**Config:** Image version, S3 endpoints, STAC API URLs, RabbitMQ exchanges configured via kustomize overlays.
Tests are planned for `tests/` directory (structure exists, test files to be added).

---

## Configuration

### S3 Storage

```bash
kubectl create secret generic geozarr-s3-credentials -n devseed-staging \
  --from-literal=AWS_ACCESS_KEY_ID="<your-key>" \
  --from-literal=AWS_SECRET_ACCESS_KEY="<your-secret>"
```

| Setting | Value |
|---------|-------|
| **Endpoint** | `https://s3.de.io.cloud.ovh.net` |
| **Bucket** | `esa-zarr-sentinel-explorer-fra` |
| **Region** | `de` |
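
To sanity-check the credentials against this endpoint before running a conversion, a minimal boto3 sketch (bucket and endpoint taken from the table above; this is not a repo script):

```python
import boto3

# Assumes AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are exported, matching
# the secret created above.
s3 = boto3.client(
    "s3",
    endpoint_url="https://s3.de.io.cloud.ovh.net",
    region_name="de",
)
resp = s3.list_objects_v2(Bucket="esa-zarr-sentinel-explorer-fra", MaxKeys=5)
for obj in resp.get("Contents", []):
    print(obj["Key"])
```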
**📖 Full configuration:** See [workflows/README.md](workflows/README.md) for secrets setup and parameters.

### RabbitMQ

Get password:
```bash
kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d
```

| Setting | Value |
|---------|-------|
| **URL** | `amqp://user:<password>@rabbitmq.core:5672/` |
| **Exchange** | `geozarr-staging` |
| **Routing key** | `eopf.items.test` |

**Message format:**
```json
{
  "source_url": "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/...",
  "collection": "sentinel-2-l2a-dp-test"
}
```
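
For ad-hoc testing, a message of this shape can be published directly with `pika` (the supported path is `tools/submit_burst.py`; the hand-rolled sketch below assumes the staging values from the table above, with `<password>` substituted):

```python
import json

import pika

# Staging exchange/routing key from the table; replace <password> with the
# value read from the rabbitmq-password secret above.
params = pika.URLParameters("amqp://user:<password>@rabbitmq.core:5672/")
with pika.BlockingConnection(params) as conn:
    channel = conn.channel()
    channel.basic_publish(
        exchange="geozarr-staging",
        routing_key="eopf.items.test",
        body=json.dumps(
            {
                "source_url": "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/...",
                "collection": "sentinel-2-l2a-dp-test",
            }
        ),
    )
```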
**Quick reference:**
- S3: `s3.de.io.cloud.ovh.net` / `esa-zarr-sentinel-explorer-fra`
- Staging collection: `sentinel-2-l2a-dp-test`
- Production collection: `sentinel-2-l2a`
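
To confirm that processed items are landing in the target collection, a quick STAC API check (API root inferred from the `source_url` above; a sketch, not a repo script):

```python
import httpx

# Staging collection from the quick reference above.
api = "https://stac.core.eopf.eodc.eu"
r = httpx.get(f"{api}/collections/sentinel-2-l2a-dp-test/items", params={"limit": 5})
r.raise_for_status()
for feature in r.json().get("features", []):
    print(feature["id"])
```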

---

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -33,7 +33,6 @@ dependencies = [
"click>=8.1.0",
"pika>=1.3.0",
"tenacity>=8.0.0",
"requests>=2.31.0",
"morecantile>=5.0.0",
"cf-xarray>=0.9.0",
"eopf-geozarr @ git+https://github.com/EOPF-Explorer/data-model.git@fix/s1-encoding-conflict",
@@ -146,6 +145,7 @@ warn_no_return = true
strict_equality = true
exclude = ["examples/"]

# Relax type checking for test files (structure exists, tests to be added)
[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false
7 changes: 3 additions & 4 deletions scripts/augment_stac_item.py
@@ -66,9 +66,8 @@ def add_visualization(item: Item, raster_base: str, collection_id: str) -> None:
        _add_tile_links(item, base_url, query, "Sentinel-1 GRD VH")

    elif coll_lower.startswith(("sentinel-2", "sentinel2")):
        # S2: Point to overview level 0 for quicklook TCI
        # Use /r10m/0/tci path to access the overview array with spatial_ref
        var_path = "/quality/l2a_quicklook/r10m/0/tci"
        # S2: Use colon separator for TiTiler variable path
        var_path = "/quality/l2a_quicklook/r10m:tci"
        query = (
            f"variables={urllib.parse.quote(var_path, safe='')}&bidx=1&bidx=2&bidx=3&assets=TCI_10m"
        )
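
For reference, the new path yields this fully encoded query string (runnable standalone to verify the colon survives percent-encoding):

```python
import urllib.parse

var_path = "/quality/l2a_quicklook/r10m:tci"
print(f"variables={urllib.parse.quote(var_path, safe='')}&bidx=1&bidx=2&bidx=3&assets=TCI_10m")
# variables=%2Fquality%2Fl2a_quicklook%2Fr10m%3Atci&bidx=1&bidx=2&bidx=3&assets=TCI_10m
```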
@@ -172,7 +171,7 @@ def main(argv: list[str] | None = None) -> int:
        return 0

    except Exception as e:
        logger.error(f"Failed to augment {args.item_id}: {e}")
        logger.error(f"Failed to augment {item_url}: {e}")
        return 1


39 changes: 39 additions & 0 deletions scripts/convert.py
@@ -9,9 +9,11 @@
import sys
from urllib.parse import urlparse

import boto3
import fsspec
import httpx
import xarray as xr
from botocore.exceptions import ClientError
from eopf_geozarr import create_geozarr_dataset
from eopf_geozarr.conversion.fs_utils import get_storage_options
from get_conversion_params import get_conversion_params
@@ -22,6 +24,28 @@
logger = logging.getLogger(__name__)


def check_output_exists(
    s3_output_bucket: str, s3_output_prefix: str, collection: str, item_id: str
) -> bool:
    """Check if output already exists in S3."""
    try:
        s3 = boto3.client(
            "s3",
            endpoint_url=os.getenv("AWS_ENDPOINT_URL"),
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        )
        prefix = f"{s3_output_prefix}/{collection}/{item_id}.zarr/"
        response = s3.list_objects_v2(Bucket=s3_output_bucket, Prefix=prefix, MaxKeys=1)
        exists: bool = response.get("KeyCount", 0) > 0
        if exists:
            logger.info(f"Output exists at s3://{s3_output_bucket}/{prefix} - skipping")
        return exists
    except ClientError as e:
        logger.warning(f"Could not check S3: {e}")
        return False


def get_zarr_url(stac_item_url: str) -> str:
    """Get Zarr asset URL from STAC item."""
    r = httpx.get(stac_item_url, timeout=30.0, follow_redirects=True)
@@ -46,6 +70,7 @@ def run_conversion(
    collection: str,
    s3_output_bucket: str,
    s3_output_prefix: str,
    overwrite: bool = False,
) -> str:
    """Run GeoZarr conversion workflow.

@@ -54,6 +79,7 @@
        collection: Collection ID for parameter lookup
        s3_output_bucket: S3 bucket for output
        s3_output_prefix: S3 prefix for output
        overwrite: Force conversion even if output exists

    Returns:
        Output Zarr URL (s3://...)
@@ -65,6 +91,13 @@
    item_id = urlparse(source_url).path.rstrip("/").split("/")[-1]
    logger.info(f"Starting GeoZarr conversion for {item_id}")

    # Idempotency: skip if output exists
    if not overwrite and check_output_exists(
        s3_output_bucket, s3_output_prefix, collection, item_id
    ):
        output_url = f"s3://{s3_output_bucket}/{s3_output_prefix}/{collection}/{item_id}.zarr"
        return output_url

    # Resolve source: STAC item or direct Zarr URL
    if "/items/" in source_url:
        logger.info("Extracting Zarr URL from STAC item...")
@@ -149,6 +182,11 @@ def main(argv: list[str] | None = None) -> int:
parser.add_argument("--collection", required=True, help="Collection ID")
parser.add_argument("--s3-output-bucket", required=True, help="S3 output bucket")
parser.add_argument("--s3-output-prefix", required=True, help="S3 output prefix")
parser.add_argument(
"--overwrite",
action="store_true",
help="Force conversion even if output exists",
)
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")

args = parser.parse_args(argv)
@@ -162,6 +200,7 @@
            args.collection,
            args.s3_output_bucket,
            args.s3_output_prefix,
            overwrite=args.overwrite,
        )
        logger.info(f"Success: {output_url}")
        return 0
28 changes: 0 additions & 28 deletions scripts/create_geozarr_item.py
@@ -5,7 +5,6 @@

import argparse
import logging
import re
from typing import Any
from urllib.parse import urlparse

@@ -31,30 +30,6 @@ def s3_to_https(s3_url: str, endpoint: str) -> str:
    return f"https://{bucket}.{host}/{path}"


def normalize_r60m_href(href: str) -> str:
    """Add /0/ subdirectory to r10m/r20m/r60m paths to match GeoZarr output structure.

    GeoZarr conversion creates /0/ subdirectories (overview level 0) for all
    resolution bands. This normalizes asset hrefs accordingly.

    Example: .../r10m/tci → .../r10m/0/tci
             .../r60m/b09 → .../r60m/0/b09
    """
    # Check for any resolution level pattern
    for res in ["r10m", "r20m", "r60m"]:
        if f"/{res}/" not in href:
            continue

        # If already has /0/ or other digit subdirectory, don't modify
        if re.search(rf"/{res}/\d+/", href):
            continue

        # Insert /0/ after /{res}/
        href = re.sub(rf"/({res})/", r"/\1/0/", href)

    return href


def find_source_zarr_base(source_item: dict) -> str | None:
    """Find the base Zarr URL from source item assets."""
    for asset in source_item.get("assets", {}).values():
@@ -112,9 +87,6 @@ def create_geozarr_item(
        subpath = old_href[len(source_zarr_base) :]
        new_href = output_zarr_base + subpath

        # Normalize r60m paths to include /0/ subdirectory (GeoZarr structure)
        new_href = normalize_r60m_href(new_href)

        # Convert to https if needed
        if new_href.startswith("s3://"):
            new_href = s3_to_https(new_href, s3_endpoint)
38 changes: 38 additions & 0 deletions scripts/get_zarr_url.py
@@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""Extract Zarr URL from STAC item - standalone script for workflow templates."""

import sys

import httpx


def get_zarr_url(stac_item_url: str) -> str:
    """Get Zarr asset URL from STAC item."""
    r = httpx.get(stac_item_url, timeout=30.0, follow_redirects=True)
    r.raise_for_status()
    assets = r.json().get("assets", {})

    # Priority: product, zarr, then any .zarr asset
    for key in ["product", "zarr"]:
        if key in assets and (href := assets[key].get("href")):
            return str(href)

    # Fallback: any asset with .zarr in href
    for asset in assets.values():
        if ".zarr" in asset.get("href", ""):
            return str(asset["href"])

    raise RuntimeError("No Zarr asset found in STAC item")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: get_zarr_url.py <stac_item_url>", file=sys.stderr)
        sys.exit(1)

    try:
        zarr_url = get_zarr_url(sys.argv[1])
        print(zarr_url)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
2 changes: 0 additions & 2 deletions uv.lock

Some generated files are not rendered by default.