Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cms_pricing/ingestion/observability/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
DISObservabilityCollector, MetricsCollector, Metric, ObservabilityReport,
MetricType
)
from .dis_observability import AlertDecision
from ..contracts.ingestor_spec import ValidationSeverity

# Public API of the observability package. Fixed: the diff residue contained a
# duplicate "ObservabilityReport" entry and a missing trailing comma that made
# "ValidationSeverity" "ValidationSeverity", collapse into one concatenated
# string, silently corrupting `from ... import *`.
__all__ = [
    "DISObservabilityCollector",
    "MetricsCollector",
    "Metric",
    "ObservabilityReport",
    "MetricType",
    "ValidationSeverity",
    "AlertDecision",
]
285 changes: 256 additions & 29 deletions cms_pricing/ingestion/observability/dis_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
"""

import json
import hashlib
from dataclasses import dataclass, asdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
Expand Down Expand Up @@ -178,6 +177,17 @@ def __post_init__(self):
self.lineage_score = 1.0


@dataclass
class AlertDecision:
    """Represents alert routing decisions for a run.

    Built by ``DISObservabilityReport.__post_init__`` from the report's
    critical alerts, warnings, and overall score.
    """

    # One of "normal", "warning", "degraded", or "critical".
    severity: str
    # True when the on-call should be paged (set for critical alerts).
    should_page: bool
    # True when the ingestion run should be halted (set for critical alerts).
    should_block_run: bool
    # The alert/warning messages that drove this decision.
    triggers: List[str] = field(default_factory=list)
    # NOTE(review): naive UTC timestamp — datetime.utcnow() is deprecated in
    # Python 3.12; consider datetime.now(timezone.utc) in a follow-up.
    evaluated_at: datetime = field(default_factory=datetime.utcnow)


@dataclass
class DISObservabilityReport:
"""Complete 5-pillar observability report"""
Expand All @@ -191,7 +201,8 @@ class DISObservabilityReport:
overall_score: float
critical_alerts: List[str]
warnings: List[str]

alert_decision: AlertDecision = None

def __post_init__(self):
# Calculate overall score as weighted average
weights = {
Expand Down Expand Up @@ -222,6 +233,83 @@ def __post_init__(self):
self.critical_alerts = [alert for alert in all_alerts if any(keyword in alert.lower() for keyword in ['failed', 'breaking', 'critical', 'stale'])]
self.warnings = [alert for alert in all_alerts if alert not in self.critical_alerts]

# Determine alert routing
severity = "normal"
should_page = False
should_block = False
triggers: List[str] = []

if self.critical_alerts:
severity = "critical"
should_page = True
should_block = True
triggers = list(self.critical_alerts)
elif self.warnings:
severity = "warning"
triggers = list(self.warnings)

if self.overall_score < 0.5 and severity == "normal":
severity = "degraded"
triggers.append(
f"Overall score {self.overall_score:.2f} below 0.50 threshold"
)

self.alert_decision = AlertDecision(
severity=severity,
should_page=should_page,
should_block_run=should_block,
triggers=triggers,
)


class ObservabilityHistoryStore:
    """Persistent storage for observability report history.

    The history lives in a single JSON file under ``output_dir``; reads are
    cached in memory after the first load.
    """

    def __init__(self, output_dir: Path, filename: str = "observability_history.json"):
        self.output_dir = output_dir
        self.index_path = self.output_dir / filename
        self._cache: Optional[List[Dict[str, Any]]] = None

    def load_index(self) -> List[Dict[str, Any]]:
        """Return the history index, reading it from disk on first use."""
        if self._cache is None:
            self._cache = self._read_index()
        return self._cache

    def _read_index(self) -> List[Dict[str, Any]]:
        """Load the JSON index file, falling back to an empty list on any problem."""
        if not self.index_path.exists():
            return []
        try:
            raw = json.loads(self.index_path.read_text())
        except (OSError, json.JSONDecodeError) as exc:
            # Corrupt or unreadable file: start over rather than crash ingestion.
            logger.warning(
                "Failed to load observability history index; starting fresh",
                path=str(self.index_path),
                error=str(exc),
            )
            return []
        if isinstance(raw, list):
            return raw
        # Valid JSON but the wrong shape (e.g. a dict) — reset.
        logger.warning(
            "Observability history index invalid format; resetting",
            path=str(self.index_path),
        )
        return []

    def append_entry(self, entry: Dict[str, Any]) -> None:
        """Append *entry* to the index and persist it immediately."""
        updated = self.load_index() + [entry]
        self._write(updated)
        self._cache = updated

    def _write(self, history: List[Dict[str, Any]]) -> None:
        # Ensure the target directory exists before writing.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.index_path.write_text(json.dumps(history, indent=2))


class DISObservabilityCollector:
"""
Expand All @@ -231,7 +319,9 @@ class DISObservabilityCollector:
def __init__(self, output_dir: str = "data/observability"):
    """Initialize the collector.

    Args:
        output_dir: Directory where per-run reports and the persistent
            history index are written. Created if missing.
    """
    self.output_dir = Path(output_dir)
    self.output_dir.mkdir(parents=True, exist_ok=True)
    self._history_store = ObservabilityHistoryStore(self.output_dir)
    # Rehydrate previously persisted reports so history survives restarts.
    self.metrics_history: List[DISObservabilityReport] = self._load_history()
    # Routing decision from the most recent generate_observability_report().
    self.last_alert_decision: Optional[AlertDecision] = None

def collect_freshness_metrics(
self,
Expand Down Expand Up @@ -374,41 +464,176 @@ def generate_observability_report(

# Store in history
self.metrics_history.append(report)

# Save to disk
self._save_report(report)

self._persist_history_entry(report)
self.last_alert_decision = report.alert_decision
self._dispatch_alerts(report)

return report

def _save_report(self, report: DISObservabilityReport):
    """Save an observability report to disk as a timestamped JSON file.

    Args:
        report: Fully populated 5-pillar report to persist.
    """
    timestamp = report.report_timestamp.strftime("%Y%m%d_%H%M%S")
    filename = f"{report.dataset_name}_observability_{timestamp}.json"
    filepath = self.output_dir / filename

    # _serialize_dataclass handles nested dataclasses and datetimes; the
    # earlier asdict()/manual-conversion pass was dead code because its
    # result was immediately overwritten, so it has been removed.
    report_dict = self._serialize_dataclass(report)

    with open(filepath, 'w') as f:
        json.dump(report_dict, f, indent=2)

    logger.info(f"Saved observability report: {filepath}")

def _serialize_dataclass(self, obj: Any) -> Any:
if isinstance(obj, datetime):
return obj.isoformat()
if hasattr(obj, "__dataclass_fields__"):
return {
key: self._serialize_dataclass(getattr(obj, key))
for key in obj.__dataclass_fields__.keys()
}
if isinstance(obj, dict):
return {key: self._serialize_dataclass(value) for key, value in obj.items()}
if isinstance(obj, list):
return [self._serialize_dataclass(item) for item in obj]
return obj

def _persist_history_entry(self, report: DISObservabilityReport) -> None:
    """Append a summarized snapshot of *report* to the persistent history index."""
    lineage = report.lineage
    decision = report.alert_decision

    entry: Dict[str, Any] = {
        "run_id": lineage.ingest_run_id,
        "dataset": report.dataset_name,
        "release_id": lineage.release_id,
        "report_timestamp": report.report_timestamp.isoformat(),
        "overall_score": report.overall_score,
        # Copy the alert lists so later mutation of the report can't
        # retroactively change the persisted entry.
        "critical_alerts": list(report.critical_alerts),
        "warnings": list(report.warnings),
        "alert_decision": self._serialize_dataclass(decision) if decision else None,
        # Full report payload, used by _load_history to rehydrate on restart.
        "report": self._serialize_dataclass(report),
    }

    self._history_store.append_entry(entry)

def _load_history(self) -> List[DISObservabilityReport]:
    """Rehydrate previously persisted reports from the history index.

    Entries without a stored "report" payload, or whose payload fails to
    hydrate, are skipped with a warning rather than aborting startup.
    """
    hydrated: List[DISObservabilityReport] = []
    for entry in self._history_store.load_index():
        payload = entry.get("report")
        if payload:
            try:
                hydrated.append(self._report_from_dict(payload))
            except Exception as exc:  # pragma: no cover - defensive logging
                logger.warning(
                    "Failed to hydrate observability report from history",
                    run_id=entry.get("run_id"),
                    error=str(exc),
                )
    return hydrated

def _parse_datetime(self, value: Optional[str], *, default: Optional[datetime] = None) -> Optional[datetime]:
if value is None:
return default
try:
return datetime.fromisoformat(value)
except ValueError:
return default

def _report_from_dict(self, data: Dict[str, Any]) -> DISObservabilityReport:
    """Rebuild a DISObservabilityReport from its serialized dict form.

    Missing fields fall back to neutral defaults so partially written
    history entries still hydrate.

    NOTE(review): reconstructing these metric dataclasses re-runs their
    __post_init__ hooks, which may recompute time-relative values against
    the current clock (note the datetime.utcnow() defaults below) instead
    of the values that were true when the report was persisted — a
    historical "healthy" report can drift toward stale/critical on reload.
    Confirm before trusting hydrated alerts/alert_decision as-recorded.
    """
    freshness_dict = data.get("freshness", {})
    freshness = FreshnessMetrics(
        last_updated=self._parse_datetime(freshness_dict.get("last_updated"), default=datetime.utcnow()),
        expected_frequency_hours=freshness_dict.get("expected_frequency_hours", 0.0),
        actual_frequency_hours=freshness_dict.get("actual_frequency_hours"),
    )

    volume_dict = data.get("volume", {})
    volume = VolumeMetrics(
        total_records=volume_dict.get("total_records", 0),
        total_size_bytes=volume_dict.get("total_size_bytes", 0),
        expected_records=volume_dict.get("expected_records"),
        expected_size_bytes=volume_dict.get("expected_size_bytes"),
        record_growth_rate=volume_dict.get("record_growth_rate"),
        size_growth_rate=volume_dict.get("size_growth_rate"),
    )

    schema_dict = data.get("schema", {})
    schema = SchemaMetrics(
        schema_version=schema_dict.get("schema_version", ""),
        schema_contract_valid=schema_dict.get("schema_contract_valid", True),
        schema_evolution_detected=schema_dict.get("schema_evolution_detected", False),
        breaking_changes=schema_dict.get("breaking_changes", 0),
        non_breaking_changes=schema_dict.get("non_breaking_changes", 0),
    )

    quality_dict = data.get("quality", {})
    quality = QualityMetrics(
        quality_score=quality_dict.get("quality_score", 1.0),
        validation_rules_passed=quality_dict.get("validation_rules_passed", 0),
        validation_rules_failed=quality_dict.get("validation_rules_failed", 0),
        null_rate=quality_dict.get("null_rate", 0.0),
        duplicate_rate=quality_dict.get("duplicate_rate", 0.0),
        completeness_rate=quality_dict.get("completeness_rate", 1.0),
        accuracy_rate=quality_dict.get("accuracy_rate", 1.0),
        quality_threshold=quality_dict.get("quality_threshold", 0.95),
    )

    lineage_dict = data.get("lineage", {})
    lineage = LineageMetrics(
        source_urls=lineage_dict.get("source_urls", []),
        source_checksums=lineage_dict.get("source_checksums", []),
        transformation_steps=lineage_dict.get("transformation_steps", []),
        processing_timestamp=self._parse_datetime(
            lineage_dict.get("processing_timestamp"), default=datetime.utcnow()
        ),
        ingest_run_id=lineage_dict.get("ingest_run_id", ""),
        batch_id=lineage_dict.get("batch_id", ""),
        release_id=lineage_dict.get("release_id", ""),
    )

    # Fall back to "now" when the stored timestamp is absent or invalid.
    report_timestamp = self._parse_datetime(
        data.get("report_timestamp"), default=datetime.utcnow()
    ) or datetime.utcnow()

    # DISObservabilityReport.__post_init__ recomputes overall_score, the
    # alert lists, and alert_decision from the hydrated pillar metrics —
    # the stored values passed here may be overwritten (see NOTE above).
    report = DISObservabilityReport(
        dataset_name=data.get("dataset_name", ""),
        report_timestamp=report_timestamp,
        freshness=freshness,
        volume=volume,
        schema=schema,
        quality=quality,
        lineage=lineage,
        overall_score=data.get("overall_score", 0.0),
        critical_alerts=data.get("critical_alerts", []),
        warnings=data.get("warnings", []),
    )

    return report
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: History Reports: Time Rewrites The Past

When loading reports from history, _report_from_dict reconstructs metric dataclasses which triggers their __post_init__ methods. These methods recalculate time-dependent values like staleness_hours using datetime.utcnow() instead of the original report timestamp, causing historical "healthy" reports to appear increasingly stale over time. The recalculated alerts then cause DISObservabilityReport.__post_init__ to generate incorrect alert_decision values, breaking the persistence feature where a "normal" severity report could become "critical" when reloaded.

Fix in Cursor Fix in Web


def _dispatch_alerts(self, report: DISObservabilityReport) -> None:
    """Log the routing outcome for the report's alert decision.

    Emits exactly one log line whose level reflects the decision:
    error (block), warning (page), info (warnings only / healthy).
    """
    decision = report.alert_decision
    if not decision:
        return

    log_fields = {
        "dataset": report.dataset_name,
        "run_id": report.lineage.ingest_run_id,
        "release_id": report.lineage.release_id,
        "severity": decision.severity,
        "triggers": decision.triggers,
    }

    # Guard-clause chain: most severe outcome wins, exactly one line logged.
    if decision.should_block_run:
        logger.error("Blocking run due to observability alerts", **log_fields)
        return
    if decision.should_page:
        logger.warning("Paging on observability alerts", **log_fields)
        return
    if decision.triggers:
        logger.info("Observability warnings recorded", **log_fields)
        return
    logger.info("Observability run healthy", **log_fields)

def get_latest_report(self, dataset_name: str) -> Optional[DISObservabilityReport]:
"""Get the latest observability report for a dataset"""
Expand Down Expand Up @@ -439,9 +664,11 @@ def get_observability_summary(self) -> Dict[str, Any]:
"overall_score": report.overall_score,
"critical_alerts": len(report.critical_alerts),
"warnings": len(report.warnings),
"last_updated": report.report_timestamp.isoformat()
"last_updated": report.report_timestamp.isoformat(),
"alert_severity": report.alert_decision.severity if report.alert_decision else "unknown",
"block_run": report.alert_decision.should_block_run if report.alert_decision else False,
}

critical_count += len(report.critical_alerts)
warning_count += len(report.warnings)

Expand Down
Loading
Loading