@@ -11,6 +11,7 @@
 from pathlib import Path
 from time import perf_counter
 from typing import Any, Optional
+from urllib.parse import urlparse
 
 import xarray as xr
 
@@ -175,6 +176,44 @@ def _summarize_dask_metrics(
     return base
 
 
+def _derive_item_id_from_input_path(path: str) -> str:
+    """Best-effort item id from local path or URL."""
+    try:
+        if path.startswith(("http://", "https://", "s3://", "gs://")):
+            parsed = urlparse(path)
+            name = Path(parsed.path.rstrip("/")).name
+        else:
+            name = Path(str(path).rstrip("/")).name
+        if name.endswith(".zarr"):
+            name = name[: -len(".zarr")]
+        return name or "dataset"
+    except Exception:
+        return "dataset"
+
+
+def _resolve_output_prefix(prefix: str, item_id: str) -> str:
+    target_name = f"{item_id}_geozarr.zarr"
+    if prefix.startswith(("http://", "https://", "s3://", "gs://")):
+        if not prefix.endswith("/"):
+            prefix = prefix + "/"
+        return prefix + target_name
+    return str(Path(prefix) / target_name)
+
+
+def _has_group(tree: xr.DataTree, path: str) -> bool:
+    try:
+        parts = [p for p in str(path).strip("/").split("/") if p]
+        node = tree
+        for seg in parts:
+            children = getattr(node, "children", {}) or {}
+            if seg not in children:
+                return False
+            node = children[seg]
+        return True
+    except Exception:
+        return False
+
+
 def convert_command(args: argparse.Namespace) -> None:
     """Convert EOPF dataset to GeoZarr compliant format.
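
As a sanity check on the two path helpers above, here is a minimal sketch of how they compose; the bucket and item names are hypothetical:

path = "s3://bucket/collection/S2B_MSIL2A_20250101.zarr/"
item_id = _derive_item_id_from_input_path(path)
# -> "S2B_MSIL2A_20250101" (trailing slash and ".zarr" suffix stripped)
store = _resolve_output_prefix("s3://out/geozarr/", item_id)
# -> "s3://out/geozarr/S2B_MSIL2A_20250101_geozarr.zarr"
local = _resolve_output_prefix("/tmp/out", item_id)
# -> "/tmp/out/S2B_MSIL2A_20250101_geozarr.zarr" (POSIX-style Path join for local prefixes)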
@@ -215,6 +254,11 @@ def convert_command(args: argparse.Namespace) -> None:
     # Handle output path validation
     output_path_str = args.output_path
+    # Expand trailing-slash prefix to a concrete store
+    if output_path_str.endswith("/"):
+        item_id = _derive_item_id_from_input_path(str(input_path))
+        output_path_str = _resolve_output_prefix(output_path_str, item_id)
+        print(f"Resolved output store: {output_path_str}")
     if is_s3_path(output_path_str):
         # S3 path - validate S3 access
         print("🔍 Validating S3 access...")
@@ -268,14 +312,48 @@ def convert_command(args: argparse.Namespace) -> None:
     # Load the EOPF DataTree with appropriate storage options
     print("Loading EOPF dataset...")
     storage_options = get_storage_options(input_path)
-    # Metrics setup
+    # Metrics setup (environment first; set_input after group validation)
     if getattr(args, "metrics", True):
         metrics = MetricsRecorder()
         metrics.set_environment()
+
+    with metrics.time_step("open_input") if metrics else nullcontext():
+        dt = xr.open_datatree(
+            str(input_path),
+            engine="zarr",
+            chunks="auto",
+            storage_options=storage_options,
+        )
+
+    # Validate/prune groups if requested
+    groups_effective = list(getattr(args, "groups", []) or [])
+    validate_mode = getattr(args, "validate_groups", None)
+    missing: list[str] = []
+    if validate_mode in {"warn", "error"} and groups_effective:
+        existing: list[str] = []
+        for g in groups_effective:
+            if _has_group(dt, g):
+                existing.append(g)
+            else:
+                missing.append(g)
+        if missing:
+            msg = f"Groups not found: {', '.join(missing)}"
+            if validate_mode == "error":
+                print(f"❌ {msg}")
+                sys.exit(3)
+            else:
+                print(f"⚠️ {msg}; proceeding with remaining groups")
+                groups_effective = existing
+                if not groups_effective:
+                    print("❌ No valid groups to convert after validation")
+                    sys.exit(3)
+
+    # Now that groups are finalized, set input metadata for metrics
+    if metrics is not None:
         metrics.set_input(
             source_uri=str(input_path),
             profile=None,
-            groups=args.groups,
+            groups=groups_effective,
             dask={
                 "enabled": bool(dask_client is not None),
                 "mode": getattr(args, "dask_mode", None),
@@ -286,19 +364,13 @@ def convert_command(args: argparse.Namespace) -> None:
             },
         )
 
-    with metrics.time_step("open_input") if metrics else nullcontext():
-        dt = xr.open_datatree(
-            str(input_path),
-            engine="zarr",
-            chunks="auto",
-            storage_options=storage_options,
-        )
-
     if args.verbose:
         print(f"Loaded DataTree with {len(dt.children)} groups")
         print("Available groups:")
         for group_name in dt.children:
             print(f"  - {group_name}")
+        if missing:
+            print(f"After validation, converting groups: {groups_effective}")
 
     # Convert to GeoZarr compliant format
     print("Converting to GeoZarr compliant format...")
@@ -330,7 +402,7 @@ def convert_command(args: argparse.Namespace) -> None:
         with metrics.time_step("convert") if metrics else nullcontext():
             dt_geozarr = create_geozarr_dataset(
                 dt_input=dt,
-                groups=args.groups,
+                groups=groups_effective,
                 output_path=output_path,
                 spatial_chunk=args.spatial_chunk,
                 min_dimension=args.min_dimension,
@@ -344,7 +416,7 @@ def convert_command(args: argparse.Namespace) -> None:
         with metrics.time_step("convert") if metrics else nullcontext():
             dt_geozarr = create_geozarr_dataset(
                 dt_input=dt,
-                groups=args.groups,
+                groups=groups_effective,
                 output_path=output_path,
                 spatial_chunk=args.spatial_chunk,
                 min_dimension=args.min_dimension,
@@ -381,7 +453,7 @@ def convert_command(args: argparse.Namespace) -> None:
             ),
             "perf_report": getattr(args, "dask_perf_html", None),
             "wall_clock_s": wall_clock if dask_client is not None else None,
-            "groups": args.groups,
+            "groups": groups_effective,
             "spatial_chunk": args.spatial_chunk,
             "min_dimension": args.min_dimension,
             "tile_width": args.tile_width,
@@ -399,6 +471,17 @@ def convert_command(args: argparse.Namespace) -> None:
             MetricsRecorder.write_json(run_summary, payload)
             if args.verbose:
                 print(f"🧾 Wrote metrics: {run_summary}")
+            # Optional external run-metadata path
+            if getattr(args, "run_metadata", None):
+                try:
+                    outp = Path(args.run_metadata)
+                    outp.parent.mkdir(parents=True, exist_ok=True)
+                    MetricsRecorder.write_json(outp, payload)
+                    if args.verbose:
+                        print(f"🧾 Wrote run metadata: {outp}")
+                except Exception as _e:
+                    if args.verbose:
+                        print(f"(debug) could not write run-metadata: {_e}")
         except Exception as _exc:
             if args.verbose:
                 print(f"(debug) could not write run summary: {_exc}")
@@ -424,10 +507,22 @@ def convert_command(args: argparse.Namespace) -> None:
         try:
             if metrics and debug_dir is not None and getattr(args, "metrics", True):
                 payload = metrics.finalize(status="error", exception=error_msg)
-                run_summary = Path(args.output_path) / "debug" / "run_summary.json"
-                MetricsRecorder.write_json(run_summary, payload)
-                if args.verbose:
-                    print(f"🧾 Wrote failure metrics: {run_summary}")
+                run_summary = Path(output_path) / "debug" / "run_summary.json"
+                try:
+                    MetricsRecorder.write_json(run_summary, payload)
+                    if args.verbose:
+                        print(f"🧾 Wrote failure metrics: {run_summary}")
+                except Exception:
+                    pass
+                if getattr(args, "run_metadata", None):
+                    try:
+                        outp = Path(args.run_metadata)
+                        outp.parent.mkdir(parents=True, exist_ok=True)
+                        MetricsRecorder.write_json(outp, payload)
+                        if args.verbose:
+                            print(f"🧾 Wrote run metadata (error): {outp}")
+                    except Exception:
+                        pass
         except Exception:
             pass
         sys.exit(1)
@@ -1414,6 +1509,17 @@ def create_parser() -> argparse.ArgumentParser:
         action="store_false",
         help="Disable metrics emission.",
     )
+    convert_parser.add_argument(
+        "--validate-groups",
+        type=str,
+        choices=["warn", "error"],
+        help="Validate requested groups against input; warn to prune, error to abort.",
+    )
+    convert_parser.add_argument(
+        "--run-metadata",
+        type=str,
+        help="Also write finalized metrics payload to this JSON path.",
+    )
     convert_parser.set_defaults(func=convert_command)
 
     # Info command
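
A self-contained sketch of how the two new flags parse (a throwaway parser for illustration, not the project's `create_parser`):

import argparse

p = argparse.ArgumentParser()
p.add_argument("--validate-groups", type=str, choices=["warn", "error"])
p.add_argument("--run-metadata", type=str)

ns = p.parse_args(["--validate-groups", "warn", "--run-metadata", "meta.json"])
assert ns.validate_groups == "warn"   # argparse maps dashes to underscores
assert ns.run_metadata == "meta.json"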