Commit 26b77bb

chore: remove unused type: ignore on zarr import
1 parent b0bdd36 commit 26b77bb

File tree: 4 files changed, +476 / -19 lines changed
Lines changed: 74 additions & 0 deletions

# Pipeline Integration Recommendations

This document lists enhancements observed while integrating `eopf-geozarr` into an Argo-based batch conversion pipeline (data-model-pipeline). They are candidates for upstream inclusion or API refinement.

## 1. Output Prefix Expansion

**Current (pipeline)**: The wrapper detects an `output_zarr` ending with `/` and appends `<item_id>_geozarr.zarr` derived from the input STAC/Zarr URL.

**Recommendation**: Support `--output-prefix`, or accept a trailing slash on the positional `output_path` and perform the expansion internally (emitting the resolved final path). Add a log line: `Resolved output store: s3://.../S2A_..._geozarr.zarr`.
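
A minimal sketch of the expansion, assuming the item ID is the last path segment of the input URL (the helper name and the suffix handling are illustrative, not part of the current CLI):

```python
import re
from urllib.parse import urlparse


def expand_output_prefix(output_path: str, input_url: str) -> str:
    """If output_path ends with '/', append '<item_id>_geozarr.zarr'."""
    if not output_path.endswith("/"):
        return output_path  # explicit store path: use verbatim
    # Take the last path segment of the input STAC/Zarr URL as the item ID.
    item_id = urlparse(input_url).path.rstrip("/").rsplit("/", 1)[-1]
    item_id = re.sub(r"\.zarr$", "", item_id)  # drop a trailing .zarr suffix
    resolved = f"{output_path}{item_id}_geozarr.zarr"
    print(f"Resolved output store: {resolved}")
    return resolved
```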

## 2. Group Existence Validation (Pre-flight)

**Current (pipeline)**: `validate_params_groups.py` inspects filesystem structure (presence of `.zarray` / `.zgroup`).

**Recommendation**: Add a native CLI flag `--validate-groups` to prune missing groups or fail fast when they don't exist. Modes:

- `--validate-groups=warn` (default): drop missing groups, report them.
- `--validate-groups=error`: abort if any are missing.

Emit a JSON or structured summary when `--verbose` is set; a sketch of both modes follows.
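
Assuming a zarr v2 layout where a group directory carries a `.zgroup` marker (the same check the pipeline script makes today), the two modes could look like this; `validate_groups` and its exit code are illustrative:

```python
import sys

from fsspec.core import url_to_fs


def validate_groups(store_url: str, groups: list[str], mode: str = "warn") -> list[str]:
    """Return the groups that exist; warn about or abort on the rest."""
    fs, root = url_to_fs(store_url)
    missing = [g for g in groups if not fs.exists(f"{root}/{g.strip('/')}/.zgroup")]
    if missing and mode == "error":
        print(f"Missing groups: {missing}", file=sys.stderr)
        sys.exit(3)  # group resolution failure, per the contract in section 12
    for g in missing:
        print(f"warning: dropping missing group {g}")
    return [g for g in groups if g not in missing]
```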

## 3. Profiles (WOZ Profiles)

**Current (pipeline)**: External JSON profile expansion before calling the CLI.

**Recommendation**: Provide `--profile <name>` in the CLI, mapping to preset groups + chunk params. Add `eopf-geozarr profile list` / `profile show <name>` subcommands. Keep the external mechanism as a fallback.
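
One possible shape for the registry; the profile name, group paths, and parameter values below are made up for illustration:

```python
# Hypothetical built-in profile registry backing --profile / profile list.
PROFILES: dict[str, dict] = {
    "s2-l2a": {
        "groups": ["/measurements/r10m", "/measurements/r20m", "/measurements/r60m"],
        "spatial_chunk": 1024,
        "tile_width": 256,
    },
}


def apply_profile(args, name: str) -> None:
    """Fill unset CLI args from the named profile (explicit flags still win)."""
    preset = PROFILES[name]
    for key, value in preset.items():
        if getattr(args, key, None) is None:
            setattr(args, key, value)
```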

## 4. Compressor Handling

**Current**: The template attempted to pass `--compressor`; the CLI does not expose codec choice.

**Recommendation**: If codec selection is desired, add `--compressor <name>` now (zstd, lz4, blosc) with validation; otherwise document the fixed default explicitly to avoid confusion.
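
Validation comes for free with `argparse` choices; a standalone sketch (the parser wiring here is assumed, not the project's actual one):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--compressor",
    choices=["zstd", "lz4", "blosc"],
    default=None,
    help="Chunk compressor; omit to keep the documented default.",
)
args = parser.parse_args(["--compressor", "zstd"])  # unknown codecs are rejected
```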

## 5. CRS Groups Convenience

**Current**: `--crs-groups` is an optional list.

**Recommendation**: Discover candidate groups automatically (search for geometry-like datasets) unless `--crs-groups` is provided as an override. Provide `eopf-geozarr info --crs-scan` to preview.
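
A rough discovery heuristic over an `xarray.DataTree` (requires a recent xarray): treat a group as a candidate when it has spatial dims but no CRS metadata. The dim names and attribute checks are assumptions about the data model:

```python
import xarray as xr


def scan_crs_candidates(dt: xr.DataTree) -> list[str]:
    """Group paths that look spatial (x/y dims) but carry no CRS metadata."""
    candidates = []
    for node in dt.subtree:
        ds = node.ds
        has_xy = {"x", "y"}.issubset(ds.dims)
        has_crs = "spatial_ref" in ds.variables or any(
            "grid_mapping" in v.attrs for v in ds.data_vars.values()
        )
        if has_xy and not has_crs:
            candidates.append(node.path)
    return candidates
```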

## 6. Dask Cluster Ergonomics

**Current**: `--dask-cluster` toggles a local cluster with no feedback.

**Recommendation**: Print the cluster dashboard URL (if available) and add `--dask-workers N` for quick scaling.
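
Both pieces map directly onto standard `dask.distributed` APIs; a sketch (the helper name is illustrative):

```python
from dask.distributed import Client, LocalCluster


def start_local_cluster(n_workers: int | None = None) -> Client:
    """What --dask-workers N could map to, with dashboard feedback."""
    cluster = LocalCluster(n_workers=n_workers)  # None lets dask auto-size
    client = Client(cluster)
    print(f"Dask dashboard: {client.dashboard_link}")  # URL when bokeh is installed
    return client
```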

## 7. Structured Logging / Run Metadata

**Current**: Plain prints; the pipeline scrapes logs.

**Recommendation**: Optional `--run-metadata <path.json>` to write a machine-readable summary: inputs, resolved groups, timings, warnings. This eases automation and reproducibility.
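
A minimal writer could look like the following (field names are suggestions; the `cli.py` diff below lands a fuller take on this idea via `MetricsRecorder` and `--metrics`):

```python
import json
from pathlib import Path


def write_run_metadata(
    path: str,
    *,
    inputs: dict,
    groups: list[str],
    timings: dict[str, float],
    warnings: list[str],
) -> None:
    """Persist a machine-readable run summary for pipeline automation."""
    payload = {
        "inputs": inputs,
        "resolved_groups": groups,
        "timings_s": timings,
        "warnings": warnings,
    }
    Path(path).write_text(json.dumps(payload, indent=2))
```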

## 8. Validation Command Enhancements

**Current**: `validate` skeleton present but incomplete.

**Recommendation**:

- Implement spec checks (multiscales, attributes, chunk shape policy).
- Exit code non-zero on *hard* failures, zero with warnings for soft issues.
- `--format json` for programmatic consumption.

A sketch of the outcome handling appears below.
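
Assuming errors and warnings have already been collected, the soft/hard contract and `--format json` could be finished like this (function name illustrative):

```python
import json
import sys


def finish_validation(errors: list[str], warnings: list[str], fmt: str = "text") -> None:
    """Hard failures exit non-zero; soft issues are reported but exit 0."""
    if fmt == "json":
        print(json.dumps({"errors": errors, "warnings": warnings}))
    else:
        for w in warnings:
            print(f"WARN: {w}")
        for e in errors:
            print(f"ERROR: {e}", file=sys.stderr)
    sys.exit(2 if errors else 0)
```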

## 9. HTML Tree Generation

**Current**: The `_generate_html_output` scaffold is incomplete.

**Recommendation**: Finish the implementation; integrate with `info --html-output`. Provide minimal inline CSS (already drafted) and an optional `--open-browser` flag.

## 10. Progress Reporting

**Recommendation**: Emit periodic progress per group: `group=/measurements/r10m scale=2 written=...MB elapsed=...s` to assist monitoring in batch workflows.

## 11. Retry / Resumability

**Recommendation**: Add `--resume` to skip multiscale levels that already exist when the output store is partially present.
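
The existence check could reuse the same zarr v2 metadata markers as the pre-flight validation in section 2 (helper name and level layout are assumptions):

```python
from fsspec.core import url_to_fs


def level_exists(store_url: str, group: str, level: int) -> bool:
    """True if the multiscale level already has zarr metadata in the store."""
    fs, root = url_to_fs(store_url)
    prefix = f"{root}/{group.strip('/')}/{level}"
    return fs.exists(f"{prefix}/.zgroup") or fs.exists(f"{prefix}/.zarray")
```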

## 12. Exit Codes (Contract)

Document exit code meanings:

- 0: success
- 2: validation (input) error
- 3: group resolution failure
- 4: conversion runtime error

An enum would pin the contract down, as sketched below.
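
A hypothetical `IntEnum` keeps the codes in one place:

```python
from enum import IntEnum


class ExitCode(IntEnum):
    SUCCESS = 0
    INPUT_VALIDATION_ERROR = 2
    GROUP_RESOLUTION_FAILURE = 3
    CONVERSION_RUNTIME_ERROR = 4
```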

## 13. Environment Variable Overrides

Allow `EOZ_DEFAULT_PROFILE` and `EOZ_OUTPUT_PREFIX` env vars as implicit defaults (still overridden by flags); see the sketch below.
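
With `argparse`, env vars slot in naturally as defaults, since an explicit flag always wins over the default:

```python
import argparse
import os

parser = argparse.ArgumentParser()
# Env vars act as implicit defaults; an explicit flag still overrides them.
parser.add_argument("--profile", default=os.environ.get("EOZ_DEFAULT_PROFILE"))
parser.add_argument("--output-prefix", default=os.environ.get("EOZ_OUTPUT_PREFIX"))
```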

## 14. Example Invocation Block in README

Provide ready-to-copy examples for Sentinel-2 & Sentinel-1, including polarization groups, once the Phase 1 logic is public.

---

**Next Steps (Suggested Order)**

1. Implement output prefix expansion (low risk, high UX win)
2. Add the group validation flag (prevents silent empty writes)
3. Finish the `validate` and `info` HTML features
4. Add structured run metadata output
5. Introduce profile subcommands, then deprecate the external expansion path over time

Feedback welcome; pipeline experience will continue to surface actionable deltas.

src/eopf_geozarr/cli.py

Lines changed: 74 additions & 19 deletions

```diff
@@ -21,6 +21,7 @@
     is_s3_path,
     validate_s3_access,
 )
+from .metrics import MetricsRecorder


 def setup_dask_cluster(
@@ -196,6 +197,7 @@ def convert_command(args: argparse.Namespace) -> None:
     run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
     status = "unknown"
     error_msg = None
+    metrics = None

     try:
         # Validate input path (handle both local paths and URLs)
@@ -266,12 +268,31 @@
         # Load the EOPF DataTree with appropriate storage options
         print("Loading EOPF dataset...")
         storage_options = get_storage_options(input_path)
-        dt = xr.open_datatree(
-            str(input_path),
-            engine="zarr",
-            chunks="auto",
-            storage_options=storage_options,
-        )
+        # Metrics setup
+        if getattr(args, "metrics", True):
+            metrics = MetricsRecorder()
+            metrics.set_environment()
+            metrics.set_input(
+                source_uri=str(input_path),
+                profile=None,
+                groups=args.groups,
+                dask={
+                    "enabled": bool(dask_client is not None),
+                    "mode": getattr(args, "dask_mode", None),
+                    "workers": getattr(args, "dask_workers", None),
+                    "threads_per_worker": getattr(
+                        args, "dask_threads_per_worker", None
+                    ),
+                },
+            )
+
+        with metrics.time_step("open_input") if metrics else nullcontext():
+            dt = xr.open_datatree(
+                str(input_path),
+                engine="zarr",
+                chunks="auto",
+                storage_options=storage_options,
+            )

         if args.verbose:
             print(f"Loaded DataTree with {len(dt.children)} groups")
@@ -306,6 +327,21 @@ def convert_command(args: argparse.Namespace) -> None:
             from dask.distributed import get_task_stream

             with get_task_stream(client=dask_client, plot=False) as ts:
+                with metrics.time_step("convert") if metrics else nullcontext():
+                    dt_geozarr = create_geozarr_dataset(
+                        dt_input=dt,
+                        groups=args.groups,
+                        output_path=output_path,
+                        spatial_chunk=args.spatial_chunk,
+                        min_dimension=args.min_dimension,
+                        tile_width=args.tile_width,
+                        max_retries=args.max_retries,
+                        crs_groups=args.crs_groups,
+                    )
+                task_events = ts.data
+        else:
+            with perf_ctx:
+                with metrics.time_step("convert") if metrics else nullcontext():
                     dt_geozarr = create_geozarr_dataset(
                         dt_input=dt,
                         groups=args.groups,
@@ -316,19 +352,6 @@ def convert_command(args: argparse.Namespace) -> None:
                         max_retries=args.max_retries,
                         crs_groups=args.crs_groups,
                     )
-                task_events = ts.data
-        else:
-            with perf_ctx:
-                dt_geozarr = create_geozarr_dataset(
-                    dt_input=dt,
-                    groups=args.groups,
-                    output_path=output_path,
-                    spatial_chunk=args.spatial_chunk,
-                    min_dimension=args.min_dimension,
-                    tile_width=args.tile_width,
-                    max_retries=args.max_retries,
-                    crs_groups=args.crs_groups,
-                )

         wall_clock = perf_counter() - t0
         if (
@@ -366,6 +389,16 @@ def convert_command(args: argparse.Namespace) -> None:
                 summary_path.write_text(_dumps(summary, indent=2))
                 if args.verbose:
                     print(f"🧾 Wrote run summary: {summary_path}")
+            # New metrics: run_summary.json and chunk_report.json
+            if metrics and debug_dir is not None and getattr(args, "metrics", True):
+                # Build output summary best-effort
+                with metrics.time_step("summarize_output"):
+                    metrics.build_output_summary(output_path)
+                payload = metrics.finalize(status="ok")
+                run_summary = Path(output_path) / "debug" / "run_summary.json"
+                MetricsRecorder.write_json(run_summary, payload)
+                if args.verbose:
+                    print(f"🧾 Wrote metrics: {run_summary}")
         except Exception as _exc:
             if args.verbose:
                 print(f"(debug) could not write run summary: {_exc}")
@@ -387,6 +420,16 @@ def convert_command(args: argparse.Namespace) -> None:
            import traceback

            traceback.print_exc()
+        # Write failure metrics if possible
+        try:
+            if metrics and debug_dir is not None and getattr(args, "metrics", True):
+                payload = metrics.finalize(status="error", exception=error_msg)
+                run_summary = Path(args.output_path) / "debug" / "run_summary.json"
+                MetricsRecorder.write_json(run_summary, payload)
+                if args.verbose:
+                    print(f"🧾 Wrote failure metrics: {run_summary}")
+        except Exception:
+            pass
         sys.exit(1)
     finally:
         # Clean up dask client if it was created
@@ -1359,6 +1402,18 @@ def create_parser() -> argparse.ArgumentParser:
         type=str,
         help="Write a Dask performance report HTML to this path (local only).",
     )
+    convert_parser.add_argument(
+        "--metrics",
+        action="store_true",
+        default=True,
+        help="Emit structured conversion metrics JSON (default: on). Use --no-metrics to disable.",
+    )
+    convert_parser.add_argument(
+        "--no-metrics",
+        dest="metrics",
+        action="store_false",
+        help="Disable metrics emission.",
+    )
     convert_parser.set_defaults(func=convert_command)

     # Info command
```
