Skip to content

Commit 728c3f6

Browse files
Wired up notifications
1 parent 29f5bde commit 728c3f6

File tree

2 files changed

+37
-29
lines changed

2 files changed

+37
-29
lines changed

src/handlers/shttp.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,16 @@ export async function handleStreamableHTTP(req: Request, res: Response) {
4242
return;
4343
}
4444

45+
const isGetRequest = req.method === 'GET';
46+
4547
// incorrect session for the authed user, return 401
4648
if (sessionId) {
4749
if (!(await isSessionOwnedBy(sessionId, userId))) {
4850
res.status(401)
4951
return;
5052
}
5153
// Reuse existing transport for owned session
52-
shttpTransport = await getShttpTransport(sessionId, onsessionclosed);
54+
shttpTransport = await getShttpTransport(sessionId, onsessionclosed, isGetRequest);
5355
} else if (isInitializeRequest(req.body)) {
5456
// New initialization request - use JSON response mode
5557
const onsessioninitialized = async (sessionId: string) => {
@@ -69,7 +71,7 @@ export async function handleStreamableHTTP(req: Request, res: Response) {
6971
onsessionclosed,
7072
onsessioninitialized,
7173
});
72-
shttpTransport.onclose = redisRelayToMcpServer(sessionId, shttpTransport);
74+
shttpTransport.onclose = await redisRelayToMcpServer(sessionId, shttpTransport);
7375
} else {
7476
// Invalid request - no session ID and not initialization request
7577
res.status(400)

src/services/redisTransport.ts

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ export async function isSessionOwnedBy(sessionId: string, userId: string): Promi
8181
}
8282

8383

84-
export function redisRelayToMcpServer(sessionId: string, transport: Transport): () => Promise<void> {
84+
export async function redisRelayToMcpServer(sessionId: string, transport: Transport, isGetRequest: boolean = false): Promise<() => Promise<void>> {
8585
let redisCleanup: (() => Promise<void>) | undefined = undefined;
8686
const cleanup = async () => {
8787
// TODO: solve race conditions where we call cleanup while the subscription is being created / before it is created
@@ -90,33 +90,39 @@ export function redisRelayToMcpServer(sessionId: string, transport: Transport):
9090
}
9191
}
9292

93-
const messagePromise = new Promise<JSONRPCMessage>((resolve) => {
94-
transport.onmessage = async (message, extra) => {
95-
// First, set up response subscription if needed
96-
if ("id" in message) {
97-
const toClientChannel = getToClientChannel(sessionId, message.id.toString());
93+
const subscribe = async (requestId: string) => {
94+
const toClientChannel = getToClientChannel(sessionId, requestId);
9895

99-
redisCleanup = await redisClient.createSubscription(toClientChannel, async (redisMessageJson) => {
100-
const redisMessage = JSON.parse(redisMessageJson) as RedisMessage;
101-
if (redisMessage.type === 'mcp') {
102-
await transport.send(redisMessage.message, redisMessage.options);
103-
}
104-
}, (error) => {
105-
transport.onerror?.(error);
106-
});
96+
redisCleanup = await redisClient.createSubscription(toClientChannel, async (redisMessageJson) => {
97+
const redisMessage = JSON.parse(redisMessageJson) as RedisMessage;
98+
if (redisMessage.type === 'mcp') {
99+
await transport.send(redisMessage.message, redisMessage.options);
107100
}
108-
109-
// Now send the message to the MCP server
110-
await sendToMcpServer(sessionId, message, extra);
111-
resolve(message);
112-
}
113-
});
114-
115-
messagePromise.catch((error) => {
116-
transport.onerror?.(error);
117-
cleanup();
118-
});
101+
}, (error) => {
102+
transport.onerror?.(error);
103+
});
104+
}
119105

106+
if (isGetRequest) {
107+
await subscribe(notificationStreamId);
108+
} else {
109+
const messagePromise = new Promise<JSONRPCMessage>((resolve) => {
110+
transport.onmessage = async (message, extra) => {
111+
// First, set up response subscription if needed
112+
if ("id" in message) {
113+
await subscribe(message.id.toString());
114+
}
115+
// Now send the message to the MCP server
116+
await sendToMcpServer(sessionId, message, extra);
117+
resolve(message);
118+
}
119+
});
120+
121+
messagePromise.catch((error) => {
122+
transport.onerror?.(error);
123+
cleanup();
124+
});
125+
}
120126
return cleanup;
121127
}
122128

@@ -202,7 +208,7 @@ export class ServerRedisTransport implements Transport {
202208
}
203209
}
204210

205-
export async function getShttpTransport(sessionId: string, onsessionclosed: (sessionId: string) => void | Promise<void>): Promise<StreamableHTTPServerTransport> {
211+
export async function getShttpTransport(sessionId: string, onsessionclosed: (sessionId: string) => void | Promise<void>, isGetRequest: boolean = false): Promise<StreamableHTTPServerTransport> {
206212
// Giving undefined here and setting the sessionId means the
207213
// transport wont try to create a new session.
208214
const shttpTransport = new StreamableHTTPServerTransport({
@@ -212,7 +218,7 @@ export async function getShttpTransport(sessionId: string, onsessionclosed: (ses
212218
shttpTransport.sessionId = sessionId;
213219

214220
// Use the new request-id based relay approach
215-
const cleanup = redisRelayToMcpServer(sessionId, shttpTransport);
221+
const cleanup = await redisRelayToMcpServer(sessionId, shttpTransport, isGetRequest);
216222
shttpTransport.onclose = cleanup;
217223
return shttpTransport;
218224
}

0 commit comments

Comments
 (0)