Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ DEMO_FACTORY_MQTT_ENDPOINT=mqtt://host.docker.internal:1883
DEMO_FACTORY_BACNET_ENDPOINT=host.docker.internal:47808
CONNECTOR_POLL_INTERVAL_SECONDS=30
CONNECTOR_INTERVAL_SECONDS=30
SENTINEL_RUN_INTERVAL_SECONDS=60
SENTINEL_INTERVAL_SECONDS=60
MQTT_BROKER_URL=mqtt://localhost:1883
SIMULATOR_SEED=42
Expand Down
60 changes: 60 additions & 0 deletions docs/LEARNING_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,66 @@ This file should be updated by Codex after each meaningful change.
### What to learn next
```

## 2026-05-27 - Scheduled Process Sentinel worker

### What changed

Added a scheduled Process Sentinel worker entrypoint and wired the FIP Compose
`sentinel-worker` service to run it directly.

### Why it matters

The Docker runtime can now run Process Sentinel repeatedly against stored
external-source FactoryEvents without relying on manual `make demo-*` commands.
The worker still produces advisory detections and recommendations only.

### How it works

`process_sentinel.worker` builds the configured event store and Sentinel state
store, then loops over the existing deterministic `run_sentinel` rules. Each
run logs the input event count plus detection, evidence, and recommendation
counts. Empty event stores produce an empty saved state, and transient storage
errors are logged and retried on the next interval.

### How to run it

```bash
PYTHONPATH=packages/factory-events:services/simulator:services/ingestion:services/process-sentinel:services/api \
FACTORY_STORAGE_BACKEND=jsonl \
FACTORY_EVENTS_STORE=.local/storage/events.jsonl \
SENTINEL_STATE_DIR=.local/storage/sentinel \
python -m process_sentinel.worker --once
```

For Compose, use:

```bash
SENTINEL_RUN_INTERVAL_SECONDS=60 make compose-up
```

### How to test it

```bash
.venv/bin/python -m pytest services/process-sentinel/tests/test_worker.py
make test-unit
make test-integration
make lint
make typecheck
```

### Key files

- `services/process-sentinel/process_sentinel/worker.py`
- `services/process-sentinel/tests/test_worker.py`
- `infra/docker/docker-compose.yml`
- `services/process-sentinel/README.md`
- `docs/runtime/DOCKER_COMPOSE.md`

### What to learn next

Add the Compose smoke test that verifies connector ingestion, Sentinel output,
API health, and Workbench rendering in one end-to-end path.

## 2026-05-27 - Connector worker runtime

### What changed
Expand Down
15 changes: 14 additions & 1 deletion docs/runtime/DOCKER_COMPOSE.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ The default Compose services use explicit runtime variables:
| `DEMO_FACTORY_BACNET_ENDPOINT` | `host.docker.internal:47808` | Demo-Factory BACnet source endpoint |
| `CONNECTOR_POLL_INTERVAL_SECONDS` | `30` | Connector worker poll interval |
| `CONNECTOR_INTERVAL_SECONDS` | `30` | Backwards-compatible connector interval fallback |
| `SENTINEL_INTERVAL_SECONDS` | `60` | Process Sentinel worker schedule interval |
| `SENTINEL_RUN_INTERVAL_SECONDS` | `60` | Process Sentinel worker run interval |
| `SENTINEL_INTERVAL_SECONDS` | `60` | Backwards-compatible Sentinel interval fallback |

## Start Demo-Factory

Expand Down Expand Up @@ -115,6 +116,18 @@ BACnet adapters, and writes normalized FactoryEvents through the configured
event store backend. Source unavailability is logged and retried on the next
poll instead of crashing the whole Compose stack unnecessarily.

The `sentinel-worker` service runs:

```bash
python -m process_sentinel.worker
```

That worker reads FactoryEvents from the configured event store, runs the
current Process Sentinel advisory rules, and writes detections, evidence, and
recommendations through the configured Sentinel state store. Each run logs the
event, detection, evidence, and recommendation counts. Transient storage errors
are logged and retried on the next interval.

Equivalent Makefile wrappers are available:

```bash
Expand Down
1 change: 1 addition & 0 deletions docs/runtime/TROUBLESHOOTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ DEMO_FACTORY_MQTT_ENDPOINT=mqtt://host.docker.internal:1883
DEMO_FACTORY_BACNET_ENDPOINT=host.docker.internal:47808
CONNECTOR_POLL_INTERVAL_SECONDS=30
CONNECTOR_INTERVAL_SECONDS=30
SENTINEL_RUN_INTERVAL_SECONDS=60
```

If connection profiles are missing in the API, load the checked-in fixture
Expand Down
15 changes: 5 additions & 10 deletions infra/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,19 @@ services:
<<: *fip-python-service
environment:
<<: *fip-runtime-environment
SENTINEL_RUN_INTERVAL_SECONDS: ${SENTINEL_RUN_INTERVAL_SECONDS:-60}
SENTINEL_INTERVAL_SECONDS: ${SENTINEL_INTERVAL_SECONDS:-60}
command:
- sh
- -c
- |
while true; do
python -m process_sentinel.cli \
--storage-backend "$$FACTORY_STORAGE_BACKEND" \
--database-url "$$DATABASE_URL"
sleep "$$SENTINEL_INTERVAL_SECONDS"
done
- python
- -m
- process_sentinel.worker
healthcheck:
test:
[
"CMD",
"python",
"-c",
"import os, process_sentinel.cli; raise SystemExit(0 if os.getenv('SENTINEL_INTERVAL_SECONDS') else 1)",
"import os, process_sentinel.worker; raise SystemExit(0 if os.getenv('SENTINEL_RUN_INTERVAL_SECONDS') else 1)",
]
interval: 30s
timeout: 5s
Expand Down
24 changes: 24 additions & 0 deletions services/process-sentinel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,30 @@ Sentinel should use `FACTORY_STORAGE_BACKEND=postgres` and
so detections, evidence, recommendations, approval decisions, and audit events
are persisted in the shared Postgres store.

## Scheduled Worker Runtime

The scheduled Process Sentinel worker entrypoint is:

```bash
python -m process_sentinel.worker
```

It reads FactoryEvents from the configured runtime event store, runs the
current advisory Process Sentinel rules, and writes detections, evidence items,
recommendations, and supporting state through the configured Sentinel state
store. The run interval is controlled by `SENTINEL_RUN_INTERVAL_SECONDS`, with
`SENTINEL_INTERVAL_SECONDS` retained as a backwards-compatible fallback.

For a one-shot local worker run against JSONL state:

```bash
PYTHONPATH=packages/factory-events:services/simulator:services/ingestion:services/process-sentinel:services/api \
FACTORY_STORAGE_BACKEND=jsonl \
FACTORY_EVENTS_STORE=.local/storage/events.jsonl \
SENTINEL_STATE_DIR=.local/storage/sentinel \
python -m process_sentinel.worker --once
```

## Focused Development

From the repository root:
Expand Down
19 changes: 10 additions & 9 deletions services/process-sentinel/process_sentinel/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from factory_ingestion.storage import event_store_from_config

from process_sentinel.rules import run_sentinel
from process_sentinel.storage import sentinel_store_from_config
from process_sentinel.worker import run_sentinel_once


def main() -> None:
Expand All @@ -22,23 +22,24 @@ def main() -> None:
parser.add_argument("--database-url", default=os.getenv("DATABASE_URL"))
args = parser.parse_args()

events = event_store_from_config(
events_store = event_store_from_config(
events_store_path=args.events_store,
database_url=args.database_url,
storage_backend=args.storage_backend,
).list_events()
result = run_sentinel(events)
sentinel_store_from_config(
)
sentinel_store = sentinel_store_from_config(
state_dir=args.state_dir,
database_url=args.database_url,
storage_backend=args.storage_backend,
).save_run_result(result)
)
summary = run_sentinel_once(events_store=events_store, sentinel_store=sentinel_store)

print(
"sentinel complete: "
f"detections={len(result.detections)} "
f"evidence={len(result.evidence_items)} "
f"recommendations={len(result.recommendations)} "
f"events={summary.event_count} "
f"detections={summary.detection_count} "
f"evidence={summary.evidence_count} "
f"recommendations={summary.recommendation_count} "
f"storage_backend={args.storage_backend}"
)

Expand Down
Loading
Loading