@@ -24,6 +24,14 @@ type RedisMessage =
24
24
25
25
function sendToMcpServer ( sessionId : string , message : JSONRPCMessage , extra ?: { authInfo ?: AuthInfo ; } , options ?: TransportSendOptions ) : Promise < void > {
26
26
const toServerChannel = getToServerChannel ( sessionId ) ;
27
+
28
+ logger . debug ( 'Sending message to MCP server via Redis' , {
29
+ sessionId,
30
+ channel : toServerChannel ,
31
+ method : ( message as any ) . method ,
32
+ id : ( message as any ) . id
33
+ } ) ;
34
+
27
35
const redisMessage : RedisMessage = { type : 'mcp' , message, extra, options } ;
28
36
return redisClient . publish ( toServerChannel , JSON . stringify ( redisMessage ) ) ;
29
37
}
@@ -51,6 +59,7 @@ function sendControlMessage(sessionId: string, action: 'SHUTDOWN' | 'PING' | 'ST
51
59
}
52
60
53
61
export async function shutdownSession ( sessionId : string ) : Promise < void > {
62
+ logger . info ( 'Sending shutdown control message' , { sessionId } ) ;
54
63
return sendControlMessage ( sessionId , 'SHUTDOWN' ) ;
55
64
}
56
65
@@ -61,6 +70,7 @@ export async function isLive(sessionId: string): Promise<boolean> {
61
70
}
62
71
63
72
export async function setSessionOwner ( sessionId : string , userId : string ) : Promise < void > {
73
+ logger . debug ( 'Setting session owner' , { sessionId, userId } ) ;
64
74
await redisClient . set ( `session:${ sessionId } :owner` , userId ) ;
65
75
}
66
76
@@ -76,30 +86,54 @@ export async function validateSessionOwnership(sessionId: string, userId: string
76
86
export async function isSessionOwnedBy ( sessionId : string , userId : string ) : Promise < boolean > {
77
87
const isLiveSession = await isLive ( sessionId ) ;
78
88
if ( ! isLiveSession ) {
89
+ logger . debug ( 'Session not live' , { sessionId } ) ;
79
90
return false ;
80
91
}
81
- return await validateSessionOwnership ( sessionId , userId ) ;
92
+ const isOwned = await validateSessionOwnership ( sessionId , userId ) ;
93
+ logger . debug ( 'Session ownership check' , { sessionId, userId, isOwned } ) ;
94
+ return isOwned ;
82
95
}
83
96
84
97
85
98
export async function redisRelayToMcpServer ( sessionId : string , transport : Transport , isGetRequest : boolean = false ) : Promise < ( ) => Promise < void > > {
99
+ logger . debug ( 'Setting up Redis relay to MCP server' , {
100
+ sessionId,
101
+ isGetRequest
102
+ } ) ;
103
+
86
104
let redisCleanup : ( ( ) => Promise < void > ) | undefined = undefined ;
87
105
const cleanup = async ( ) => {
88
106
// TODO: solve race conditions where we call cleanup while the subscription is being created / before it is created
89
107
if ( redisCleanup ) {
108
+ logger . debug ( 'Cleaning up Redis relay' , { sessionId } ) ;
90
109
await redisCleanup ( ) ;
91
110
}
92
111
}
93
112
94
113
const subscribe = async ( requestId : string ) => {
95
114
const toClientChannel = getToClientChannel ( sessionId , requestId ) ;
115
+
116
+ logger . debug ( 'Subscribing to client channel' , {
117
+ sessionId,
118
+ requestId,
119
+ channel : toClientChannel
120
+ } ) ;
96
121
97
122
redisCleanup = await redisClient . createSubscription ( toClientChannel , async ( redisMessageJson ) => {
98
123
const redisMessage = JSON . parse ( redisMessageJson ) as RedisMessage ;
99
124
if ( redisMessage . type === 'mcp' ) {
125
+ logger . debug ( 'Relaying message from Redis to client' , {
126
+ sessionId,
127
+ requestId,
128
+ method : ( redisMessage . message as any ) . method
129
+ } ) ;
100
130
await transport . send ( redisMessage . message , redisMessage . options ) ;
101
131
}
102
132
} , ( error ) => {
133
+ logger . error ( 'Error in Redis relay subscription' , error , {
134
+ sessionId,
135
+ channel : toClientChannel
136
+ } ) ;
103
137
transport . onerror ?.( error ) ;
104
138
} ) ;
105
139
}
@@ -111,6 +145,11 @@ export async function redisRelayToMcpServer(sessionId: string, transport: Transp
111
145
transport . onmessage = async ( message , extra ) => {
112
146
// First, set up response subscription if needed
113
147
if ( "id" in message ) {
148
+ logger . debug ( 'Setting up response subscription' , {
149
+ sessionId,
150
+ messageId : message . id ,
151
+ method : ( message as any ) . method
152
+ } ) ;
114
153
await subscribe ( message . id . toString ( ) ) ;
115
154
}
116
155
// Now send the message to the MCP server
@@ -171,40 +210,78 @@ export class ServerRedisTransport implements Transport {
171
210
}
172
211
173
212
async start ( ) : Promise < void > {
213
+ logger . info ( 'Starting ServerRedisTransport' , {
214
+ sessionId : this . _sessionId ,
215
+ inactivityTimeoutMs : this . INACTIVITY_TIMEOUT_MS
216
+ } ) ;
217
+
174
218
// Start inactivity timer
175
219
this . resetInactivityTimer ( ) ;
176
220
177
221
// Subscribe to MCP messages from clients
178
222
const serverChannel = getToServerChannel ( this . _sessionId ) ;
223
+ logger . debug ( 'Subscribing to server channel' , {
224
+ sessionId : this . _sessionId ,
225
+ channel : serverChannel
226
+ } ) ;
227
+
179
228
this . serverCleanup = await redisClient . createSubscription (
180
229
serverChannel ,
181
230
( messageJson ) => {
182
231
const redisMessage = JSON . parse ( messageJson ) as RedisMessage ;
183
232
if ( redisMessage . type === 'mcp' ) {
184
233
// Reset inactivity timer on each message from client
185
234
this . resetInactivityTimer ( ) ;
235
+
236
+ logger . debug ( 'Received MCP message from client' , {
237
+ sessionId : this . _sessionId ,
238
+ method : ( redisMessage . message as any ) . method ,
239
+ id : ( redisMessage . message as any ) . id
240
+ } ) ;
241
+
186
242
this . onmessage ?.( redisMessage . message , redisMessage . extra ) ;
187
243
}
188
244
} ,
189
245
( error ) => {
246
+ logger . error ( 'Error in server channel subscription' , error , {
247
+ sessionId : this . _sessionId ,
248
+ channel : serverChannel
249
+ } ) ;
190
250
this . onerror ?.( error ) ;
191
251
}
192
252
) ;
193
253
194
254
// Subscribe to control messages for shutdown
195
255
const controlChannel = getControlChannel ( this . _sessionId ) ;
256
+ logger . debug ( 'Subscribing to control channel' , {
257
+ sessionId : this . _sessionId ,
258
+ channel : controlChannel
259
+ } ) ;
260
+
196
261
this . controlCleanup = await redisClient . createSubscription (
197
262
controlChannel ,
198
263
( messageJson ) => {
199
264
const redisMessage = JSON . parse ( messageJson ) as RedisMessage ;
200
265
if ( redisMessage . type === 'control' ) {
266
+ logger . info ( 'Received control message' , {
267
+ sessionId : this . _sessionId ,
268
+ action : redisMessage . action
269
+ } ) ;
270
+
201
271
if ( redisMessage . action === 'SHUTDOWN' ) {
272
+ logger . info ( 'Shutting down transport due to control message' , {
273
+ sessionId : this . _sessionId
274
+ } ) ;
202
275
this . shouldShutdown = true ;
203
276
this . close ( ) ;
204
277
}
205
278
}
206
279
} ,
207
280
( error ) => {
281
+ logger . error ( 'Error in control channel subscription' , error , {
282
+ sessionId : this . _sessionId ,
283
+ channel : controlChannel
284
+ } ) ;
208
285
this . onerror ?.( error ) ;
209
286
}
210
287
) ;
@@ -215,12 +292,25 @@ export class ServerRedisTransport implements Transport {
215
292
const relatedRequestId = options ?. relatedRequestId ?. toString ( ) ?? ( "id" in message ? message . id ?. toString ( ) : notificationStreamId ) ;
216
293
const channel = getToClientChannel ( this . _sessionId , relatedRequestId )
217
294
295
+ logger . debug ( 'Sending message to client' , {
296
+ sessionId : this . _sessionId ,
297
+ channel,
298
+ method : ( message as any ) . method ,
299
+ id : ( message as any ) . id ,
300
+ relatedRequestId
301
+ } ) ;
302
+
218
303
const redisMessage : RedisMessage = { type : 'mcp' , message, options } ;
219
304
const messageStr = JSON . stringify ( redisMessage ) ;
220
305
await redisClient . publish ( channel , messageStr ) ;
221
306
}
222
307
223
308
async close ( ) : Promise < void > {
309
+ logger . info ( 'Closing ServerRedisTransport' , {
310
+ sessionId : this . _sessionId ,
311
+ wasShutdown : this . shouldShutdown
312
+ } ) ;
313
+
224
314
// Clear inactivity timer
225
315
this . clearInactivityTimer ( ) ;
226
316
@@ -241,6 +331,11 @@ export class ServerRedisTransport implements Transport {
241
331
}
242
332
243
333
export async function getShttpTransport ( sessionId : string , onsessionclosed : ( sessionId : string ) => void | Promise < void > , isGetRequest : boolean = false ) : Promise < StreamableHTTPServerTransport > {
334
+ logger . debug ( 'Getting StreamableHTTPServerTransport for existing session' , {
335
+ sessionId,
336
+ isGetRequest
337
+ } ) ;
338
+
244
339
// Giving undefined here and setting the sessionId means the
245
340
// transport wont try to create a new session.
246
341
const shttpTransport = new StreamableHTTPServerTransport ( {
0 commit comments