Skip to content

Commit a86d6f8

Browse files
authored
feat: Enhance Pulsar Function Manager with cluster error handling (#57)
1 parent 89b6d81 commit a86d6f8

File tree

4 files changed

+72
-41
lines changed

4 files changed

+72
-41
lines changed

pkg/mcp/pftools/errors.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
11
package pftools
22

3-
import "errors"
3+
import (
4+
"errors"
5+
"strings"
6+
7+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
8+
)
49

510
var (
611
ErrFunctionNotFound = errors.New("function not found")
712
ErrNotOurMessage = errors.New("not our message")
813
)
14+
15+
// IsClusterUnhealthy checks if an error indicates cluster health issues
16+
func IsClusterUnhealthy(err error) bool {
17+
if restErr, ok := err.(rest.Error); ok {
18+
return restErr.Code == 503 && strings.Contains(restErr.Reason, "no healthy upstream")
19+
}
20+
return false
21+
}

pkg/mcp/pftools/manager.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -89,23 +89,24 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
8989

9090
// Create the manager
9191
manager := &PulsarFunctionManager{
92-
adminClient: adminClient,
93-
v2adminClient: v2adminClient,
94-
pulsarClient: pulsarClient,
95-
fnToToolMap: make(map[string]*FunctionTool),
96-
mutex: sync.RWMutex{},
97-
producerCache: make(map[string]pulsarclient.Producer),
98-
producerMutex: sync.RWMutex{},
99-
pollInterval: options.PollInterval,
100-
stopCh: make(chan struct{}),
101-
callInProgressMap: make(map[string]context.CancelFunc),
102-
mcpServer: snServer.MCPServer,
103-
readOnly: readOnly,
104-
defaultTimeout: options.DefaultTimeout,
105-
circuitBreakers: make(map[string]*CircuitBreaker),
106-
tenantNamespaces: options.TenantNamespaces,
107-
strictExport: options.StrictExport,
108-
sessionID: sessionID,
92+
adminClient: adminClient,
93+
v2adminClient: v2adminClient,
94+
pulsarClient: pulsarClient,
95+
fnToToolMap: make(map[string]*FunctionTool),
96+
mutex: sync.RWMutex{},
97+
producerCache: make(map[string]pulsarclient.Producer),
98+
producerMutex: sync.RWMutex{},
99+
pollInterval: options.PollInterval,
100+
stopCh: make(chan struct{}),
101+
callInProgressMap: make(map[string]context.CancelFunc),
102+
mcpServer: snServer.MCPServer,
103+
readOnly: readOnly,
104+
defaultTimeout: options.DefaultTimeout,
105+
circuitBreakers: make(map[string]*CircuitBreaker),
106+
tenantNamespaces: options.TenantNamespaces,
107+
strictExport: options.StrictExport,
108+
sessionID: sessionID,
109+
clusterErrorHandler: options.ClusterErrorHandler,
109110
}
110111

111112
return manager, nil
@@ -154,6 +155,11 @@ func (m *PulsarFunctionManager) updateFunctions() {
154155
functions, err := m.getFunctionsList()
155156
if err != nil {
156157
log.Printf("Failed to get functions list: %v", err)
158+
159+
// Check if this is a cluster health error and invoke callback if configured
160+
if IsClusterUnhealthy(err) && m.clusterErrorHandler != nil {
161+
go m.clusterErrorHandler(m, err)
162+
}
157163
return
158164
}
159165

pkg/mcp/pftools/types.go

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,24 @@ import (
3131

3232
// PulsarFunctionManager manages the lifecycle of Pulsar Functions as MCP tools
3333
type PulsarFunctionManager struct {
34-
adminClient cmdutils.Client
35-
v2adminClient cmdutils.Client
36-
pulsarClient pulsar.Client
37-
fnToToolMap map[string]*FunctionTool
38-
mutex sync.RWMutex
39-
producerCache map[string]pulsar.Producer
40-
producerMutex sync.RWMutex
41-
pollInterval time.Duration
42-
stopCh chan struct{}
43-
callInProgressMap map[string]context.CancelFunc
44-
mcpServer *server.MCPServer
45-
readOnly bool
46-
defaultTimeout time.Duration
47-
circuitBreakers map[string]*CircuitBreaker
48-
tenantNamespaces []string
49-
strictExport bool
50-
sessionID string
34+
adminClient cmdutils.Client
35+
v2adminClient cmdutils.Client
36+
pulsarClient pulsar.Client
37+
fnToToolMap map[string]*FunctionTool
38+
mutex sync.RWMutex
39+
producerCache map[string]pulsar.Producer
40+
producerMutex sync.RWMutex
41+
pollInterval time.Duration
42+
stopCh chan struct{}
43+
callInProgressMap map[string]context.CancelFunc
44+
mcpServer *server.MCPServer
45+
readOnly bool
46+
defaultTimeout time.Duration
47+
circuitBreakers map[string]*CircuitBreaker
48+
tenantNamespaces []string
49+
strictExport bool
50+
sessionID string
51+
clusterErrorHandler ClusterErrorHandler
5152
}
5253

5354
type FunctionTool struct {
@@ -84,13 +85,16 @@ const (
8485
StateClosed
8586
)
8687

88+
type ClusterErrorHandler func(*PulsarFunctionManager, error)
89+
8790
type ManagerOptions struct {
88-
PollInterval time.Duration
89-
DefaultTimeout time.Duration
90-
FailureThreshold int
91-
ResetTimeout time.Duration
92-
TenantNamespaces []string
93-
StrictExport bool
91+
PollInterval time.Duration
92+
DefaultTimeout time.Duration
93+
FailureThreshold int
94+
ResetTimeout time.Duration
95+
TenantNamespaces []string
96+
StrictExport bool
97+
ClusterErrorHandler ClusterErrorHandler
9498
}
9599

96100
func DefaultManagerOptions() *ManagerOptions {

pkg/mcp/pulsar_functions_as_tools.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string,
6666

6767
options := pftools2.DefaultManagerOptions()
6868

69+
// Configure cluster error handler for graceful cleanup
70+
options.ClusterErrorHandler = func(_ *pftools2.PulsarFunctionManager, err error) {
71+
log.Printf("Cluster health error detected: %v", err)
72+
log.Printf("Consider implementing cleanup logic here (e.g., stopping manager, notifying monitoring systems)")
73+
// The calling service can implement specific cleanup logic here
74+
// For example: stop the manager, send alerts, implement backoff strategies
75+
}
76+
6977
if s.SNCloudSession.Ctx.Organization == "" || s.SNCloudSession.Ctx.PulsarInstance == "" || s.SNCloudSession.Ctx.PulsarCluster == "" {
7078
log.Printf("Skipping Pulsar Functions as MCP Tools because both organization, pulsar instance and pulsar cluster are not set")
7179
return

0 commit comments

Comments
 (0)