11
11
from pathlib import Path
12
12
from time import perf_counter
13
13
from typing import Any , Optional
14
+ from urllib .parse import urlparse
14
15
15
16
import xarray as xr
16
17
@@ -175,6 +176,62 @@ def _summarize_dask_metrics(
175
176
return base
176
177
177
178
179
+ def _derive_item_id_from_input_path (path : str ) -> str :
180
+ """Best-effort item id from local path or URL."""
181
+ try :
182
+ if path .startswith (("http://" , "https://" , "s3://" , "gs://" )):
183
+ parsed = urlparse (path )
184
+ name = Path (parsed .path .rstrip ("/" )).name
185
+ else :
186
+ name = Path (str (path ).rstrip ("/" )).name
187
+ if name .endswith (".zarr" ):
188
+ name = name [: - len (".zarr" )]
189
+ return name or "dataset"
190
+ except Exception :
191
+ return "dataset"
192
+
193
+
194
+ def _resolve_output_prefix (prefix : str , item_id : str ) -> str :
195
+ target_name = f"{ item_id } _geozarr.zarr"
196
+ if prefix .startswith (("http://" , "https://" , "s3://" , "gs://" )):
197
+ if not prefix .endswith ("/" ):
198
+ prefix = prefix + "/"
199
+ return prefix + target_name
200
+ return str (Path (prefix ) / target_name )
201
+
202
+
203
def _has_group(tree: xr.DataTree, path: str) -> bool:
    """Return True if the slash-separated *path* names a subgroup of *tree*.

    Walks the ``children`` mapping of each node segment by segment; any
    missing segment or unexpected error yields False.
    """
    try:
        current = tree
        for segment in str(path).strip("/").split("/"):
            if not segment:
                # Ignore empty segments from doubled slashes.
                continue
            child_map = getattr(current, "children", {}) or {}
            if segment not in child_map:
                return False
            current = child_map[segment]
        return True
    except Exception:
        return False
215
+
216
+
217
+ def _local_path_exists (p : str ) -> bool :
218
+ try :
219
+ return Path (p ).exists ()
220
+ except Exception :
221
+ return False
222
+
223
+
224
+ def _remove_local_tree (p : str ) -> bool :
225
+ try :
226
+ import shutil
227
+
228
+ if Path (p ).exists ():
229
+ shutil .rmtree (p )
230
+ return True
231
+ except Exception :
232
+ return False
233
+
234
+
178
235
def convert_command (args : argparse .Namespace ) -> None :
179
236
"""Convert EOPF dataset to GeoZarr compliant format.
180
237
@@ -213,8 +270,27 @@ def convert_command(args: argparse.Namespace) -> None:
213
270
sys .exit (1 )
214
271
input_path = str (input_path )
215
272
273
+ # Fast path: just list groups and exit
274
+ if getattr (args , "list_groups" , False ):
275
+ storage_options = get_storage_options (input_path )
276
+ dt = xr .open_datatree (
277
+ str (input_path ),
278
+ engine = "zarr" ,
279
+ chunks = "auto" ,
280
+ storage_options = storage_options ,
281
+ )
282
+ print ("Available groups:" )
283
+ for group_name in dt .children :
284
+ print (f" - { group_name } " )
285
+ return
286
+
216
287
# Handle output path validation
217
288
output_path_str = args .output_path
289
+ # Expand trailing-slash prefix to a concrete store
290
+ if output_path_str .endswith ("/" ):
291
+ item_id = _derive_item_id_from_input_path (str (input_path ))
292
+ output_path_str = _resolve_output_prefix (output_path_str , item_id )
293
+ print (f"Resolved output store: { output_path_str } " )
218
294
if is_s3_path (output_path_str ):
219
295
# S3 path - validate S3 access
220
296
print ("🔍 Validating S3 access..." )
@@ -247,14 +323,17 @@ def convert_command(args: argparse.Namespace) -> None:
247
323
else :
248
324
# Local path - create directory if it doesn't exist
249
325
output_path = Path (output_path_str )
250
- output_path .parent .mkdir (parents = True , exist_ok = True )
326
+ # In dry-run mode, don't create anything
327
+ if not getattr (args , "dry_run" , False ):
328
+ output_path .parent .mkdir (parents = True , exist_ok = True )
251
329
output_path = str (output_path )
252
- # Prepare debug dir for metrics
253
- debug_dir = Path (output_path ) / "debug"
254
- try :
255
- debug_dir .mkdir (parents = True , exist_ok = True )
256
- except Exception :
257
- debug_dir = None
330
+ # Prepare debug dir for metrics (skip in dry-run)
331
+ if not getattr (args , "dry_run" , False ):
332
+ debug_dir = Path (output_path ) / "debug"
333
+ try :
334
+ debug_dir .mkdir (parents = True , exist_ok = True )
335
+ except Exception :
336
+ debug_dir = None
258
337
259
338
if args .verbose :
260
339
print (f"Loading EOPF dataset from: { input_path } " )
@@ -268,14 +347,48 @@ def convert_command(args: argparse.Namespace) -> None:
268
347
# Load the EOPF DataTree with appropriate storage options
269
348
print ("Loading EOPF dataset..." )
270
349
storage_options = get_storage_options (input_path )
271
- # Metrics setup
350
+ # Metrics setup (environment first; set_input after group validation)
272
351
if getattr (args , "metrics" , True ):
273
352
metrics = MetricsRecorder ()
274
353
metrics .set_environment ()
354
+
355
+ with metrics .time_step ("open_input" ) if metrics else nullcontext ():
356
+ dt = xr .open_datatree (
357
+ str (input_path ),
358
+ engine = "zarr" ,
359
+ chunks = "auto" ,
360
+ storage_options = storage_options ,
361
+ )
362
+
363
+ # Validate/prune groups if requested
364
+ groups_effective = list (getattr (args , "groups" , []) or [])
365
+ validate_mode = getattr (args , "validate_groups" , None )
366
+ missing : list [str ] = []
367
+ if validate_mode in {"warn" , "error" } and groups_effective :
368
+ existing : list [str ] = []
369
+ for g in groups_effective :
370
+ if _has_group (dt , g ):
371
+ existing .append (g )
372
+ else :
373
+ missing .append (g )
374
+ if missing :
375
+ msg = f"Groups not found: { ', ' .join (missing )} "
376
+ if validate_mode == "error" :
377
+ print (f"❌ { msg } " )
378
+ sys .exit (3 )
379
+ else :
380
+ print (f"⚠️ { msg } ; proceeding with remaining groups" )
381
+ groups_effective = existing
382
+ if not groups_effective :
383
+ print ("❌ No valid groups to convert after validation" )
384
+ sys .exit (3 )
385
+
386
+ # Now that groups are finalized, set input metadata for metrics
387
+ if metrics is not None :
275
388
metrics .set_input (
276
389
source_uri = str (input_path ),
277
390
profile = None ,
278
- groups = args . groups ,
391
+ groups = groups_effective ,
279
392
dask = {
280
393
"enabled" : bool (dask_client is not None ),
281
394
"mode" : getattr (args , "dask_mode" , None ),
@@ -286,19 +399,60 @@ def convert_command(args: argparse.Namespace) -> None:
286
399
},
287
400
)
288
401
289
- with metrics .time_step ("open_input" ) if metrics else nullcontext ():
290
- dt = xr .open_datatree (
291
- str (input_path ),
292
- engine = "zarr" ,
293
- chunks = "auto" ,
294
- storage_options = storage_options ,
295
- )
402
+ # Overwrite policy handling (local only)
403
+ overwrite = getattr (args , "overwrite" , "fail" )
404
+ is_remote = is_s3_path (output_path )
405
+ if not is_remote :
406
+ exists = _local_path_exists (output_path )
407
+ if exists :
408
+ if overwrite == "fail" :
409
+ print (
410
+ f"❌ Output already exists and overwrite policy is 'fail': { output_path } "
411
+ )
412
+ sys .exit (2 )
413
+ if overwrite == "skip" :
414
+ print (f"⏭️ Output exists; skipping as per policy: { output_path } " )
415
+ return
416
+ if overwrite == "replace" :
417
+ if getattr (args , "dry_run" , False ):
418
+ print (
419
+ f"🧪 Dry-run: would remove existing output: { output_path } "
420
+ )
421
+ else :
422
+ ok = _remove_local_tree (output_path )
423
+ if not ok :
424
+ print (f"❌ Failed to remove existing output: { output_path } " )
425
+ sys .exit (2 )
426
+ # merge: do nothing
427
+ else :
428
+ if overwrite == "fail" :
429
+ print (
430
+ "ℹ️ Remote output existence not checked; 'fail' policy may not prevent overwrite."
431
+ )
432
+ elif overwrite == "replace" :
433
+ print (
434
+ "⚠️ 'replace' is not implemented for remote outputs; proceeding may overwrite keys."
435
+ )
436
+
437
+ # Print dry-run plan and exit early
438
+ if getattr (args , "dry_run" , False ):
439
+ print ("\n Dry-run plan:" )
440
+ print ("============" )
441
+ print (f"Input: { input_path } " )
442
+ print (f"Output: { output_path } " )
443
+ print (f"Groups: { groups_effective } " )
444
+ print (f"Overwrite policy: { overwrite } " )
445
+ print (f"Dask: { 'on' if dask_client is not None else 'off' } " )
446
+ print ("No data will be written." )
447
+ return
296
448
297
449
if args .verbose :
298
450
print (f"Loaded DataTree with { len (dt .children )} groups" )
299
451
print ("Available groups:" )
300
452
for group_name in dt .children :
301
453
print (f" - { group_name } " )
454
+ if missing :
455
+ print (f"After validation, converting groups: { groups_effective } " )
302
456
303
457
# Convert to GeoZarr compliant format
304
458
print ("Converting to GeoZarr compliant format..." )
@@ -330,7 +484,7 @@ def convert_command(args: argparse.Namespace) -> None:
330
484
with metrics .time_step ("convert" ) if metrics else nullcontext ():
331
485
dt_geozarr = create_geozarr_dataset (
332
486
dt_input = dt ,
333
- groups = args . groups ,
487
+ groups = groups_effective ,
334
488
output_path = output_path ,
335
489
spatial_chunk = args .spatial_chunk ,
336
490
min_dimension = args .min_dimension ,
@@ -344,7 +498,7 @@ def convert_command(args: argparse.Namespace) -> None:
344
498
with metrics .time_step ("convert" ) if metrics else nullcontext ():
345
499
dt_geozarr = create_geozarr_dataset (
346
500
dt_input = dt ,
347
- groups = args . groups ,
501
+ groups = groups_effective ,
348
502
output_path = output_path ,
349
503
spatial_chunk = args .spatial_chunk ,
350
504
min_dimension = args .min_dimension ,
@@ -381,7 +535,7 @@ def convert_command(args: argparse.Namespace) -> None:
381
535
),
382
536
"perf_report" : getattr (args , "dask_perf_html" , None ),
383
537
"wall_clock_s" : wall_clock if dask_client is not None else None ,
384
- "groups" : args . groups ,
538
+ "groups" : groups_effective ,
385
539
"spatial_chunk" : args .spatial_chunk ,
386
540
"min_dimension" : args .min_dimension ,
387
541
"tile_width" : args .tile_width ,
@@ -399,6 +553,17 @@ def convert_command(args: argparse.Namespace) -> None:
399
553
MetricsRecorder .write_json (run_summary , payload )
400
554
if args .verbose :
401
555
print (f"🧾 Wrote metrics: { run_summary } " )
556
+ # Optional external run-metadata path
557
+ if getattr (args , "run_metadata" , None ):
558
+ try :
559
+ outp = Path (args .run_metadata )
560
+ outp .parent .mkdir (parents = True , exist_ok = True )
561
+ MetricsRecorder .write_json (outp , payload )
562
+ if args .verbose :
563
+ print (f"🧾 Wrote run metadata: { outp } " )
564
+ except Exception as _e :
565
+ if args .verbose :
566
+ print (f"(debug) could not write run-metadata: { _e } " )
402
567
except Exception as _exc :
403
568
if args .verbose :
404
569
print (f"(debug) could not write run summary: { _exc } " )
@@ -424,10 +589,22 @@ def convert_command(args: argparse.Namespace) -> None:
424
589
try :
425
590
if metrics and debug_dir is not None and getattr (args , "metrics" , True ):
426
591
payload = metrics .finalize (status = "error" , exception = error_msg )
427
- run_summary = Path (args .output_path ) / "debug" / "run_summary.json"
428
- MetricsRecorder .write_json (run_summary , payload )
429
- if args .verbose :
430
- print (f"🧾 Wrote failure metrics: { run_summary } " )
592
+ run_summary = Path (output_path ) / "debug" / "run_summary.json"
593
+ try :
594
+ MetricsRecorder .write_json (run_summary , payload )
595
+ if args .verbose :
596
+ print (f"🧾 Wrote failure metrics: { run_summary } " )
597
+ except Exception :
598
+ pass
599
+ if getattr (args , "run_metadata" , None ):
600
+ try :
601
+ outp = Path (args .run_metadata )
602
+ outp .parent .mkdir (parents = True , exist_ok = True )
603
+ MetricsRecorder .write_json (outp , payload )
604
+ if args .verbose :
605
+ print (f"🧾 Wrote run metadata (error): { outp } " )
606
+ except Exception :
607
+ pass
431
608
except Exception :
432
609
pass
433
610
sys .exit (1 )
@@ -1414,6 +1591,34 @@ def create_parser() -> argparse.ArgumentParser:
1414
1591
action = "store_false" ,
1415
1592
help = "Disable metrics emission." ,
1416
1593
)
1594
+ convert_parser .add_argument (
1595
+ "--list-groups" ,
1596
+ action = "store_true" ,
1597
+ help = "List available groups in the input and exit." ,
1598
+ )
1599
+ convert_parser .add_argument (
1600
+ "--dry-run" ,
1601
+ action = "store_true" ,
1602
+ help = "Validate I/O, resolve groups and output, and print the plan without writing." ,
1603
+ )
1604
+ convert_parser .add_argument (
1605
+ "--overwrite" ,
1606
+ type = str ,
1607
+ choices = ["fail" , "skip" , "replace" , "merge" ],
1608
+ default = "fail" ,
1609
+ help = "Behavior when output exists (local): fail, skip, replace, or merge (default: fail)." ,
1610
+ )
1611
+ convert_parser .add_argument (
1612
+ "--validate-groups" ,
1613
+ type = str ,
1614
+ choices = ["warn" , "error" ],
1615
+ help = "Validate requested groups against input; warn to prune, error to abort." ,
1616
+ )
1617
+ convert_parser .add_argument (
1618
+ "--run-metadata" ,
1619
+ type = str ,
1620
+ help = "Also write finalized metrics payload to this JSON path." ,
1621
+ )
1417
1622
convert_parser .set_defaults (func = convert_command )
1418
1623
1419
1624
# Info command
0 commit comments