Skip to content
This repository was archived by the owner on May 23, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 73 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
A minimal Model Context Protocol (MCP) server for OpenSearch exposing 4 tools over stdio and sse server.

## Available tools
- list_indices: Lists all indices in OpenSearch.
- get_index_mapping: Gets the mapping for specified index.
- search_index: Searches an index using a query.
- get_shards: Gets information about shards in OpenSearch cluster.
- ListIndexTool: Lists all indices in OpenSearch.
- IndexMappingTool: Retrieves index mapping and setting information for an index in OpenSearch.
- SearchIndexTool: Searches an index using a query written in query domain-specific language (DSL) in OpenSearch.
- GetShardsTool: Gets information about shards in OpenSearch.

> More tools coming soon
> More tools coming soon. [Click here](#contributing) to learn how to add new tools.

## Installation

Expand Down Expand Up @@ -180,4 +180,71 @@ uv add <package-name>
```
uv lock
uv sync
```
```

## Contributing {#contributing}
### Adding Custom Tools
To add a new tool to the MCP server, follow these steps:

1. Create a new tool function in `src/tools/tools.py`:
```python
async def your_tool_function(args: YourToolArgs) -> list[dict]:
try:
# Your tool implementation here
result = your_implementation()
return [{
"type": "text",
"text": result
}]
except Exception as e:
return [{
"type": "text",
"text": f"Error: {str(e)}"
}]
```

2. Define the arguments model using Pydantic:
```python
class YourToolArgs(BaseModel):
# Define your tool's parameters here
param1: str
param2: int
```

3. Register your tool in the `TOOL_REGISTRY` dictionary:
```python
TOOL_REGISTRY = {
# ... existing tools ...
"YourToolName": {
"description": "Description of what your tool does",
"input_schema": YourToolArgs.model_json_schema(),
"function": your_tool_function,
"args_model": YourToolArgs,
}
}
```

4. Add helper functions in `src/opensearch/helper.py`:
```python
def your_helper_function(param1: str, param2: int) -> dict:
"""
Helper function that performs a single REST call to OpenSearch.
Each helper should be focused on one specific OpenSearch operation.
This promotes clarity and maintainable architecture.
"""
# Your OpenSearch REST call implementation here
return result
```

5. Import and use the helper functions in your tool:
```python
from opensearch.helper import your_helper_function
```

The tool will be automatically available through the MCP server after registration.

> Note: Each helper function should perform a single REST call to OpenSearch. This design promotes:
> - Clear separation of concerns
> - Easy testing and maintenance
> - Extensible architecture
> - Reusable OpenSearch operations
Empty file removed mcpSSEServer.py
Empty file.
15 changes: 0 additions & 15 deletions src/mcp_server_opensearch/sse_server.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,18 @@
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0

import logging
import argparse
from typing import Any

import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from mcp.server.sse import SseServerTransport
from mcp.server import Server
from mcp.types import TextContent, Tool
from tools.tools import TOOL_REGISTRY

# Constants
API_KEY = "secret-token"

class APIKeyMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
auth_header = request.headers.get("authorization")
if not auth_header or auth_header != f"Bearer {API_KEY}":
return JSONResponse({"detail": "Unauthorized"}, status_code=401)
return await call_next(request)

def create_mcp_server() -> Server:
server = Server("opensearch-mcp-server")

Expand Down Expand Up @@ -74,7 +60,6 @@ def create_app(self) -> Starlette:
Route("/sse", endpoint=self.handle_sse, methods=["GET"]),
Mount("/messages/", app=self.sse.handle_post_message),
],
middleware=[Middleware(APIKeyMiddleware)],
)

async def serve(host: str = "0.0.0.0", port: int = 9900) -> None:
Expand Down
14 changes: 7 additions & 7 deletions src/tools/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,26 +96,26 @@ async def get_shards_tool(args: GetShardsArgs) -> list[dict]:
}]

TOOL_REGISTRY = {
"list_indices": {
"ListIndexTool": {
"description": "Lists all indices in OpenSearch",
"input_schema": ListIndicesArgs.model_json_schema(),
"function": list_indices_tool,
"args_model": ListIndicesArgs,
},
"get_index_mapping": {
"description": "Gets the mapping for specified index",
"IndexMappingTool": {
"description": "Retrieves index mapping and setting information for an index in OpenSearch",
"input_schema": GetIndexMappingArgs.model_json_schema(),
"function": get_index_mapping_tool,
"args_model": GetIndexMappingArgs,
},
"search_index": {
"description": "Searches an index using a query",
"SearchIndexTool": {
"description": "Searches an index using a query written in query domain-specific language (DSL) in OpenSearch",
"input_schema": SearchIndexArgs.model_json_schema(),
"function": search_index_tool,
"args_model": SearchIndexArgs,
},
"get_shards": {
"description": "Gets information about shards in OpenSearch cluster",
"GetShardsTool": {
"description": "Gets information about shards in OpenSearch",
"input_schema": GetShardsArgs.model_json_schema(),
"function": get_shards_tool,
"args_model": GetShardsArgs,
Expand Down