1818package mcp
1919
2020import (
21+ "context"
2122 stdlog "log"
2223 "os"
2324
2425 "github.com/mark3labs/mcp-go/server"
26+ "github.com/pkg/errors"
2527 "github.com/sirupsen/logrus"
28+ "github.com/streamnative/streamnative-mcp-server/pkg/config"
29+ "github.com/streamnative/streamnative-mcp-server/pkg/kafka"
2630 "github.com/streamnative/streamnative-mcp-server/pkg/mcp"
31+ "github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
2732)
2833
29- func newMcpServer (configOpts * ServerOptions , logrusLogger * logrus.Logger ) * server. MCPServer {
34+ func newMcpServer (_ context. Context , configOpts * ServerOptions , logrusLogger * logrus.Logger ) ( * mcp. Server , error ) {
3035 snConfig := configOpts .Options .LoadConfigOrDie ()
3136 var s * server.MCPServer
37+ var mcpServer * mcp.Server
3238 switch {
3339 case snConfig .KeyFile != "" :
3440 {
@@ -38,36 +44,66 @@ func newMcpServer(configOpts *ServerOptions, logrusLogger *logrus.Logger) *serve
3844 stdlog .Fatalf ("failed to get user name: %v" , err )
3945 os .Exit (1 )
4046 }
41- // Create a new MCP server
42- s = server . NewMCPServer (
43- "streamnative-mcp-server" ,
44- "0.0.1" ,
45- server . WithResourceCapabilities ( true , true ),
46- server . WithInstructions (mcp .GetStreamNativeCloudServerInstructions (userName , snConfig )),
47- server . WithLogging ())
47+ // Create StreamNative Cloud session and set as default
48+ session , err := config . NewSNCloudSessionFromOptions ( configOpts . Options )
49+ if err != nil {
50+ return nil , errors . Wrap ( err , "failed to create StreamNative Cloud session" )
51+ }
52+ mcpServer = mcp . NewServer ( "streamnative-mcp- server" , "0.0.1" , logrusLogger , server . WithInstructions (mcp .GetStreamNativeCloudServerInstructions (userName , snConfig )))
53+ mcpServer . SNCloudSession = session
4854
55+ s = mcpServer .MCPServer
4956 mcp .RegisterPrompts (s )
50- mcp .RegisterContextTools (s , configOpts .Features )
57+ // Skip context tools if pulsar instance and cluster are provided via CLI
58+ skipContextTools := snConfig .Context .PulsarInstance != "" && snConfig .Context .PulsarCluster != ""
59+ mcp .RegisterContextTools (s , configOpts .Features , skipContextTools )
5160 mcp .StreamNativeAddLogTools (s , configOpts .ReadOnly , configOpts .Features )
5261 mcp .StreamNativeAddResourceTools (s , configOpts .ReadOnly , configOpts .Features )
5362 }
5463 case snConfig .ExternalKafka != nil :
5564 {
56- s = server .NewMCPServer (
57- "streamnative-mcp-server/kafka" ,
58- "0.0.1" ,
59- server .WithResourceCapabilities (true , true ),
60- server .WithInstructions (mcp .GetExternalKafkaServerInstructions (snConfig .ExternalKafka .BootstrapServers )),
61- server .WithLogging ())
65+ ksession , err := kafka .NewSession (kafka.KafkaContext {
66+ BootstrapServers : snConfig .ExternalKafka .BootstrapServers ,
67+ AuthType : snConfig .ExternalKafka .AuthType ,
68+ AuthMechanism : snConfig .ExternalKafka .AuthMechanism ,
69+ AuthUser : snConfig .ExternalKafka .AuthUser ,
70+ AuthPass : snConfig .ExternalKafka .AuthPass ,
71+ UseTLS : snConfig .ExternalKafka .UseTLS ,
72+ ClientKeyFile : snConfig .ExternalKafka .ClientKeyFile ,
73+ ClientCertFile : snConfig .ExternalKafka .ClientCertFile ,
74+ CaFile : snConfig .ExternalKafka .CaFile ,
75+ SchemaRegistryURL : snConfig .ExternalKafka .SchemaRegistryURL ,
76+ SchemaRegistryAuthUser : snConfig .ExternalKafka .SchemaRegistryAuthUser ,
77+ SchemaRegistryAuthPass : snConfig .ExternalKafka .SchemaRegistryAuthPass ,
78+ SchemaRegistryBearerToken : snConfig .ExternalKafka .SchemaRegistryBearerToken ,
79+ })
80+ if err != nil {
81+ return nil , errors .Wrap (err , "failed to set external Kafka context" )
82+ }
83+ mcpServer = mcp .NewServer ("streamnative-mcp-server" , "0.0.1" , logrusLogger , server .WithInstructions (mcp .GetExternalKafkaServerInstructions (snConfig .ExternalKafka .BootstrapServers )))
84+ mcpServer .KafkaSession = ksession
85+ s = mcpServer .MCPServer
6286 }
6387 case snConfig .ExternalPulsar != nil :
6488 {
65- s = server .NewMCPServer (
66- "streamnative-mcp-server/pulsar" ,
67- "0.0.1" ,
68- server .WithResourceCapabilities (true , true ),
69- server .WithInstructions (mcp .GetExternalPulsarServerInstructions (snConfig .ExternalPulsar .WebServiceURL )),
70- server .WithLogging ())
89+ psession , err := pulsar .NewSession (pulsar.PulsarContext {
90+ ServiceURL : snConfig .ExternalPulsar .ServiceURL ,
91+ WebServiceURL : snConfig .ExternalPulsar .WebServiceURL ,
92+ AuthPlugin : snConfig .ExternalPulsar .AuthPlugin ,
93+ AuthParams : snConfig .ExternalPulsar .AuthParams ,
94+ Token : snConfig .ExternalPulsar .Token ,
95+ TLSAllowInsecureConnection : snConfig .ExternalPulsar .TLSAllowInsecureConnection ,
96+ TLSEnableHostnameVerification : snConfig .ExternalPulsar .TLSEnableHostnameVerification ,
97+ TLSTrustCertsFilePath : snConfig .ExternalPulsar .TLSTrustCertsFilePath ,
98+ TLSCertFile : snConfig .ExternalPulsar .TLSCertFile ,
99+ TLSKeyFile : snConfig .ExternalPulsar .TLSKeyFile ,
100+ })
101+ if err != nil {
102+ return nil , errors .Wrap (err , "failed to set external Pulsar context" )
103+ }
104+ mcpServer = mcp .NewServer ("streamnative-mcp-server" , "0.0.1" , logrusLogger , server .WithInstructions (mcp .GetExternalPulsarServerInstructions (snConfig .ExternalPulsar .WebServiceURL )))
105+ mcpServer .PulsarSession = psession
106+ s = mcpServer .MCPServer
71107 }
72108 default :
73109 {
@@ -103,5 +139,5 @@ func newMcpServer(configOpts *ServerOptions, logrusLogger *logrus.Logger) *serve
103139 mcp .KafkaAdminAddKafkaConnectTools (s , configOpts .ReadOnly , configOpts .Features )
104140 mcp .KafkaClientAddConsumeTools (s , configOpts .ReadOnly , logrusLogger , configOpts .Features )
105141 mcp .KafkaClientAddProduceTools (s , configOpts .ReadOnly , configOpts .Features )
106- return s
142+ return mcpServer , nil
107143}
0 commit comments