Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/add-sse-keepalive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/server': minor
---

Add optional `keepAliveInterval` to `WebStandardStreamableHTTPServerTransportOptions` that sends periodic SSE comments on the standalone GET stream to prevent reverse proxy idle timeout disconnections.
36 changes: 36 additions & 0 deletions packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ export interface WebStandardStreamableHTTPServerTransportOptions {
*/
retryInterval?: number;

/**
* Interval in milliseconds for sending SSE keepalive comments on the standalone
* GET SSE stream. When set, the transport sends periodic SSE comments
* (`: keepalive`) to prevent reverse proxies from closing idle connections.
*
* Disabled by default (no keepalive comments are sent).
*/
keepAliveInterval?: number;

/**
* List of protocol versions that this transport will accept.
* Used to validate the `mcp-protocol-version` header in incoming requests.
Expand Down Expand Up @@ -238,6 +247,8 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
private _allowedOrigins?: string[];
private _enableDnsRebindingProtection: boolean;
private _retryInterval?: number;
private _keepAliveInterval?: number;
private _keepAliveTimer?: ReturnType<typeof setInterval>;
private _supportedProtocolVersions: string[];

sessionId?: string;
Expand All @@ -255,6 +266,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
this._allowedOrigins = options.allowedOrigins;
this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false;
this._retryInterval = options.retryInterval;
this._keepAliveInterval = options.keepAliveInterval;
this._supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
}

Expand Down Expand Up @@ -443,6 +455,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
},
cancel: () => {
// Stream was cancelled by client
this._clearKeepAliveTimer();
this._streamMapping.delete(this._standaloneSseStreamId);
}
});
Expand Down Expand Up @@ -472,6 +485,19 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
}
});

// Start keepalive timer to send periodic SSE comments that prevent
// reverse proxies from closing the connection due to idle timeouts
if (this._keepAliveInterval !== undefined) {
this._keepAliveTimer = setInterval(() => {
try {
streamController!.enqueue(encoder.encode(': keepalive\n\n'));
} catch {
// Controller is closed or errored, stop sending keepalives
this._clearKeepAliveTimer();
}
}, this._keepAliveInterval);
}

return new Response(readable, { headers });
}

Expand Down Expand Up @@ -886,7 +912,16 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
return undefined;
}

private _clearKeepAliveTimer(): void {
if (this._keepAliveTimer !== undefined) {
clearInterval(this._keepAliveTimer);
this._keepAliveTimer = undefined;
}
}

async close(): Promise<void> {
this._clearKeepAliveTimer();

// Close all SSE connections
for (const { cleanup } of this._streamMapping.values()) {
cleanup();
Expand Down Expand Up @@ -918,6 +953,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
* Use this to implement polling behavior for server-initiated notifications.
*/
closeStandaloneSSEStream(): void {
this._clearKeepAliveTimer();
const stream = this._streamMapping.get(this._standaloneSseStreamId);
if (stream) {
stream.cleanup();
Expand Down
89 changes: 89 additions & 0 deletions packages/server/test/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,4 +765,93 @@ describe('Zod v4', () => {
await expect(transport.start()).rejects.toThrow('Transport already started');
});
});

describe('HTTPServerTransport - keepAliveInterval', () => {
let transport: WebStandardStreamableHTTPServerTransport;
let mcpServer: McpServer;

beforeEach(() => {
vi.useFakeTimers();
});

afterEach(async () => {
vi.useRealTimers();
await transport.close();
});

async function setupTransport(keepAliveInterval?: number): Promise<string> {
mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' });

transport = new WebStandardStreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
keepAliveInterval
});

await mcpServer.connect(transport);

const initReq = createRequest('POST', TEST_MESSAGES.initialize);
const initRes = await transport.handleRequest(initReq);
return initRes.headers.get('mcp-session-id') as string;
}

it('should send SSE keepalive comments periodically when keepAliveInterval is set', async () => {
const sessionId = await setupTransport(50);

const getReq = createRequest('GET', undefined, { sessionId });
const getRes = await transport.handleRequest(getReq);

expect(getRes.status).toBe(200);
expect(getRes.body).not.toBeNull();

const reader = getRes.body!.getReader();

// Advance past two intervals to accumulate keepalive comments
vi.advanceTimersByTime(120);

const { value } = await reader.read();
const text = new TextDecoder().decode(value);
expect(text).toContain(': keepalive');
});

it('should not send SSE comments when keepAliveInterval is not set', async () => {
const sessionId = await setupTransport(undefined);

const getReq = createRequest('GET', undefined, { sessionId });
const getRes = await transport.handleRequest(getReq);

expect(getRes.status).toBe(200);
expect(getRes.body).not.toBeNull();

const reader = getRes.body!.getReader();

// Advance time; no keepalive should be enqueued
vi.advanceTimersByTime(200);

// Close the transport to end the stream, then read whatever was buffered
await transport.close();

const chunks: string[] = [];
for (let result = await reader.read(); !result.done; result = await reader.read()) {
chunks.push(new TextDecoder().decode(result.value));
}

const allText = chunks.join('');
expect(allText).not.toContain(': keepalive');
});

it('should clear the keepalive interval when the transport is closed', async () => {
const sessionId = await setupTransport(50);

const getReq = createRequest('GET', undefined, { sessionId });
const getRes = await transport.handleRequest(getReq);

expect(getRes.status).toBe(200);

// Close the transport, which should clear the interval
await transport.close();

// Advancing timers after close should not throw
vi.advanceTimersByTime(200);
});
});
});
Loading