Skip to content

Commit b02b020

Browse files
Refactor MCP code
1 parent 89001a1 commit b02b020

File tree

9 files changed

+165
-50
lines changed

9 files changed

+165
-50
lines changed

application/apps/indexer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ members = [
99
"addons/bufread",
1010
"indexer_base",
1111
"merging",
12+
"mcp",
1213
"parsers",
1314
"plugins_host",
1415
"processor",
1516
"session",
1617
"sources",
1718
"stypes",
18-
"mcp",
1919
]
2020

2121
[workspace.dependencies]

application/apps/indexer/mcp/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ schemars = "1.0"
1111
dirs.workspace = true
1212
anyhow.workspace = true
1313
log.workspace = true
14+
tokio.workspace = true
15+
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "std"] }
16+
tracing = "0.1"
1417

1518
[dev-dependencies]
1619
tempfile.workspace = true

application/apps/indexer/mcp/src/ai_config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ impl AiConfig {
2020
.unwrap_or_default()
2121
}
2222

23+
pub fn new(model: String, api_key: String, endpoint: String, timeout_secs: u64) -> Self {
24+
AiConfig {
25+
model,
26+
api_key,
27+
endpoint,
28+
timeout_secs,
29+
}
30+
}
31+
2332
pub fn is_valid(&self) -> bool {
2433
!self.model.is_empty() && !self.api_key.is_empty() && !self.endpoint.is_empty()
2534
}

application/apps/indexer/mcp/src/ai_manager.rs

Lines changed: 0 additions & 31 deletions
This file was deleted.
Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,94 @@
11
mod ai_config;
2-
mod ai_manager;
32
mod parameters;
3+
mod tasks;
44
mod tools;
55
mod utils;
66

77
use anyhow::Result;
8+
use rmcp::{
9+
handler::server::{ServerHandler, router::tool::ToolRouter},
10+
model::{ServerCapabilities, ServerInfo},
11+
tool_handler,
12+
};
13+
use tokio::sync::mpsc::{Receiver, Sender};
814

915
use ai_config::AiConfig;
10-
use ai_manager::ChipmunkAI;
16+
use tasks::Task;
1117

12-
async fn start() -> Result<()> {
13-
let config = AiConfig::init();
14-
let service = ChipmunkAI::new(config);
15-
Ok(())
18+
/// ChipmunkAI is responsible for receiving the prompt from the UI and the processing
19+
/// the prompt with the LLM agent. If agent invokes some tool then
20+
pub struct ChipmunkAI {
21+
pub config: AiConfig,
22+
23+
// Channel for sending the Task to Chipmunk Core
24+
pub task_tx: Sender<Task>,
25+
26+
//Channel for receiving the prompt from the User
27+
pub prompt_rx: Receiver<String>,
28+
29+
// Channel for sending the UI updates to Chipmunk Core.
30+
// This channel will receive the updated from AI agent,
31+
// e.g. UI messages to display, thinking..., Searching for tools...
32+
pub ui_tx: Sender<String>,
33+
34+
pub tool_router: ToolRouter<Self>,
35+
}
36+
37+
#[tool_handler]
38+
impl ServerHandler for ChipmunkAI {
39+
fn get_info(&self) -> ServerInfo {
40+
ServerInfo {
41+
instructions: Some("Chipmunk AI".to_string()),
42+
capabilities: ServerCapabilities::builder()
43+
.enable_tools()
44+
.enable_prompts()
45+
.enable_resources()
46+
.build(),
47+
..Default::default()
48+
}
49+
}
50+
}
51+
52+
/// ServerCommunication is responsible to receiving the tasks from
53+
/// AI. Once task is received then Chipmunk takes the necessary action on the tasks,
54+
/// apply those tasks to the UI.
55+
pub struct ServerCommunication {
56+
pub task_rx: Receiver<Task>,
57+
}
58+
59+
impl ServerCommunication {
60+
pub fn new(task_rx: Receiver<Task>) -> Self {
61+
Self { task_rx }
62+
}
63+
}
64+
65+
/// ClientCommuinication is responsible for communicating the tasks from
66+
/// UI to the AI agent. With this user can send the prompt to AI agent and
67+
/// AI agent can send the response over mpsc receiver to render on the UI
68+
pub struct ClientCommunication {
69+
pub ui_rx: Receiver<String>,
70+
pub prompt_tx: Sender<String>,
71+
}
72+
73+
impl ClientCommunication {
74+
pub fn new(ui_receiver: Receiver<String>, prompt_sender: Sender<String>) -> Self {
75+
Self {
76+
ui_rx: ui_receiver,
77+
prompt_tx: prompt_sender,
78+
}
79+
}
80+
}
81+
82+
pub fn start() -> Result<(ChipmunkAI, ServerCommunication, ClientCommunication)> {
83+
let (task_tx, task_rx) = tokio::sync::mpsc::channel::<Task>(10);
84+
let (prompt_tx, prompt_rx) = tokio::sync::mpsc::channel::<String>(1);
85+
let (ui_tx, ui_rx) = tokio::sync::mpsc::channel::<String>(1);
86+
87+
let server_comm = ServerCommunication::new(task_rx);
88+
let client_comm = ClientCommunication::new(ui_rx, prompt_tx);
89+
90+
let config = AiConfig::default();
91+
let manager = ChipmunkAI::new(config, prompt_rx, task_tx, ui_tx);
92+
93+
Ok((manager, server_comm, client_comm))
1694
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use anyhow::Result;
2+
3+
use mcp_server::start;
4+
use rmcp::{ServiceExt, transport::io::stdio};
5+
6+
#[tokio::main]
7+
async fn main() -> Result<()> {
8+
let (ai_manager, _server_comm, _client_comm) = start()?;
9+
10+
let service = ai_manager
11+
.serve(stdio())
12+
.await
13+
.inspect_err(|err| eprintln!("Error while starting the AI Server; {err:?}"))?;
14+
15+
service.waiting().await?;
16+
Ok(())
17+
}
18+
Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
use schemars::JsonSchema;
22
use serde::{Deserialize, Serialize};
33

4-
#[derive(Debug, Default, JsonSchema, Serialize, Deserialize)]
4+
#[derive(Clone, Debug, Default, JsonSchema, Serialize, Deserialize)]
5+
pub struct ChartFilter {
6+
pub value: String,
7+
pub is_regex: bool,
8+
pub ignore_case: bool,
9+
pub is_word: bool,
10+
}
11+
12+
#[derive(Clone, Debug, Default, JsonSchema, Serialize, Deserialize)]
513
pub struct SearchFilter {
614
pub value: String,
715
pub is_regex: bool,
@@ -10,6 +18,11 @@ pub struct SearchFilter {
1018
}
1119

1220
#[derive(Debug, Default, JsonSchema, Serialize, Deserialize)]
13-
pub struct FilterParameter {
21+
pub struct SearchFilterParameter {
1422
pub filters: Vec<SearchFilter>,
1523
}
24+
25+
#[derive(Debug, Default, JsonSchema, Serialize, Deserialize)]
26+
pub struct ChartFilterParameter {
27+
pub filters: Vec<ChartFilter>,
28+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use crate::parameters::{ChartFilter, SearchFilter};
2+
3+
#[derive(Debug)]
4+
pub enum Task {
5+
ApplyFilter { filters: Vec<SearchFilter> },
6+
ApplyChart { filter: Vec<ChartFilter> },
7+
}

application/apps/indexer/mcp/src/tools.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,34 @@
1+
use anyhow::Result;
12
use log::info;
23
use rmcp::{
34
ErrorData as McpError,
45
handler::server::wrapper::Parameters,
56
model::{CallToolResult, Content},
67
tool, tool_router,
78
};
9+
use tokio::sync::mpsc::{Receiver, Sender};
810

911
use super::{
12+
ChipmunkAI, ClientCommunication, ServerCommunication,
1013
ai_config::AiConfig,
11-
ai_manager::ChipmunkAI,
12-
parameters::{FilterParameter, SearchFilter},
14+
parameters::{SearchFilter, SearchFilterParameter},
15+
tasks::Task,
1316
};
1417

1518
#[tool_router]
1619
impl ChipmunkAI {
1720
#[allow(dead_code)]
18-
pub fn new(config: AiConfig) -> Self {
21+
pub fn new(
22+
config: AiConfig,
23+
prompt_rx: Receiver<String>,
24+
task_tx: Sender<Task>,
25+
ui_tx: Sender<String>,
26+
) -> Self {
1927
Self {
2028
config,
29+
prompt_rx,
30+
ui_tx,
31+
task_tx,
2132
tool_router: Self::tool_router(),
2233
}
2334
}
@@ -29,15 +40,15 @@ Each filter can be customized with flags for regex matching, case sensitivity, a
2940
3041
**Input Parameters:**
3142
- `filters`: An array of filter objects, where each object contains:
32-
- `filter` (string): The text or pattern to search for
43+
- `value` (string): The text or pattern to search for
3344
- `is_regex` (boolean): true if the filter is a regular expression pattern
3445
- `ignore_case` (boolean): true for case-insensitive matching
3546
- `is_word` (boolean): true to match whole words only (word boundary matching)
3647
3748
**Usage Examples:**
3849
3950
Single filter:
40-
- Input: [{"filter": "error", "is_regex": false, "ignore_case": false, "is_word": false}]
51+
- Input: [{"value": "error", "is_regex": false, "ignore_case": false, "is_word": false}]
4152
- Use case: Find exact matches of "error"
4253
4354
Multiple filters:
@@ -61,10 +72,11 @@ When the user provides natural language instructions, interpret them as follows:
6172
- "regex pattern \\d+" → set is_regex: true
6273
- "find ERROR, WARNING, and CRITICAL" → three separate filters
6374
"#)]
64-
async fn apply_filters(
75+
async fn apply_search_filters(
6576
&self,
66-
Parameters(param): Parameters<FilterParameter>,
77+
Parameters(param): Parameters<SearchFilterParameter>,
6778
) -> Result<CallToolResult, McpError> {
79+
tracing::debug!("MCP: Apply search filter call received");
6880
let filters = param
6981
.filters
7082
.iter()
@@ -75,10 +87,16 @@ When the user provides natural language instructions, interpret them as follows:
7587
ignore_case: f.ignore_case,
7688
})
7789
.collect::<Vec<SearchFilter>>();
78-
info!("Received Filters from the LLM Agent: {filters:?}");
79-
// TODO: Apply filter via channel
90+
tracing::debug!("MCP: Received Filters from the LLM Agent: {filters:?}");
91+
self.task_tx
92+
.send(Task::ApplyFilter {
93+
filters: filters.clone(),
94+
})
95+
.await
96+
.map_err(|e| McpError::internal_error(format!("Failed to send task: {}", e), None))?;
97+
tracing::debug!("MCP: Sent ApplyFilter task to the task processor: {filters:?}");
8098
Ok(CallToolResult::success(vec![Content::json(
81-
"Applied filters to the logs",
99+
"Applied filters to the logs: {filters:?}",
82100
)?]))
83101
}
84102
}

0 commit comments

Comments
 (0)