diff --git a/.changeset/add-sse-keepalive.md b/.changeset/add-sse-keepalive.md new file mode 100644 index 000000000..78654d9d9 --- /dev/null +++ b/.changeset/add-sse-keepalive.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/server': minor +--- + +Add optional `keepAliveInterval` to `WebStandardStreamableHTTPServerTransportOptions` that sends periodic SSE comments on the standalone GET stream to prevent reverse proxy idle timeout disconnections. diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index 74e689892..98dd7bcff 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -141,6 +141,15 @@ export interface WebStandardStreamableHTTPServerTransportOptions { */ retryInterval?: number; + /** + * Interval in milliseconds for sending SSE keepalive comments on the standalone + * GET SSE stream. When set, the transport sends periodic SSE comments + * (`: keepalive`) to prevent reverse proxies from closing idle connections. + * + * Disabled by default (no keepalive comments are sent). + */ + keepAliveInterval?: number; + /** * List of protocol versions that this transport will accept. * Used to validate the `mcp-protocol-version` header in incoming requests. @@ -238,6 +247,8 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { private _allowedOrigins?: string[]; private _enableDnsRebindingProtection: boolean; private _retryInterval?: number; + private _keepAliveInterval?: number; + private _keepAliveTimer?: ReturnType; private _supportedProtocolVersions: string[]; sessionId?: string; @@ -255,6 +266,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { this._allowedOrigins = options.allowedOrigins; this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false; this._retryInterval = options.retryInterval; + this._keepAliveInterval = options.keepAliveInterval; this._supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS; } @@ -443,6 +455,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { }, cancel: () => { // Stream was cancelled by client + this._clearKeepAliveTimer(); this._streamMapping.delete(this._standaloneSseStreamId); } }); @@ -472,6 +485,19 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } }); + // Start keepalive timer to send periodic SSE comments that prevent + // reverse proxies from closing the connection due to idle timeouts + if (this._keepAliveInterval !== undefined) { + this._keepAliveTimer = setInterval(() => { + try { + streamController!.enqueue(encoder.encode(': keepalive\n\n')); + } catch { + // Controller is closed or errored, stop sending keepalives + this._clearKeepAliveTimer(); + } + }, this._keepAliveInterval); + } + return new Response(readable, { headers }); } @@ -886,7 +912,16 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { return undefined; } + private _clearKeepAliveTimer(): void { + if (this._keepAliveTimer !== undefined) { + clearInterval(this._keepAliveTimer); + this._keepAliveTimer = undefined; + } + } + async close(): Promise { + this._clearKeepAliveTimer(); + // Close all SSE connections for (const { cleanup } of this._streamMapping.values()) { cleanup(); @@ -918,6 +953,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { * Use this to implement polling behavior for server-initiated notifications. */ closeStandaloneSSEStream(): void { + this._clearKeepAliveTimer(); const stream = this._streamMapping.get(this._standaloneSseStreamId); if (stream) { stream.cleanup(); diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index ab6f22342..fdf216bcd 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -765,4 +765,93 @@ describe('Zod v4', () => { await expect(transport.start()).rejects.toThrow('Transport already started'); }); }); + + describe('HTTPServerTransport - keepAliveInterval', () => { + let transport: WebStandardStreamableHTTPServerTransport; + let mcpServer: McpServer; + + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(async () => { + vi.useRealTimers(); + await transport.close(); + }); + + async function setupTransport(keepAliveInterval?: number): Promise { + mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }); + + transport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + keepAliveInterval + }); + + await mcpServer.connect(transport); + + const initReq = createRequest('POST', TEST_MESSAGES.initialize); + const initRes = await transport.handleRequest(initReq); + return initRes.headers.get('mcp-session-id') as string; + } + + it('should send SSE keepalive comments periodically when keepAliveInterval is set', async () => { + const sessionId = await setupTransport(50); + + const getReq = createRequest('GET', undefined, { sessionId }); + const getRes = await transport.handleRequest(getReq); + + expect(getRes.status).toBe(200); + expect(getRes.body).not.toBeNull(); + + const reader = getRes.body!.getReader(); + + // Advance past two intervals to accumulate keepalive comments + vi.advanceTimersByTime(120); + + const { value } = await reader.read(); + const text = new TextDecoder().decode(value); + expect(text).toContain(': keepalive'); + }); + + it('should not send SSE comments when keepAliveInterval is not set', async () => { + const sessionId = await setupTransport(undefined); + + const getReq = createRequest('GET', undefined, { sessionId }); + const getRes = await transport.handleRequest(getReq); + + expect(getRes.status).toBe(200); + expect(getRes.body).not.toBeNull(); + + const reader = getRes.body!.getReader(); + + // Advance time; no keepalive should be enqueued + vi.advanceTimersByTime(200); + + // Close the transport to end the stream, then read whatever was buffered + await transport.close(); + + const chunks: string[] = []; + for (let result = await reader.read(); !result.done; result = await reader.read()) { + chunks.push(new TextDecoder().decode(result.value)); + } + + const allText = chunks.join(''); + expect(allText).not.toContain(': keepalive'); + }); + + it('should clear the keepalive interval when the transport is closed', async () => { + const sessionId = await setupTransport(50); + + const getReq = createRequest('GET', undefined, { sessionId }); + const getRes = await transport.handleRequest(getReq); + + expect(getRes.status).toBe(200); + + // Close the transport, which should clear the interval + await transport.close(); + + // Advancing timers after close should not throw + vi.advanceTimersByTime(200); + }); + }); });