From acc8da77dbbd2c14dd2eee1d023fe10bb551223e Mon Sep 17 00:00:00 2001 From: Aldon Smith Date: Wed, 27 May 2026 10:57:04 -0400 Subject: [PATCH] feat: add scheduled sentinel worker --- .env.example | 1 + docs/LEARNING_LOG.md | 60 +++++ docs/runtime/DOCKER_COMPOSE.md | 15 +- docs/runtime/TROUBLESHOOTING.md | 1 + infra/docker/docker-compose.yml | 15 +- services/process-sentinel/README.md | 24 ++ .../process-sentinel/process_sentinel/cli.py | 19 +- .../process_sentinel/worker.py | 242 ++++++++++++++++++ .../process-sentinel/tests/test_worker.py | 135 ++++++++++ .../simulator/tests/test_demo_make_targets.py | 2 +- .../tests/test_docker_compose_runtime_docs.py | 2 + 11 files changed, 495 insertions(+), 21 deletions(-) create mode 100644 services/process-sentinel/process_sentinel/worker.py create mode 100644 services/process-sentinel/tests/test_worker.py diff --git a/.env.example b/.env.example index 1cfcb3d..189be6c 100644 --- a/.env.example +++ b/.env.example @@ -11,6 +11,7 @@ DEMO_FACTORY_MQTT_ENDPOINT=mqtt://host.docker.internal:1883 DEMO_FACTORY_BACNET_ENDPOINT=host.docker.internal:47808 CONNECTOR_POLL_INTERVAL_SECONDS=30 CONNECTOR_INTERVAL_SECONDS=30 +SENTINEL_RUN_INTERVAL_SECONDS=60 SENTINEL_INTERVAL_SECONDS=60 MQTT_BROKER_URL=mqtt://localhost:1883 SIMULATOR_SEED=42 diff --git a/docs/LEARNING_LOG.md b/docs/LEARNING_LOG.md index 2415e78..4acad1d 100644 --- a/docs/LEARNING_LOG.md +++ b/docs/LEARNING_LOG.md @@ -22,6 +22,66 @@ This file should be updated by Codex after each meaningful change. ### What to learn next ``` +## 2026-05-27 - Scheduled Process Sentinel worker + +### What changed + +Added a scheduled Process Sentinel worker entrypoint and wired the FIP Compose +`sentinel-worker` service to run it directly. + +### Why it matters + +The Docker runtime can now run Process Sentinel repeatedly against stored +external-source FactoryEvents without relying on manual `make demo-*` commands. +The worker still produces advisory detections and recommendations only. 
+ +### How it works + +`process_sentinel.worker` builds the configured event store and Sentinel state +store, then re-runs the existing deterministic `run_sentinel` rules once per +run interval. Each run logs the input event count plus detection, evidence, and +recommendation counts. Empty event stores produce an empty saved state, and +transient storage errors are logged and retried on the next interval. + +### How to run it + +```bash +PYTHONPATH=packages/factory-events:services/simulator:services/ingestion:services/process-sentinel:services/api \ +FACTORY_STORAGE_BACKEND=jsonl \ +FACTORY_EVENTS_STORE=.local/storage/events.jsonl \ +SENTINEL_STATE_DIR=.local/storage/sentinel \ +python -m process_sentinel.worker --once +``` + +For Compose, use: + +```bash +SENTINEL_RUN_INTERVAL_SECONDS=60 make compose-up +``` + +### How to test it + +```bash +.venv/bin/python -m pytest services/process-sentinel/tests/test_worker.py +make test-unit +make test-integration +make lint +make typecheck +``` + +### Key files + +- `services/process-sentinel/process_sentinel/worker.py` +- `services/process-sentinel/tests/test_worker.py` +- `infra/docker/docker-compose.yml` +- `services/process-sentinel/README.md` +- `docs/runtime/DOCKER_COMPOSE.md` + +### What to learn next + +Add the Compose smoke test that verifies connector ingestion, Sentinel output, +API health, and Workbench rendering in one end-to-end path. 
+ ## 2026-05-27 - Connector worker runtime ### What changed diff --git a/docs/runtime/DOCKER_COMPOSE.md b/docs/runtime/DOCKER_COMPOSE.md index 201e2f7..51a699e 100644 --- a/docs/runtime/DOCKER_COMPOSE.md +++ b/docs/runtime/DOCKER_COMPOSE.md @@ -57,7 +57,8 @@ The default Compose services use explicit runtime variables: | `DEMO_FACTORY_BACNET_ENDPOINT` | `host.docker.internal:47808` | Demo-Factory BACnet source endpoint | | `CONNECTOR_POLL_INTERVAL_SECONDS` | `30` | Connector worker poll interval | | `CONNECTOR_INTERVAL_SECONDS` | `30` | Backwards-compatible connector interval fallback | -| `SENTINEL_INTERVAL_SECONDS` | `60` | Process Sentinel worker schedule interval | +| `SENTINEL_RUN_INTERVAL_SECONDS` | `60` | Process Sentinel worker run interval | +| `SENTINEL_INTERVAL_SECONDS` | `60` | Backwards-compatible Sentinel interval fallback | ## Start Demo-Factory @@ -115,6 +116,18 @@ BACnet adapters, and writes normalized FactoryEvents through the configured event store backend. Source unavailability is logged and retried on the next poll instead of crashing the whole Compose stack unnecessarily. +The `sentinel-worker` service runs: + +```bash +python -m process_sentinel.worker +``` + +That worker reads FactoryEvents from the configured event store, runs the +current Process Sentinel advisory rules, and writes detections, evidence, and +recommendations through the configured Sentinel state store. Each run logs the +event, detection, evidence, and recommendation counts. Transient storage errors +are logged and retried on the next interval. 
+ Equivalent Makefile wrappers are available: ```bash diff --git a/docs/runtime/TROUBLESHOOTING.md b/docs/runtime/TROUBLESHOOTING.md index d2a85f1..b2a9353 100644 --- a/docs/runtime/TROUBLESHOOTING.md +++ b/docs/runtime/TROUBLESHOOTING.md @@ -134,6 +134,7 @@ DEMO_FACTORY_MQTT_ENDPOINT=mqtt://host.docker.internal:1883 DEMO_FACTORY_BACNET_ENDPOINT=host.docker.internal:47808 CONNECTOR_POLL_INTERVAL_SECONDS=30 CONNECTOR_INTERVAL_SECONDS=30 +SENTINEL_RUN_INTERVAL_SECONDS=60 ``` If connection profiles are missing in the API, load the checked-in fixture diff --git a/infra/docker/docker-compose.yml b/infra/docker/docker-compose.yml index 50c13b0..75929b9 100644 --- a/infra/docker/docker-compose.yml +++ b/infra/docker/docker-compose.yml @@ -115,24 +115,19 @@ services: <<: *fip-python-service environment: <<: *fip-runtime-environment + SENTINEL_RUN_INTERVAL_SECONDS: ${SENTINEL_RUN_INTERVAL_SECONDS:-60} SENTINEL_INTERVAL_SECONDS: ${SENTINEL_INTERVAL_SECONDS:-60} command: - - sh - - -c - - | - while true; do - python -m process_sentinel.cli \ - --storage-backend "$$FACTORY_STORAGE_BACKEND" \ - --database-url "$$DATABASE_URL" - sleep "$$SENTINEL_INTERVAL_SECONDS" - done + - python + - -m + - process_sentinel.worker healthcheck: test: [ "CMD", "python", "-c", - "import os, process_sentinel.cli; raise SystemExit(0 if os.getenv('SENTINEL_INTERVAL_SECONDS') else 1)", + "import os, process_sentinel.worker; raise SystemExit(0 if os.getenv('SENTINEL_RUN_INTERVAL_SECONDS') else 1)", ] interval: 30s timeout: 5s diff --git a/services/process-sentinel/README.md b/services/process-sentinel/README.md index 3771aef..ab833a4 100644 --- a/services/process-sentinel/README.md +++ b/services/process-sentinel/README.md @@ -43,6 +43,30 @@ Sentinel should use `FACTORY_STORAGE_BACKEND=postgres` and so detections, evidence, recommendations, approval decisions, and audit events are persisted in the shared Postgres store. 
+## Scheduled Worker Runtime + +The scheduled Process Sentinel worker entrypoint is: + +```bash +python -m process_sentinel.worker +``` + +It reads FactoryEvents from the configured runtime event store, runs the +current advisory Process Sentinel rules, and writes detections, evidence items, +recommendations, and supporting state through the configured Sentinel state +store. The run interval is controlled by `SENTINEL_RUN_INTERVAL_SECONDS`, with +`SENTINEL_INTERVAL_SECONDS` retained as a backwards-compatible fallback. + +For a one-shot local worker run against JSONL state: + +```bash +PYTHONPATH=packages/factory-events:services/simulator:services/ingestion:services/process-sentinel:services/api \ +FACTORY_STORAGE_BACKEND=jsonl \ +FACTORY_EVENTS_STORE=.local/storage/events.jsonl \ +SENTINEL_STATE_DIR=.local/storage/sentinel \ +python -m process_sentinel.worker --once +``` + ## Focused Development From the repository root: diff --git a/services/process-sentinel/process_sentinel/cli.py b/services/process-sentinel/process_sentinel/cli.py index 7241e31..9a0c17e 100644 --- a/services/process-sentinel/process_sentinel/cli.py +++ b/services/process-sentinel/process_sentinel/cli.py @@ -6,8 +6,8 @@ from factory_ingestion.storage import event_store_from_config -from process_sentinel.rules import run_sentinel from process_sentinel.storage import sentinel_store_from_config +from process_sentinel.worker import run_sentinel_once def main() -> None: @@ -22,23 +22,24 @@ def main() -> None: parser.add_argument("--database-url", default=os.getenv("DATABASE_URL")) args = parser.parse_args() - events = event_store_from_config( + events_store = event_store_from_config( events_store_path=args.events_store, database_url=args.database_url, storage_backend=args.storage_backend, - ).list_events() - result = run_sentinel(events) - sentinel_store_from_config( + ) + sentinel_store = sentinel_store_from_config( state_dir=args.state_dir, database_url=args.database_url, 
storage_backend=args.storage_backend, - ).save_run_result(result) + ) + summary = run_sentinel_once(events_store=events_store, sentinel_store=sentinel_store) print( "sentinel complete: " - f"detections={len(result.detections)} " - f"evidence={len(result.evidence_items)} " - f"recommendations={len(result.recommendations)} " + f"events={summary.event_count} " + f"detections={summary.detection_count} " + f"evidence={summary.evidence_count} " + f"recommendations={summary.recommendation_count} " f"storage_backend={args.storage_backend}" ) diff --git a/services/process-sentinel/process_sentinel/worker.py b/services/process-sentinel/process_sentinel/worker.py new file mode 100644 index 0000000..5c63422 --- /dev/null +++ b/services/process-sentinel/process_sentinel/worker.py @@ -0,0 +1,242 @@ +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +from collections.abc import Awaitable, Callable, Sequence +from dataclasses import dataclass, field +from pathlib import Path + +from factory_ingestion.storage import EventStore, event_store_from_config + +from process_sentinel.models import SentinelRunResult +from process_sentinel.rules import run_sentinel +from process_sentinel.storage import SentinelStore, sentinel_store_from_config + +LOGGER = logging.getLogger(__name__) +DEFAULT_EVENTS_STORE = Path(".local/storage/events.jsonl") +DEFAULT_STATE_DIR = Path(".local/storage/sentinel") +DEFAULT_RUN_INTERVAL_SECONDS = 60.0 + +SleepCallable = Callable[[float], Awaitable[None]] + + +@dataclass(frozen=True) +class SentinelWorkerConfig: + events_store_path: Path = DEFAULT_EVENTS_STORE + state_dir: Path = DEFAULT_STATE_DIR + storage_backend: str | None = None + database_url: str | None = None + run_interval_seconds: float = DEFAULT_RUN_INTERVAL_SECONDS + max_iterations: int | None = None + + +@dataclass(frozen=True) +class SentinelWorkerIterationSummary: + run_index: int + event_count: int = 0 + detection_count: int = 0 + evidence_count: int = 
0 + recommendation_count: int = 0 + error: str | None = None + + +@dataclass(frozen=True) +class SentinelWorkerRunSummary: + iterations: int + runs: tuple[SentinelWorkerIterationSummary, ...] = field(default_factory=tuple) + + @property + def error_count(self) -> int: + return sum(1 for run in self.runs if run.error is not None) + + @property + def detection_count(self) -> int: + return sum(run.detection_count for run in self.runs) + + @property + def evidence_count(self) -> int: + return sum(run.evidence_count for run in self.runs) + + @property + def recommendation_count(self) -> int: + return sum(run.recommendation_count for run in self.runs) + + +def run_sentinel_once( + *, + events_store: EventStore, + sentinel_store: SentinelStore, + run_index: int = 0, + logger: logging.Logger | None = None, +) -> SentinelWorkerIterationSummary: + resolved_logger = logger or LOGGER + resolved_logger.info("process sentinel run start: run_index=%s", run_index) + + events = events_store.list_events() + result = run_sentinel(events) + sentinel_store.save_run_result(result) + summary = _summary_from_result(run_index, event_count=len(events), result=result) + resolved_logger.info( + "process sentinel run complete: " + "run_index=%s events=%s detections=%s evidence=%s recommendations=%s", + summary.run_index, + summary.event_count, + summary.detection_count, + summary.evidence_count, + summary.recommendation_count, + ) + return summary + + +async def run_sentinel_worker( + config: SentinelWorkerConfig, + *, + events_store: EventStore | None = None, + sentinel_store: SentinelStore | None = None, + sleep: SleepCallable = asyncio.sleep, + logger: logging.Logger | None = None, +) -> SentinelWorkerRunSummary: + _validate_config(config) + resolved_logger = logger or LOGGER + resolved_events_store = events_store or event_store_from_config( + events_store_path=config.events_store_path, + database_url=config.database_url, + storage_backend=config.storage_backend, + ) + 
resolved_sentinel_store = sentinel_store or sentinel_store_from_config( + state_dir=config.state_dir, + database_url=config.database_url, + storage_backend=config.storage_backend, + ) + runs: list[SentinelWorkerIterationSummary] = [] + run_index = 0 + + while config.max_iterations is None or run_index < config.max_iterations: + try: + summary = run_sentinel_once( + events_store=resolved_events_store, + sentinel_store=resolved_sentinel_store, + run_index=run_index, + logger=resolved_logger, + ) + except Exception as exc: + summary = SentinelWorkerIterationSummary(run_index=run_index, error=str(exc)) + resolved_logger.warning( + "process sentinel run failed; will retry on next interval: " + "run_index=%s error=%s", + run_index, + exc, + ) + runs.append(summary) + run_index += 1 + + if config.max_iterations is not None and run_index >= config.max_iterations: + break + await sleep(config.run_interval_seconds) + + return SentinelWorkerRunSummary(iterations=run_index, runs=tuple(runs)) + + +def parse_config(argv: Sequence[str] | None = None) -> SentinelWorkerConfig: + parser = argparse.ArgumentParser( + description="Run the scheduled Process Sentinel worker." 
+ ) + parser.add_argument( + "--events-store", + type=Path, + default=Path(os.getenv("FACTORY_EVENTS_STORE", str(DEFAULT_EVENTS_STORE))), + help="JSONL event store path when FACTORY_STORAGE_BACKEND=jsonl.", + ) + parser.add_argument( + "--state-dir", + type=Path, + default=Path(os.getenv("SENTINEL_STATE_DIR", str(DEFAULT_STATE_DIR))), + help="JSON state directory when FACTORY_STORAGE_BACKEND=jsonl.", + ) + parser.add_argument( + "--storage-backend", + choices=("jsonl", "postgres"), + default=os.getenv("FACTORY_STORAGE_BACKEND", "jsonl"), + ) + parser.add_argument("--database-url", default=os.getenv("DATABASE_URL")) + parser.add_argument( + "--run-interval", + type=float, + default=_run_interval_from_env(), + help="Seconds to wait between Sentinel runs.", + ) + parser.add_argument( + "--once", + action="store_true", + help="Run one Sentinel pass and exit.", + ) + parser.add_argument( + "--max-iterations", + type=int, + default=None, + help="Run a finite number of Sentinel passes and exit. 
Intended for tests.", + ) + args = parser.parse_args(argv) + + max_iterations = 1 if args.once else args.max_iterations + return SentinelWorkerConfig( + events_store_path=args.events_store, + state_dir=args.state_dir, + storage_backend=args.storage_backend, + database_url=args.database_url, + run_interval_seconds=args.run_interval, + max_iterations=max_iterations, + ) + + +def main(argv: Sequence[str] | None = None) -> int: + logging.basicConfig( + level=os.getenv("LOG_LEVEL", "INFO").upper(), + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + try: + config = parse_config(argv) + asyncio.run(run_sentinel_worker(config)) + except KeyboardInterrupt: + LOGGER.info("process sentinel worker stopped") + return 0 + except ValueError as exc: + LOGGER.error("process sentinel worker configuration error: %s", exc) + return 2 + return 0 + + +def _summary_from_result( + run_index: int, *, event_count: int, result: SentinelRunResult +) -> SentinelWorkerIterationSummary: + return SentinelWorkerIterationSummary( + run_index=run_index, + event_count=event_count, + detection_count=len(result.detections), + evidence_count=len(result.evidence_items), + recommendation_count=len(result.recommendations), + ) + + +def _run_interval_from_env() -> float: + raw_value = ( + os.getenv("SENTINEL_RUN_INTERVAL_SECONDS") + or os.getenv("SENTINEL_INTERVAL_SECONDS") + or str(DEFAULT_RUN_INTERVAL_SECONDS) + ) + return float(raw_value) + + +def _validate_config(config: SentinelWorkerConfig) -> None: + if config.run_interval_seconds < 0: + msg = "run_interval_seconds must be greater than or equal to 0" + raise ValueError(msg) + if config.max_iterations is not None and config.max_iterations <= 0: + msg = "max_iterations must be greater than 0 when provided" + raise ValueError(msg) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/services/process-sentinel/tests/test_worker.py b/services/process-sentinel/tests/test_worker.py new file mode 100644 index 0000000..6229ed6 
--- /dev/null +++ b/services/process-sentinel/tests/test_worker.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +import asyncio +import logging +from pathlib import Path + +from factory_events import EventEnvelope +from factory_ingestion.storage import JsonlEventStore +from factory_simulator import generate_events +from process_sentinel.models import SentinelRunResult +from process_sentinel.storage import SentinelStateStore +from process_sentinel.worker import ( + SentinelWorkerConfig, + parse_config, + run_sentinel_once, + run_sentinel_worker, +) + + +class FlakyEventStore: + def __init__(self, events: list[EventEnvelope]) -> None: + self.events = events + self.calls = 0 + + def list_events(self) -> list[EventEnvelope]: + self.calls += 1 + if self.calls == 1: + raise RuntimeError("temporary event store outage") + return self.events + + +class RecordingSentinelStore: + def __init__(self) -> None: + self.saved_results: list[SentinelRunResult] = [] + + def save_run_result(self, result: SentinelRunResult) -> None: + self.saved_results.append(result) + + +def test_sentinel_once_reads_events_and_writes_state(tmp_path: Path) -> None: + events_store = JsonlEventStore(tmp_path / "events.jsonl") + for event in generate_events("fill_weight_drift_demo", seed=120, count=30): + events_store.append(event) + sentinel_store = SentinelStateStore(tmp_path / "sentinel") + + summary = run_sentinel_once(events_store=events_store, sentinel_store=sentinel_store) + + assert summary.event_count == 70 + assert summary.detection_count == 1 + assert summary.evidence_count >= 2 + assert summary.recommendation_count == 1 + assert sentinel_store.list_detections() + assert sentinel_store.list_evidence("det_fill_weight_gradual_drift") + assert sentinel_store.list_recommendations(status="needs_review") + + +def test_sentinel_once_handles_empty_event_store(tmp_path: Path) -> None: + sentinel_store = SentinelStateStore(tmp_path / "sentinel") + + summary = run_sentinel_once( + 
events_store=JsonlEventStore(tmp_path / "events.jsonl"), + sentinel_store=sentinel_store, + ) + + assert summary.event_count == 0 + assert summary.detection_count == 0 + assert summary.evidence_count == 0 + assert summary.recommendation_count == 0 + assert sentinel_store.list_detections() == [] + assert sentinel_store.list_recommendations() == [] + + +def test_scheduled_worker_uses_configured_interval(tmp_path: Path) -> None: + slept: list[float] = [] + + async def record_sleep(seconds: float) -> None: + slept.append(seconds) + + result = asyncio.run( + run_sentinel_worker( + SentinelWorkerConfig( + events_store_path=tmp_path / "events.jsonl", + state_dir=tmp_path / "sentinel", + storage_backend="jsonl", + run_interval_seconds=2.5, + max_iterations=2, + ), + sleep=record_sleep, + ) + ) + + assert result.iterations == 2 + assert result.error_count == 0 + assert slept == [2.5] + + +def test_scheduled_worker_logs_and_retries_transient_storage_errors( + caplog, +) -> None: + events = generate_events("normal", seed=42, count=6) + event_store = FlakyEventStore(events) + sentinel_store = RecordingSentinelStore() + + async def no_sleep(seconds: float) -> None: + assert seconds == 0.01 + + caplog.set_level(logging.WARNING, logger="process_sentinel.worker") + result = asyncio.run( + run_sentinel_worker( + SentinelWorkerConfig( + storage_backend="jsonl", + run_interval_seconds=0.01, + max_iterations=2, + ), + events_store=event_store, + sentinel_store=sentinel_store, + sleep=no_sleep, + ) + ) + + assert result.iterations == 2 + assert result.error_count == 1 + assert event_store.calls == 2 + assert len(sentinel_store.saved_results) == 1 + assert "will retry on next interval" in caplog.text + + +def test_parse_config_prefers_sentinel_run_interval_env(monkeypatch) -> None: + monkeypatch.setenv("SENTINEL_RUN_INTERVAL_SECONDS", "7.5") + monkeypatch.setenv("SENTINEL_INTERVAL_SECONDS", "99") + + config = parse_config(["--once"]) + + assert config.run_interval_seconds == 7.5 + 
assert config.max_iterations == 1 diff --git a/services/simulator/tests/test_demo_make_targets.py b/services/simulator/tests/test_demo_make_targets.py index 54c4f07..13b0bdc 100644 --- a/services/simulator/tests/test_demo_make_targets.py +++ b/services/simulator/tests/test_demo_make_targets.py @@ -62,7 +62,7 @@ def test_demo_make_targets_reset_generate_ingest_and_run_sentinel( sentinel_output = run_make("demo-sentinel-run", variables) assert (demo_state_dir / "detections.json").exists() - assert "sentinel complete: detections=1 evidence=2 recommendations=1" in ( + assert "sentinel complete: events=70 detections=1 evidence=2 recommendations=1" in ( sentinel_output ) assert "Next: make api" in sentinel_output diff --git a/services/simulator/tests/test_docker_compose_runtime_docs.py b/services/simulator/tests/test_docker_compose_runtime_docs.py index 7681fb7..5018220 100644 --- a/services/simulator/tests/test_docker_compose_runtime_docs.py +++ b/services/simulator/tests/test_docker_compose_runtime_docs.py @@ -107,7 +107,9 @@ def test_docker_compose_runtime_defines_fip_services_without_simulator() -> None "DEMO_FACTORY_MQTT_ENDPOINT", "DEMO_FACTORY_BACNET_ENDPOINT", "freeopcua/server/", + "CONNECTOR_POLL_INTERVAL_SECONDS", "CONNECTOR_INTERVAL_SECONDS", + "SENTINEL_RUN_INTERVAL_SECONDS", "SENTINEL_INTERVAL_SECONDS", "healthcheck:", ]