| @@ -0,0 +1,155 @@ | |||
| # External Event Ingestion + SSE Streaming — Plan | |||
|
|
|||
| **Scope:** Two new HTTP endpoints layered on top of the merged CDP base: `POST /events/capture_session/publish` (external event ingestion) and `GET /events/capture_session/stream` (SSE live stream), both wired into the existing resource-style `CaptureSession`. | |||
i don't think the url here should be nested under capture_session. in my mind the capture session is a singleton producer the API runs / users can configure that publishes to a shared event stream that others can publish into. it doesn't own this stream
so either POST /events or POST /events/publish, and GET /events or GET /events/stream, makes more sense to me
Wouldn't we want the /events/publish tied to a capture_session_id for traceability?
The endpoint changed to /events/publish, but a capture session is still required so that the event goes through the pipeline.
| | POST | `/events/capture_session/publish` | `ApiService.PublishEvent` | `publishEvent` | | ||
| | GET | `/events/capture_session/stream` | `ApiService.StreamCaptureSession` (SSE) | `streamCaptureSession` | | ||
|
|
||
| The stream endpoint follows the same singleton pattern as the other `/events/capture_session` routes. Handlers reference `s.captureSession` directly; the endpoint returns 404 when no session is active. |
i don't think the stream endpoint should be stateful or a singleton. Many API users should be able to call this and request events starting after some time or sequence number. It's not tied to capture session, in fact capture session might not even be running
Okay understood.
So then diving into the lifecycle of the stream:
Client                           Pipeline
│                                    │
│  GET /events/stream?after_seq=N    │
│───────────────────────────────────▶│
│                                    │
│  SSE connection opens              │
│◀───────────────────────────────────│
│                                    │
│  event (seq=N+1)                   │
│◀───────────────────────────────────│
│  event (seq=N+2)                   │
│◀───────────────────────────────────│
│  ...                               │
│                                    │
├──── Client disconnects ────────────┤
│                 OR                 │
├──── Server shuts down ─────────────┤
│                                    │
│  Connection closes                 │
│◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
- if we did decouple events/publish from the capture session, then would we leave the stream open?
- since this is opt-in, we'd need to figure out the use cases for leaving the stream connection open or closed
Once a capture session is active, multiple streams can connect and disconnect independently, each starting either at the oldest record or at a given seq.
Sayan-
left a comment
overall semantics look good to me!
|
|
||
| Accepts a JSON `events.Event` body and publishes it into the currently active `CaptureSession`. | ||
|
|
||
| **Publish requires an active capture session.** If no session is active the handler returns `400` — the caller must `POST /events/start` first. The session is a precondition for publishing, not an implicitly-created resource. |
| - `400` when `type` is empty. | ||
| - `200` on successful publish. | ||
|
|
||
| The handler unconditionally sets `source.kind = KindKernelAPI` **after** decoding, overwriting whatever the caller supplied. This is not a default — it is an enforcement. A caller that sends `"source": {"kind": "cdp"}` must not be able to forge CDP provenance; `source.kind` is documented as the fan-out key (§3.2) and its value for this endpoint is not negotiable. |
hmm, is kind an enum or a known set of strings? for example, could we simply interpolate the provided kind with something like event_publish? would that be more or less confusing downstream?
Yeah, kind comes from the types defined in event.go:
type SourceKind string
const (
	KindCDP          SourceKind = "cdp"
	KindKernelAPI    SourceKind = "kernel_api"
	KindExtension    SourceKind = "extension"
	KindLocalProcess SourceKind = "local_process"
)
hmm. how would we ever get events of type KindExtension or KindLocalProcess?
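For reference, the overwrite-after-decode enforcement that this thread is about can be sketched in a few lines of Go. The `SourceKind` constants are the ones quoted from event.go above; the `Source` and `Event` struct shapes and the `decodeExternal` helper are assumptions made for the example, inferred only from the `"source": {"kind": "cdp"}` payload mentioned in the plan.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type SourceKind string

const (
	KindCDP       SourceKind = "cdp"
	KindKernelAPI SourceKind = "kernel_api"
)

// Source and Event are hypothetical shapes; only the JSON field names
// "type" and "source.kind" appear in the plan.
type Source struct {
	Kind SourceKind `json:"kind"`
}

type Event struct {
	Type   string `json:"type"`
	Source Source `json:"source"`
}

// decodeExternal decodes a caller-supplied event and then overwrites
// source.kind, so the caller's value never survives.
func decodeExternal(body []byte) (Event, error) {
	var ev Event
	if err := json.Unmarshal(body, &ev); err != nil {
		return Event{}, err
	}
	ev.Source.Kind = KindKernelAPI // enforcement, applied after decoding
	return ev, nil
}

func main() {
	ev, err := decodeExternal([]byte(`{"type":"click","source":{"kind":"cdp"}}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(ev.Source.Kind) // kernel_api, not the forged "cdp"
}
```

Setting the field after decoding, rather than defaulting it before, is what makes this an enforcement: there is no code path on which a caller-supplied kind reaches the pipeline.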
|
|
||
| The handler does not take `monitorMu` — `CaptureSession.Publish` is serialised internally and guarantees monotonic seq delivery. This creates a narrow TOCTOU: a concurrent `StopCapture` can clear the session ID between the `Active()` check and the `Publish()` call, causing the event to be written to the ring buffer with an empty `captureSessionID` (after the `session_ended` marker). This is accepted; grabbing `monitorMu` on every publish would serialize all external events behind CDP monitor start/stop. | ||
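The accepted race can be reproduced deterministically with a small Go sketch. All names here are assumptions for illustration: `Record`, the `ring` slice standing in for the ring buffer, and the `Stop` method standing in for `StopCapture` are hypothetical, and the stop is run inline between the check and the publish instead of on another goroutine so that the outcome is repeatable.

```go
package main

import (
	"fmt"
	"sync"
)

// Record is a hypothetical ring-buffer entry.
type Record struct {
	Seq              uint64
	Type             string
	CaptureSessionID string
}

// CaptureSession is a simplified stand-in with its own internal lock.
type CaptureSession struct {
	mu   sync.Mutex
	seq  uint64
	id   string
	ring []Record
}

func (s *CaptureSession) Active() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.id != ""
}

// Publish is serialised by mu, so seq values are handed out monotonically.
func (s *CaptureSession) Publish(typ string) Record {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.appendLocked(typ)
}

// Stop writes the session_ended marker and then clears the session ID.
func (s *CaptureSession) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.appendLocked("session_ended")
	s.id = ""
}

// appendLocked must be called with mu held.
func (s *CaptureSession) appendLocked(typ string) Record {
	s.seq++
	rec := Record{Seq: s.seq, Type: typ, CaptureSessionID: s.id}
	s.ring = append(s.ring, rec)
	return rec
}

func main() {
	s := &CaptureSession{id: "sess-1"}
	if s.Active() { // the check passes
		s.Stop()                     // a concurrent stop lands in the gap
		rec := s.Publish("external") // the event is still written
		fmt.Printf("seq=%d session=%q\n", rec.Seq, rec.CaptureSessionID)
	}
}
```

The late event gets a seq after the `session_ended` marker and an empty session ID, which matches the behaviour the plan chooses to accept rather than holding `monitorMu` on every publish.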
|
|
||
| ### 2.3 `GET /events/stream` |

Note
Low Risk
Low risk because this PR only adds a planning document and does not change runtime code or APIs.
Overview
Adds plans/external-events.md, a detailed plan proposing new endpoints POST /events/publish (ingest external events.Event into the active capture session with enforced source.kind) and GET /events/stream (SSE streaming with Last-Event-ID resume, keepalives, and multi-client readers).
The plan also outlines follow-up behavioral changes like clearing capture-session state on stop via a session_ended marker, adding CaptureSession.Active(), and documenting both endpoints in server/openapi.yaml.
Reviewed by Cursor Bugbot for commit 6a06e95.