Skip to content

Commit 0ae5ac7

Browse files
committed
metrics(v1): structured conversion metrics + CLI flags; fsspec size calc; optional JSON schema validation
1 parent e0222d0 commit 0ae5ac7

File tree

2 files changed

+74
-78
lines changed

2 files changed

+74
-78
lines changed

docs/pipeline-integration-recommendations.md

Lines changed: 0 additions & 74 deletions
This file was deleted.

src/eopf_geozarr/metrics.py

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,41 @@ def build_output_summary(self, output_path: str) -> None:
8888
"metadata": {"consolidated": None},
8989
}
9090

91-
# Size (local only)
91+
# Size (local or remote via fsspec)
9292
try:
93+
total = None
9394
op = Path(output_path)
9495
if op.exists():
95-
total = 0
96+
acc = 0
9697
for p in op.rglob("*"):
9798
if p.is_file():
9899
try:
99-
total += p.stat().st_size
100+
acc += p.stat().st_size
100101
except Exception:
101102
pass
102-
info["store_total_bytes"] = total
103+
total = acc
104+
else:
105+
try:
106+
import fsspec
107+
108+
fs, root = fsspec.core.url_to_fs(output_path)
109+
# find() returns all file-like entries under the root
110+
acc = 0
111+
for path in fs.find(root):
112+
try:
113+
info_dict = fs.info(path)
114+
size = info_dict.get("size")
115+
if isinstance(size, (int, float)):
116+
acc += int(size)
117+
except Exception:
118+
# ignore paths that fail info()
119+
pass
120+
total = acc
121+
except Exception:
122+
total = None
123+
info["store_total_bytes"] = total
103124
except Exception:
125+
# best-effort only
104126
pass
105127

106128
# Structure via xarray datatree (works for local and remote)
@@ -218,6 +240,54 @@ def _walk_z(g: Any, prefix: str = "") -> None:
218240

219241
self.output_info = info
220242

243+
@staticmethod
def _schema() -> Dict[str, Any]:
    """Return the JSON Schema (draft-07) used to check a metrics payload.

    The schema requires the six top-level sections listed below, constrains
    the fields of the ``run`` section, and leaves the remaining sections as
    free-form objects. Extra top-level keys are permitted.
    """
    # Field-level constraints for the "run" section; nullable fields accept
    # either their base type or JSON null.
    run_fields = {
        "run_id": {"type": "string"},
        "attempt": {"type": ["integer", "null"]},
        "started_at": {"type": "string"},
        "ended_at": {"type": ["string", "null"]},
        "status": {"type": "string"},
        "exception": {"type": ["string", "null"]},
    }
    run_section = {
        "type": "object",
        "required": ["run_id", "status", "started_at"],
        "properties": run_fields,
    }

    # Top-level sections: only "schema_version" and "run" are constrained
    # beyond "must be an object".
    sections = {
        "schema_version": {"type": "string"},
        "run": run_section,
    }
    for free_form in ("input", "output", "performance", "environment"):
        sections[free_form] = {"type": "object"}

    required_sections = [
        "schema_version",
        "run",
        "input",
        "output",
        "performance",
        "environment",
    ]

    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "required": required_sections,
        "properties": sections,
        "additionalProperties": True,
    }
277+
278+
@staticmethod
279+
def validate_payload(payload: Dict[str, Any]) -> List[str]:
280+
try:
281+
# Optional validation if jsonschema is available
282+
import jsonschema
283+
284+
validator = jsonschema.Draft7Validator(MetricsRecorder._schema())
285+
errors = [e.message for e in validator.iter_errors(payload)]
286+
return errors
287+
except Exception:
288+
# If jsonschema not installed or validation fails unexpectedly, skip
289+
return []
290+
221291
def finalize(self, status: str, exception: Optional[str] = None) -> Dict[str, Any]:
222292
self.ended_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
223293
wall = 0.0

0 commit comments

Comments
 (0)