Skip to content

Enable distributed mode #793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { IncomingMessage, ServerResponse } from "node:http";
import { Transport } from "../shared/transport.js";
import { MessageExtraInfo, RequestInfo, isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, SUPPORTED_PROTOCOL_VERSIONS, DEFAULT_NEGOTIATED_PROTOCOL_VERSION } from "../types.js";
import { MessageExtraInfo, RequestInfo, isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, ServerEvents, SUPPORTED_PROTOCOL_VERSIONS, DEFAULT_NEGOTIATED_PROTOCOL_VERSION, EventListener } from "../types.js";
import getRawBody from "raw-body";
import contentType from "content-type";
import { randomUUID } from "node:crypto";
import { AuthInfo } from "./auth/types.js";
import { EventEmitter } from "node:events";

const MAXIMUM_MESSAGE_SIZE = "4mb";

Expand Down Expand Up @@ -32,6 +33,12 @@ export interface EventStore {
* Configuration options for StreamableHTTPServerTransport
*/
export interface StreamableHTTPServerTransportOptions {
/**
* For any pre-existing sessions that we need new transport for
* If provided then the tranport will be considered _initialized
*/
sessionId?: string;

/**
* Function that generates a session ID for the transport.
* The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash)
Expand All @@ -40,6 +47,13 @@ export interface StreamableHTTPServerTransportOptions {
*/
sessionIdGenerator: (() => string) | undefined;

/**
* Disabling local SSE means that the transport will not automatically send SSE messages to the local handler.
* You need to implement your own pub/sub mechanism to handle SSE messages by listening to the `responseSse` event
* and emitting them to the `sse` for any events that match the transport sessionId
*/
disableLocalSse?: boolean;

/**
* A callback for session initialization events
* This is called when the server initializes a new session.
Expand Down Expand Up @@ -143,6 +157,8 @@ export class StreamableHTTPServerTransport implements Transport {
private _allowedHosts?: string[];
private _allowedOrigins?: string[];
private _enableDnsRebindingProtection: boolean;
readonly events = new EventEmitter<ServerEvents>();


sessionId?: string;
onclose?: () => void;
Expand All @@ -158,6 +174,14 @@ export class StreamableHTTPServerTransport implements Transport {
this._allowedHosts = options.allowedHosts;
this._allowedOrigins = options.allowedOrigins;
this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false;
if (options.sessionId) {
this.sessionId = options.sessionId;
this._initialized = true; // Assume initialized if session ID is provided
}
if (!options.disableLocalSse) {
// If we are not disabling local SSE, we pipe see responses to the local handler
this.events.on('responseSse', (data) => this.events.emit('sse', data));
}
}

/**
Expand Down Expand Up @@ -297,11 +321,19 @@ export class StreamableHTTPServerTransport implements Transport {
// otherwise the client will just wait for the first message
res.writeHead(200, headers).flushHeaders();

// Assign the response to the standalone SSE stream
// Write any message matching the sessionId to the SSE stream
const listener: EventListener<ServerEvents['sse']> = ({ sessionId, message, eventId }) => {
if (sessionId === this.sessionId) {
this.writeSSEEvent(res, message, eventId);
}
};
this.events.on('sse', listener);

this._streamMapping.set(this._standaloneSseStreamId, res);
// Set up close handler for client disconnects
res.on("close", () => {
this._streamMapping.delete(this._standaloneSseStreamId);
this.events.removeListener('sse', listener);
});
}

Expand Down Expand Up @@ -680,8 +712,11 @@ export class StreamableHTTPServerTransport implements Transport {
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
}

// Send the message to the standalone SSE stream
this.writeSSEEvent(standaloneSse, message, eventId);
// We emit the responseSse event. If disableLocalSse is set to true this message will not be automatically sent to any local SSE handler
// You can listen for the responseSse and emit to a pub/sub. You can also listen for sse events from pubs/sub and emit them
// to events['sse'] to send them to the local SSE handler.
this.events.emit('responseSse',{sessionId: this.sessionId, message, eventId});

return;
}

Expand Down
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1643,3 +1643,11 @@ export type ClientResult = Infer<typeof ClientResultSchema>;
export type ServerRequest = Infer<typeof ServerRequestSchema>;
export type ServerNotification = Infer<typeof ServerNotificationSchema>;
export type ServerResult = Infer<typeof ServerResultSchema>;

/* Server events */
export type ServerEvents = {
responseSse: [{ sessionId?: string, message: JSONRPCMessage, eventId?: string }];
sse: [{ sessionId?: string, message: JSONRPCMessage, eventId?: string }];
}

export type EventListener<K extends any[]> = (...args: K) => void;