Skip to content

feat: add Server-Sent Events (SSE) streaming support#150

Open
SebastienMelki wants to merge 13 commits intomainfrom
feat/sse-streaming
Open

feat: add Server-Sent Events (SSE) streaming support#150
SebastienMelki wants to merge 13 commits intomainfrom
feat/sse-streaming

Conversation

@SebastienMelki
Copy link
Copy Markdown
Owner

Summary

  • Adds stream: true flag to HttpConfig proto annotation, enabling any RPC to be marked as an SSE streaming endpoint
  • Implements SSE code generation across all 5 protoc generators:
    • Go HTTP server: SSESender interface + SSEHandler[Req] generic handler with text/event-stream, http.Flusher, full request binding
    • Go HTTP client: EventStream[T] iterator with Next(T) bool / Err() / Close() (bufio.Scanner pattern)
    • TS client: async *method() returning AsyncGenerator<T> with ReadableStream SSE parsing
    • TS server: Handler returns ReadableStream<T>, route wraps into SSE-formatted Response
    • OpenAPI: text/event-stream content type with x-sse-event-schema vendor extension
  • Golden tests added for all 5 generators covering SSE + non-SSE methods in the same service

Motivated by the Alpaca Go SDK use case which requires SSE for real-time market data streaming.

Test plan

  • All 8 test packages pass (./scripts/run_tests.sh --fast)
  • make lint-fix reports 0 issues
  • Golden files verified: SSE methods produce correct streaming code, non-SSE methods unchanged
  • Test proto covers: standard unary RPC, SSE streaming, SSE with path params, SSE with query params
  • Manual: generate from a proto with stream: true and verify the Go server SSE handler works end-to-end

🤖 Generated with Claude Code

SebastienMelki and others added 3 commits April 16, 2026 10:46
…erators

- Add stream field to HttpConfig proto annotation (field 3)
- Add Stream bool to shared HTTPConfig struct, populated from proto
- Go HTTP server: SSESender interface, sseSender impl, SSEHandler generic function
- Go HTTP server: SSE methods use (ctx, *Req, SSESender) error signature
- Go client: EventStream[T] type with Next/Err/Close iterator pattern
- Go client: SSE methods return *EventStream instead of *Response
- TS client: async generator methods with ReadableStream SSE parsing
- TS server: SSE handler returns ReadableStream wrapped as text/event-stream
- OpenAPI: SSE operations use text/event-stream with x-sse-event-schema extension
- Golden tests for all 5 generators with SSE + unary in same service
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 16, 2026

🔍 CI Pipeline Status

Lint: success
Test: success
Coverage: success
Build: success
Integration: success


📊 Coverage Report: Available in checks above
🔗 Artifacts: Test results and coverage reports uploaded

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 16, 2026

Codecov Report

❌ Patch coverage is 2.02952% with 531 lines in your changes missing coverage. Please review.
✅ Project coverage is 2.86%. Comparing base (6150ac5) to head (6670936).

Files with missing lines Patch % Lines
internal/httpgen/generator.go 0.00% 174 Missing ⚠️
internal/clientgen/generator.go 0.00% 157 Missing ⚠️
internal/tsservergen/generator.go 0.00% 79 Missing ⚠️
internal/tsclientgen/generator.go 0.00% 78 Missing ⚠️
http/annotations.pb.go 20.75% 41 Missing and 1 partial ⚠️
internal/annotations/http_config.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##            main    #150      +/-   ##
========================================
- Coverage   3.03%   2.86%   -0.18%     
========================================
  Files         47      47              
  Lines       7802    8273     +471     
========================================
  Hits         237     237              
- Misses      7561    8032     +471     
  Partials       4       4              
Flag Coverage Δ
unittests 2.86% <2.02%> (-0.18%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

SebastienMelki and others added 2 commits April 16, 2026 12:01
Adds examples/sse-streaming/ with a complete market data service
demonstrating SSE alongside standard unary RPCs. Includes `make demo`
for end-to-end testing. Also bumps Go version in existing example go.mods.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@SebastienMelki
Copy link
Copy Markdown
Owner Author

Review feedback from local diff against origin/main:

  1. internal/clientgen/generator.go:905-930 builds EventStream on top of bufio.Scanner. That gives every generated Go SSE client a 64 KiB token limit, so a single larger JSON event will terminate the stream with bufio.Scanner: token too long. Since the server emits full JSON messages into one data: line, this is a real payload-size cap rather than a theoretical edge case. I think this needs either a larger configured buffer or a reader-based parser instead of Scanner.

  2. internal/httpgen/generator.go:1775-1779 always turns a handler error into an SSE event: error frame. At that point we have only set headers; we have not necessarily written any body yet, so a handler that fails before sending its first event now comes back as HTTP 200 instead of going through errorHandler / normal status handling. That seems like a behavior regression for early failures, especially for validation or domain errors that should still be regular HTTP error responses.

SebastienMelki and others added 2 commits April 16, 2026 12:12
…ling

1. Replace bufio.Scanner with bufio.Reader in Go client EventStream
   to remove the 64 KiB token size limit on SSE events.

2. Track whether sseSender has written data. If handler errors before
   any Send(), use the normal errorHandler path (proper HTTP status)
   instead of sending an SSE error event on an HTTP 200.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove proto-compatibility matrix job — arduino/setup-protoc can no
  longer resolve old protoc versions (3.19.0, 3.20.0, 25.1). Proto
  validation is already covered by buf-lint and buf-breaking jobs.
- Add codecov.yml with informational status checks — generator code is
  covered by golden tests (external binary), not Go unit test coverage.
- Fix SSE example Makefile: redirect server output, poll for readiness.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@SebastienMelki
Copy link
Copy Markdown
Owner Author

Follow-up review after 204e54e:

internal/httpgen/generator.go:1642-1644,1690-1693,1779-1788 still has one edge case in the new early-error logic. SSESender publicly exposes Flush(), and calling Flush() can commit the 200 text/event-stream response before any Send() happens. But the handler later decides between writeErrorWithHandler(...) and an SSE event: error frame using only sender.sent, which is only flipped by Send/SendWithEvent. So a handler that does sender.Flush() first (for example to force headers/heartbeat) and then returns an error will still go down the normal HTTP error path after the response has already been committed. I think the committed-state check needs to include Flush() as well, or more generally track whether the stream has been started rather than whether an event payload was sent.

SebastienMelki and others added 5 commits April 16, 2026 13:25
sseSender.Flush() also commits the HTTP 200 response, so the
committed flag must be set there too. A handler calling Flush()
for a heartbeat before erroring now correctly gets an SSE error
event instead of a broken writeErrorWithHandler after headers
are already sent.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add SSE streaming annotation section showing stream: true usage
- Add generated output examples for all 5 generators (Go server SSESender,
  Go client EventStream, TS client AsyncGenerator, TS server ReadableStream,
  OpenAPI text/event-stream)
- Update annotation registry row for ext 50003 to mention SSE streaming flag
…mple

Add client demos for both languages alongside the existing Go server:
- go-client/main.go: Go client using generated EventStream API
- ts-client/main.ts: TypeScript client using generated AsyncGenerator API
- docs/MarketDataService.openapi.yaml: generated OpenAPI spec with SSE endpoints
- Updated buf.gen.yaml with protoc-gen-ts-client and protoc-gen-openapiv3
- Updated Makefile with demo-go-client and demo-ts-client targets

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…E connections

Breaking out of an async generator releases the reader lock but doesn't
close the underlying TCP connection, keeping Node.js alive indefinitely.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@SebastienMelki
Copy link
Copy Markdown
Owner Author

Another full-pass review found two TypeScript streaming issues:

  1. internal/tsclientgen/generator.go:391-410 only calls reader.releaseLock() in the async-generator finally block. If the consumer breaks out of for await early, the generated client does not cancel the reader or response body, so the SSE HTTP connection stays open until the server ends it. That is why the demo had to add process.exit(0), but real consumers do not have that escape hatch. The generated iterator needs an actual cleanup path (reader.cancel(), aborting the fetch, or equivalent) when iteration stops early.

  2. internal/tsservergen/generator.go:560-578 wraps the handler ReadableStream in another ReadableStream, but it never implements cancel() or otherwise propagates downstream cancellation back to stream.getReader(). When the HTTP client disconnects from a long-lived SSE response, the outer stream can be canceled while the inner handler stream keeps running forever. The wrapper should cancel/release the upstream reader on downstream cancellation so infinite handler streams do not leak work after disconnects.

go run spawns a child process — killing its PID leaves the actual
server orphaned on port 8080. Now we go build first, run the binary
directly, and use a trap to guarantee cleanup even on failure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@SebastienMelki
Copy link
Copy Markdown
Owner Author

Good job overall on the SSE support work — the Go-side fixes and the follow-up demo/cleanup changes are moving in the right direction. I still see two TypeScript streaming issues that look unresolved:

  1. internal/tsclientgen/generator.go:391-410 only calls reader.releaseLock() in the async-generator finally block. If a consumer breaks out of for await early, the generated client does not cancel the reader or abort the fetch, so the SSE HTTP connection stays open until the server ends it.

  2. internal/tsservergen/generator.go:560-578 wraps the handler ReadableStream in another ReadableStream, but it still does not propagate downstream cancellation back to stream.getReader(). If the HTTP client disconnects from a long-lived SSE response, the handler stream can keep running indefinitely.

Once those two TS cleanup/cancellation paths are handled, I think this is in much better shape.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant