@@ -3,7 +3,6 @@ import { redisClient } from "../redis.js";
3
3
import { Transport , TransportSendOptions } from "@modelcontextprotocol/sdk/shared/transport.js" ;
4
4
import { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js" ;
5
5
import { JSONRPCMessage , MessageExtraInfo } from "@modelcontextprotocol/sdk/types.js" ;
6
- import { Server } from "@modelcontextprotocol/sdk/server/index.js" ;
7
6
8
7
let redisTransportCounter = 0 ;
9
8
const notificationStreamId = "__GET_stream" ;
@@ -60,6 +59,27 @@ export async function isLive(sessionId: string): Promise<boolean> {
60
59
return numSubs > 0 ;
61
60
}
62
61
62
+ export async function setSessionOwner ( sessionId : string , userId : string ) : Promise < void > {
63
+ await redisClient . set ( `session:${ sessionId } :owner` , userId ) ;
64
+ }
65
+
66
+ export async function getSessionOwner ( sessionId : string ) : Promise < string | null > {
67
+ return await redisClient . get ( `session:${ sessionId } :owner` ) ;
68
+ }
69
+
70
+ export async function validateSessionOwnership ( sessionId : string , userId : string ) : Promise < boolean > {
71
+ const owner = await getSessionOwner ( sessionId ) ;
72
+ return owner === userId ;
73
+ }
74
+
75
+ export async function isSessionOwnedBy ( sessionId : string , userId : string ) : Promise < boolean > {
76
+ const isLiveSession = await isLive ( sessionId ) ;
77
+ if ( ! isLiveSession ) {
78
+ return false ;
79
+ }
80
+ return await validateSessionOwnership ( sessionId , userId ) ;
81
+ }
82
+
63
83
64
84
export function redisRelayToMcpServer ( sessionId : string , transport : Transport ) : ( ) => Promise < void > {
65
85
let redisCleanup : ( ( ) => Promise < void > ) | undefined = undefined ;
@@ -182,45 +202,16 @@ export class ServerRedisTransport implements Transport {
182
202
}
183
203
}
184
204
185
- export async function startServerListeningToRedis ( serverWrapper : { server : Server ; cleanup : ( ) => void } , sessionId : string ) : Promise < ServerRedisTransport > {
186
- const serverRedisTransport = new ServerRedisTransport ( sessionId ) ;
187
-
188
- // Add cleanup callback to the transport
189
- const originalClose = serverRedisTransport . close . bind ( serverRedisTransport ) ;
190
- serverRedisTransport . close = async ( ) => {
191
- serverWrapper . cleanup ( ) ;
192
- await originalClose ( ) ;
193
- } ;
194
-
195
- // The server.connect() will call start() on the transport
196
- await serverWrapper . server . connect ( serverRedisTransport )
197
-
198
- return serverRedisTransport ;
199
- }
200
-
201
- export async function getFirstShttpTransport ( sessionId : string ) : Promise < { shttpTransport : StreamableHTTPServerTransport , cleanup : ( ) => Promise < void > } > {
202
- const shttpTransport = new StreamableHTTPServerTransport ( {
203
- sessionIdGenerator : ( ) => sessionId ,
204
- enableJsonResponse : true , // Enable JSON response mode
205
- } ) ;
206
-
207
- // Use the new request-id based relay approach
208
- const cleanup = redisRelayToMcpServer ( sessionId , shttpTransport ) ;
209
-
210
- return { shttpTransport, cleanup } ;
211
- }
212
-
213
- export async function getShttpTransport ( sessionId : string ) : Promise < { shttpTransport : StreamableHTTPServerTransport , cleanup : ( ) => Promise < void > } > {
205
+ export async function getShttpTransport ( sessionId : string ) : Promise < StreamableHTTPServerTransport > {
214
206
// Giving undefined here and setting the sessionId means the
215
207
// transport wont try to create a new session.
216
208
const shttpTransport = new StreamableHTTPServerTransport ( {
217
209
sessionIdGenerator : undefined ,
218
- enableJsonResponse : true , // Use JSON response mode for all requests
219
210
} )
220
211
shttpTransport . sessionId = sessionId ;
221
212
222
213
// Use the new request-id based relay approach
223
214
const cleanup = redisRelayToMcpServer ( sessionId , shttpTransport ) ;
224
-
225
- return { shttpTransport, cleanup } ;
215
+ shttpTransport . onclose = cleanup ;
216
+ return shttpTransport ;
226
217
}
0 commit comments