Skip to content

Commit ac40e8a

Browse files
authored
feat: add session management for streamableHttp [MCP-52] (#379)
1 parent b7c616a commit ac40e8a

File tree

9 files changed

+229
-175
lines changed

9 files changed

+229
-175
lines changed

.vscode/launch.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"name": "Launch Program",
2020
"skipFiles": ["<node_internals>/**"],
2121
"program": "${workspaceFolder}/dist/index.js",
22+
"args": ["--transport", "http", "--loggers", "stderr", "mcp"],
2223
"preLaunchTask": "tsc: build - tsconfig.build.json",
2324
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
2425
}

README.md

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -299,20 +299,20 @@ The MongoDB MCP Server can be configured using multiple methods, with the follow
299299

300300
### Configuration Options
301301

302-
| Option | Default | Description |
303-
| ------------------ | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
304-
| `apiClientId` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
305-
| `apiClientSecret` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
306-
| `connectionString` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
307-
| `loggers` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
308-
| `logPath` | see note\* | Folder to store logs. |
309-
| `disabledTools` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
310-
| `readOnly` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
311-
| `indexCheck` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
312-
| `telemetry` | enabled | When set to disabled, disables telemetry collection. |
313-
| `transport` | stdio | Either 'stdio' or 'http'. |
314-
| `httpPort` | 3000 | Port number. |
315-
| `httpHost` | 127.0.0.1 | Host to bind the http server. |
302+
| CLI Option | Environment Variable | Default | Description |
303+
| ------------------ | --------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
304+
| `apiClientId` | `MDB_MCP_API_CLIENT_ID` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
305+
| `apiClientSecret` | `MDB_MCP_API_CLIENT_SECRET` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
306+
| `connectionString` | `MDB_MCP_CONNECTION_STRING` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
307+
| `loggers` | `MDB_MCP_LOGGERS` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
308+
| `logPath` | `MDB_MCP_LOG_PATH` | see note\* | Folder to store logs. |
309+
| `disabledTools` | `MDB_MCP_DISABLED_TOOLS` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
310+
| `readOnly` | `MDB_MCP_READ_ONLY` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
311+
| `indexCheck` | `MDB_MCP_INDEX_CHECK` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
312+
| `telemetry` | `MDB_MCP_TELEMETRY` | enabled | When set to disabled, disables telemetry collection. |
313+
| `transport` | `MDB_MCP_TRANSPORT` | stdio | Either 'stdio' or 'http'. |
314+
| `httpPort` | `MDB_MCP_HTTP_PORT` | 3000 | Port number. |
315+
| `httpHost` | `MDB_MCP_HTTP_HOST` | 127.0.0.1 | Host to bind the http server. |
316316

317317
#### Logger Options
318318

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
},
1717
"type": "module",
1818
"scripts": {
19+
"start": "node dist/index.js --transport http",
1920
"prepare": "npm run build",
2021
"build:clean": "rm -rf dist",
2122
"build:compile": "tsc --project tsconfig.build.json",

src/common/logger.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,9 @@ export const LogId = {
4040
toolUpdateFailure: mongoLogId(1_005_001),
4141

4242
streamableHttpTransportStarted: mongoLogId(1_006_001),
43-
streamableHttpTransportStartFailure: mongoLogId(1_006_002),
44-
streamableHttpTransportSessionInitialized: mongoLogId(1_006_003),
45-
streamableHttpTransportRequestFailure: mongoLogId(1_006_004),
46-
streamableHttpTransportCloseRequested: mongoLogId(1_006_005),
47-
streamableHttpTransportCloseSuccess: mongoLogId(1_006_006),
48-
streamableHttpTransportCloseFailure: mongoLogId(1_006_007),
43+
streamableHttpTransportSessionCloseFailure: mongoLogId(1_006_002),
44+
streamableHttpTransportRequestFailure: mongoLogId(1_006_003),
45+
streamableHttpTransportCloseFailure: mongoLogId(1_006_004),
4946
} as const;
5047

5148
export abstract class LoggerBase {

src/common/sessionStore.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
2+
import logger, { LogId } from "./logger.js";
3+
4+
export class SessionStore {
5+
private sessions: { [sessionId: string]: StreamableHTTPServerTransport } = {};
6+
7+
getSession(sessionId: string): StreamableHTTPServerTransport | undefined {
8+
return this.sessions[sessionId];
9+
}
10+
11+
setSession(sessionId: string, transport: StreamableHTTPServerTransport): void {
12+
if (this.sessions[sessionId]) {
13+
throw new Error(`Session ${sessionId} already exists`);
14+
}
15+
this.sessions[sessionId] = transport;
16+
}
17+
18+
async closeSession(sessionId: string, closeTransport: boolean = true): Promise<void> {
19+
if (!this.sessions[sessionId]) {
20+
throw new Error(`Session ${sessionId} not found`);
21+
}
22+
if (closeTransport) {
23+
const transport = this.sessions[sessionId];
24+
if (!transport) {
25+
throw new Error(`Session ${sessionId} not found`);
26+
}
27+
try {
28+
await transport.close();
29+
} catch (error) {
30+
logger.error(
31+
LogId.streamableHttpTransportSessionCloseFailure,
32+
"streamableHttpTransport",
33+
`Error closing transport ${sessionId}: ${error instanceof Error ? error.message : String(error)}`
34+
);
35+
}
36+
}
37+
delete this.sessions[sessionId];
38+
}
39+
40+
async closeAllSessions(): Promise<void> {
41+
await Promise.all(
42+
Object.values(this.sessions)
43+
.filter((transport) => transport !== undefined)
44+
.map((transport) => transport.close())
45+
);
46+
this.sessions = {};
47+
}
48+
}

src/index.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ async function main() {
1414
transportRunner
1515
.close()
1616
.then(() => {
17+
logger.info(LogId.serverClosed, "server", `Server closed`);
1718
process.exit(0);
1819
})
1920
.catch((error: unknown) => {
@@ -22,10 +23,10 @@ async function main() {
2223
});
2324
};
2425

25-
process.once("SIGINT", shutdown);
26-
process.once("SIGABRT", shutdown);
27-
process.once("SIGTERM", shutdown);
28-
process.once("SIGQUIT", shutdown);
26+
process.on("SIGINT", shutdown);
27+
process.on("SIGABRT", shutdown);
28+
process.on("SIGTERM", shutdown);
29+
process.on("SIGQUIT", shutdown);
2930

3031
try {
3132
await transportRunner.start();

src/transports/streamableHttp.ts

Lines changed: 112 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,143 @@
11
import express from "express";
22
import http from "http";
33
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
4+
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
45
import { TransportRunnerBase } from "./base.js";
56
import { config } from "../common/config.js";
67
import logger, { LogId } from "../common/logger.js";
8+
import { randomUUID } from "crypto";
9+
import { SessionStore } from "../common/sessionStore.js";
710

811
const JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED = -32000;
9-
const JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED = -32601;
12+
const JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED = -32001;
13+
const JSON_RPC_ERROR_CODE_SESSION_ID_INVALID = -32002;
14+
const JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND = -32003;
15+
const JSON_RPC_ERROR_CODE_INVALID_REQUEST = -32004;
1016

1117
function promiseHandler(
1218
fn: (req: express.Request, res: express.Response, next: express.NextFunction) => Promise<void>
1319
) {
1420
return (req: express.Request, res: express.Response, next: express.NextFunction) => {
15-
fn(req, res, next).catch(next);
21+
fn(req, res, next).catch((error) => {
22+
logger.error(
23+
LogId.streamableHttpTransportRequestFailure,
24+
"streamableHttpTransport",
25+
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
26+
);
27+
res.status(400).json({
28+
jsonrpc: "2.0",
29+
error: {
30+
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
31+
message: `failed to handle request`,
32+
data: error instanceof Error ? error.message : String(error),
33+
},
34+
});
35+
});
1636
};
1737
}
1838

1939
export class StreamableHttpRunner extends TransportRunnerBase {
2040
private httpServer: http.Server | undefined;
41+
private sessionStore: SessionStore = new SessionStore();
2142

2243
async start() {
2344
const app = express();
2445
app.enable("trust proxy"); // needed for reverse proxy support
25-
app.use(express.urlencoded({ extended: true }));
2646
app.use(express.json());
2747

48+
const handleRequest = async (req: express.Request, res: express.Response) => {
49+
const sessionId = req.headers["mcp-session-id"];
50+
if (!sessionId) {
51+
res.status(400).json({
52+
jsonrpc: "2.0",
53+
error: {
54+
code: JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED,
55+
message: `session id is required`,
56+
},
57+
});
58+
return;
59+
}
60+
if (typeof sessionId !== "string") {
61+
res.status(400).json({
62+
jsonrpc: "2.0",
63+
error: {
64+
code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID,
65+
message: `session id is invalid`,
66+
},
67+
});
68+
return;
69+
}
70+
const transport = this.sessionStore.getSession(sessionId);
71+
if (!transport) {
72+
res.status(404).json({
73+
jsonrpc: "2.0",
74+
error: {
75+
code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND,
76+
message: `session not found`,
77+
},
78+
});
79+
return;
80+
}
81+
await transport.handleRequest(req, res, req.body);
82+
};
83+
2884
app.post(
2985
"/mcp",
3086
promiseHandler(async (req: express.Request, res: express.Response) => {
31-
const transport = new StreamableHTTPServerTransport({
32-
sessionIdGenerator: undefined,
33-
});
87+
const sessionId = req.headers["mcp-session-id"];
88+
if (sessionId) {
89+
await handleRequest(req, res);
90+
return;
91+
}
3492

35-
const server = this.setupServer();
93+
if (!isInitializeRequest(req.body)) {
94+
res.status(400).json({
95+
jsonrpc: "2.0",
96+
error: {
97+
code: JSON_RPC_ERROR_CODE_INVALID_REQUEST,
98+
message: `invalid request`,
99+
},
100+
});
101+
return;
102+
}
36103

37-
await server.connect(transport);
104+
const server = this.setupServer();
105+
const transport = new StreamableHTTPServerTransport({
106+
sessionIdGenerator: () => randomUUID().toString(),
107+
onsessioninitialized: (sessionId) => {
108+
this.sessionStore.setSession(sessionId, transport);
109+
},
110+
onsessionclosed: async (sessionId) => {
111+
try {
112+
await this.sessionStore.closeSession(sessionId, false);
113+
} catch (error) {
114+
logger.error(
115+
LogId.streamableHttpTransportSessionCloseFailure,
116+
"streamableHttpTransport",
117+
`Error closing session: ${error instanceof Error ? error.message : String(error)}`
118+
);
119+
}
120+
},
121+
});
38122

39-
res.on("close", () => {
40-
Promise.all([transport.close(), server.close()]).catch((error: unknown) => {
123+
transport.onclose = () => {
124+
server.close().catch((error) => {
41125
logger.error(
42126
LogId.streamableHttpTransportCloseFailure,
43127
"streamableHttpTransport",
44128
`Error closing server: ${error instanceof Error ? error.message : String(error)}`
45129
);
46130
});
47-
});
131+
};
48132

49-
try {
50-
await transport.handleRequest(req, res, req.body);
51-
} catch (error) {
52-
logger.error(
53-
LogId.streamableHttpTransportRequestFailure,
54-
"streamableHttpTransport",
55-
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
56-
);
57-
res.status(400).json({
58-
jsonrpc: "2.0",
59-
error: {
60-
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
61-
message: `failed to handle request`,
62-
data: error instanceof Error ? error.message : String(error),
63-
},
64-
});
65-
}
133+
await server.connect(transport);
134+
135+
await transport.handleRequest(req, res, req.body);
66136
})
67137
);
68138

69-
app.get("/mcp", (req: express.Request, res: express.Response) => {
70-
res.status(405).json({
71-
jsonrpc: "2.0",
72-
error: {
73-
code: JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED,
74-
message: `method not allowed`,
75-
},
76-
});
77-
});
78-
79-
app.delete("/mcp", (req: express.Request, res: express.Response) => {
80-
res.status(405).json({
81-
jsonrpc: "2.0",
82-
error: {
83-
code: JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED,
84-
message: `method not allowed`,
85-
},
86-
});
87-
});
139+
app.get("/mcp", promiseHandler(handleRequest));
140+
app.delete("/mcp", promiseHandler(handleRequest));
88141

89142
this.httpServer = await new Promise<http.Server>((resolve, reject) => {
90143
const result = app.listen(config.httpPort, config.httpHost, (err?: Error) => {
@@ -104,14 +157,17 @@ export class StreamableHttpRunner extends TransportRunnerBase {
104157
}
105158

106159
async close(): Promise<void> {
107-
await new Promise<void>((resolve, reject) => {
108-
this.httpServer?.close((err) => {
109-
if (err) {
110-
reject(err);
111-
return;
112-
}
113-
resolve();
114-
});
115-
});
160+
await Promise.all([
161+
this.sessionStore.closeAllSessions(),
162+
new Promise<void>((resolve, reject) => {
163+
this.httpServer?.close((err) => {
164+
if (err) {
165+
reject(err);
166+
return;
167+
}
168+
resolve();
169+
});
170+
}),
171+
]);
116172
}
117173
}

0 commit comments

Comments
 (0)