diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 3bf84e430..10a72ce4a 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -1,10 +1,11 @@ import { IncomingMessage, ServerResponse } from "node:http"; import { Transport } from "../shared/transport.js"; -import { MessageExtraInfo, RequestInfo, isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, SUPPORTED_PROTOCOL_VERSIONS, DEFAULT_NEGOTIATED_PROTOCOL_VERSION } from "../types.js"; +import { MessageExtraInfo, RequestInfo, isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, ServerEvents, SUPPORTED_PROTOCOL_VERSIONS, DEFAULT_NEGOTIATED_PROTOCOL_VERSION, EventListener } from "../types.js"; import getRawBody from "raw-body"; import contentType from "content-type"; import { randomUUID } from "node:crypto"; import { AuthInfo } from "./auth/types.js"; +import { EventEmitter } from "node:events"; const MAXIMUM_MESSAGE_SIZE = "4mb"; @@ -32,6 +33,12 @@ export interface EventStore { * Configuration options for StreamableHTTPServerTransport */ export interface StreamableHTTPServerTransportOptions { + /** + * For any pre-existing sessions that we need new transport for + * If provided then the tranport will be considered _initialized + */ + sessionId?: string; + /** * Function that generates a session ID for the transport. * The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash) @@ -40,6 +47,13 @@ export interface StreamableHTTPServerTransportOptions { */ sessionIdGenerator: (() => string) | undefined; + /** + * Disabling local SSE means that the transport will not automatically send SSE messages to the local handler. + * You need to implement your own pub/sub mechanism to handle SSE messages by listening to the `responseSse` event + * and emitting them to the `sse` for any events that match the transport sessionId + */ + disableLocalSse?: boolean; + /** * A callback for session initialization events * This is called when the server initializes a new session. @@ -143,6 +157,8 @@ export class StreamableHTTPServerTransport implements Transport { private _allowedHosts?: string[]; private _allowedOrigins?: string[]; private _enableDnsRebindingProtection: boolean; + readonly events = new EventEmitter(); + sessionId?: string; onclose?: () => void; @@ -158,6 +174,14 @@ export class StreamableHTTPServerTransport implements Transport { this._allowedHosts = options.allowedHosts; this._allowedOrigins = options.allowedOrigins; this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false; + if (options.sessionId) { + this.sessionId = options.sessionId; + this._initialized = true; // Assume initialized if session ID is provided + } + if (!options.disableLocalSse) { + // If we are not disabling local SSE, we pipe see responses to the local handler + this.events.on('responseSse', (data) => this.events.emit('sse', data)); + } } /** @@ -297,11 +321,19 @@ export class StreamableHTTPServerTransport implements Transport { // otherwise the client will just wait for the first message res.writeHead(200, headers).flushHeaders(); - // Assign the response to the standalone SSE stream + // Write any message matching the sessionId to the SSE stream + const listener: EventListener = ({ sessionId, message, eventId }) => { + if (sessionId === this.sessionId) { + this.writeSSEEvent(res, message, eventId); + } + }; + this.events.on('sse', listener); + this._streamMapping.set(this._standaloneSseStreamId, res); // Set up close handler for client disconnects res.on("close", () => { this._streamMapping.delete(this._standaloneSseStreamId); + this.events.removeListener('sse', listener); }); } @@ -680,8 +712,11 @@ export class StreamableHTTPServerTransport implements Transport { eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message); } - // Send the message to the standalone SSE stream - this.writeSSEEvent(standaloneSse, message, eventId); + // We emit the responseSse event. If disableLocalSse is set to true this message will not be automatically sent to any local SSE handler + // You can listen for the responseSse and emit to a pub/sub. You can also listen for sse events from pubs/sub and emit them + // to events['sse'] to send them to the local SSE handler. + this.events.emit('responseSse',{sessionId: this.sessionId, message, eventId}); + return; } diff --git a/src/types.ts b/src/types.ts index 323e37389..a0f5d3e21 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1643,3 +1643,11 @@ export type ClientResult = Infer; export type ServerRequest = Infer; export type ServerNotification = Infer; export type ServerResult = Infer; + +/* Server events */ +export type ServerEvents = { + responseSse: [{ sessionId?: string, message: JSONRPCMessage, eventId?: string }]; + sse: [{ sessionId?: string, message: JSONRPCMessage, eventId?: string }]; +} + +export type EventListener = (...args: K) => void; \ No newline at end of file