1
1
import { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js" ;
2
2
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js" ;
3
3
import { Request , Response } from "express" ;
4
- import { getFirstShttpTransport , getShttpTransport , isLive , shutdownSession , startServerListeningToRedis } from "../services/redisTransport.js" ;
4
+ import { getShttpTransport , isSessionOwnedBy , redisRelayToMcpServer , ServerRedisTransport , setSessionOwner , shutdownSession } from "../services/redisTransport.js" ;
5
5
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js" ;
6
6
import { randomUUID } from "crypto" ;
7
7
import { createMcpServer } from "../services/mcp.js" ;
@@ -16,101 +16,74 @@ declare module "express-serve-static-core" {
16
16
}
17
17
}
18
18
19
+ function getUserIdFromAuth ( auth ?: AuthInfo ) : string | null {
20
+ return auth ?. extra ?. userId as string || null ;
21
+ }
22
+
19
23
export async function handleStreamableHTTP ( req : Request , res : Response ) {
20
24
let shttpTransport : StreamableHTTPServerTransport | undefined = undefined ;
21
- let cleanup : ( ( ) => Promise < void > ) | undefined = undefined ;
22
- try {
23
- // Handle DELETE requests for session termination
24
- if ( req . method === 'DELETE' ) {
25
- const sessionId = req . headers [ 'mcp-session-id' ] as string | undefined ;
26
-
27
- if ( ! sessionId ) {
28
- res . status ( 400 ) . json ( {
29
- jsonrpc : '2.0' ,
30
- error : {
31
- code : - 32000 ,
32
- message : 'Bad Request: Session ID required for DELETE request' ,
33
- } ,
34
- id : null ,
35
- } ) ;
36
- return ;
37
- }
38
25
39
- // Check if session exists
40
- if ( ! ( await isLive ( sessionId ) ) ) {
41
- res . status ( 404 ) . json ( {
42
- jsonrpc : '2.0' ,
43
- error : {
44
- code : - 32001 ,
45
- message : 'Session not found' ,
46
- } ,
47
- id : null ,
48
- } ) ;
49
- return ;
50
- }
26
+ res . on ( 'finish' , async ( ) => {
27
+ await shttpTransport ?. close ( ) ;
28
+ } ) ;
51
29
52
- // Send shutdown control message to terminate the session
53
- await shutdownSession ( sessionId ) ;
30
+ try {
31
+ // Check for existing session ID
32
+ const sessionId = req . headers [ 'mcp-session-id' ] as string | undefined ;
33
+ const userId = getUserIdFromAuth ( req . auth ) ;
54
34
55
- // Respond with success
56
- res . status ( 200 ) . json ( {
57
- jsonrpc : '2.0' ,
58
- result : { status : 'Session terminated successfully' } ,
59
- id : null ,
60
- } ) ;
35
+ // if no userid, return 401, we shouldn't get here ideally
36
+ if ( ! userId ) {
37
+ res . status ( 401 )
61
38
return ;
62
39
}
63
40
64
- // Check for existing session ID
65
- const sessionId = req . headers [ 'mcp-session-id' ] as string | undefined ;
66
-
67
- if ( sessionId && await isLive ( sessionId ) ) {
68
- // Reuse existing transport
69
- ( { shttpTransport, cleanup } = await getShttpTransport ( sessionId ) ) ;
70
- } else if ( ! sessionId && isInitializeRequest ( req . body ) ) {
41
+ // incorrect session for the authed user, return 401
42
+ if ( sessionId ) {
43
+ if ( ! ( await isSessionOwnedBy ( sessionId , userId ) ) ) {
44
+ res . status ( 401 )
45
+ return ;
46
+ }
47
+ // Reuse existing transport for owned session
48
+ shttpTransport = await getShttpTransport ( sessionId ) ;
49
+ } else if ( isInitializeRequest ( req . body ) ) {
71
50
// New initialization request - use JSON response mode
72
- const sessionId = randomUUID ( ) ;
73
-
74
- const server = createMcpServer ( ) ;
75
-
76
- await startServerListeningToRedis ( server , sessionId ) ;
51
+ const onsessioninitialized = async ( sessionId : string ) => {
52
+ const { server, cleanup : mcpCleanup } = createMcpServer ( ) ;
53
+
54
+ const serverRedisTransport = new ServerRedisTransport ( sessionId ) ;
55
+ serverRedisTransport . onclose = mcpCleanup ;
56
+ await server . connect ( serverRedisTransport )
77
57
78
- ( { shttpTransport, cleanup } = await getFirstShttpTransport ( sessionId ) ) ;
79
- } else {
80
- // Invalid request - no session ID or not initialization request
81
- res . status ( 400 ) . json ( {
82
- jsonrpc : '2.0' ,
83
- error : {
84
- code : - 32000 ,
85
- message : 'Bad Request: No valid session ID provided' ,
86
- } ,
87
- id : null ,
58
+ // Set session ownership
59
+ await setSessionOwner ( sessionId , userId ) ;
60
+ }
61
+
62
+ const onsessionclosed = async ( sessionId : string ) => {
63
+ console . log ( `Session ${ sessionId } closing` ) ;
64
+ void shutdownSession ( sessionId ) ;
65
+ console . log ( `Session ${ sessionId } closed` ) ;
66
+ }
67
+
68
+ const sessionId = randomUUID ( ) ;
69
+ shttpTransport = new StreamableHTTPServerTransport ( {
70
+ sessionIdGenerator : ( ) => sessionId ,
71
+ onsessionclosed, // dev build only
72
+ onsessioninitialized,
88
73
} ) ;
74
+ shttpTransport . onclose = redisRelayToMcpServer ( sessionId , shttpTransport ) ;
75
+ } else {
76
+ // Invalid request - no session ID and not initialization request
77
+ res . status ( 400 )
89
78
return ;
90
79
}
91
-
92
80
// Handle the request with existing transport - no need to reconnect
93
81
await shttpTransport . handleRequest ( req , res , req . body ) ;
94
82
} catch ( error ) {
95
83
console . error ( 'Error handling MCP request:' , error ) ;
96
84
97
85
if ( ! res . headersSent ) {
98
- res . status ( 500 ) . json ( {
99
- jsonrpc : '2.0' ,
100
- error : {
101
- code : - 32603 ,
102
- message : 'Internal server error' ,
103
- } ,
104
- id : null ,
105
- } ) ;
86
+ res . status ( 500 )
106
87
}
107
- } finally {
108
- // Set up cleanup when response is complete
109
- res . on ( 'finish' , async ( ) => {
110
- await shttpTransport ?. close ( ) ;
111
- if ( cleanup ) {
112
- await cleanup ( ) ;
113
- }
114
- } ) ;
115
88
}
116
89
}
0 commit comments