From 295ab8f3faf9c86c316897af85b84c2d25eaf986 Mon Sep 17 00:00:00 2001 From: Max Lund Date: Fri, 17 Apr 2026 23:07:26 -0500 Subject: [PATCH] Proxy MCP OAuth callbacks on behalf of connected squadrons MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Command center now hosts a public, unauthenticated OAuth callback endpoint that IdPs can redirect to. When a squadron kicks off an MCP login it reserves a flow keyed by the OAuth `state` value; the callback handler looks up the owning squadron and forwards the code/state back over the WS bridge. Motivation: squadrons on remote hosts can't bind a loopback callback, and some IdPs require pre-registered HTTPS redirect URIs. New endpoints: - GET /oauth/callback (on outerMux, no auth — IdPs don't carry session cookies; security comes from the unguessable single-use state value). - POST /api/instances/{id}/mcp/{name}/oauth/start — browser-initiated MCP login. Forwards StartMCPLogin to squadron, returns the authorization URL for the browser to window.open. - GET /api/instances/{id}/notifications — per-instance SSE stream of ephemeral events. Currently delivers oauth_completed / oauth_failed notifications; structured to accept future event types. Hub additions: - PendingFlows (internal/oauth/flows.go) — state→{instanceID, mcpName} store with 10-minute TTL and tests. - Notifications (internal/hub/notifications.go) — per-instance fan-out with no buffering (ephemeral toasts only). - Connection dispatch handles inbound OAuthRegisterFlow from squadrons. Frontend: - api.startMcpOauth wraps the start endpoint. - subscribeInstanceNotifications SSE helper. - useInstanceNotifications hook mounted in AppLayout surfaces toasts via Sonner on oauth_completed / oauth_failed. - Tools page gains a Connect button on MCP rows. TEMP: go.mod has a local replace pointing at ../squadron-wire for the new protocol message types. Revert + bump once squadron-wire tags a new version (see sibling PR). --- go.mod | 4 + internal/api/oauth.go | 197 ++++++++++++++++++++ internal/api/routes.go | 6 + internal/hub/connection.go | 33 ++++ internal/hub/hub.go | 12 ++ internal/hub/notifications.go | 68 +++++++ internal/oauth/flows.go | 104 +++++++++++ internal/oauth/flows_test.go | 54 ++++++ internal/oauth/oauth_suite_test.go | 13 ++ internal/server/server.go | 6 +- web/src/api/client.ts | 10 + web/src/api/sse.ts | 40 ++++ web/src/components/AppLayout.tsx | 7 +- web/src/hooks/use-instance-notifications.ts | 36 ++++ web/src/pages/PluginsPage.tsx | 49 ++++- 15 files changed, 634 insertions(+), 5 deletions(-) create mode 100644 internal/api/oauth.go create mode 100644 internal/hub/notifications.go create mode 100644 internal/oauth/flows.go create mode 100644 internal/oauth/flows_test.go create mode 100644 internal/oauth/oauth_suite_test.go create mode 100644 web/src/hooks/use-instance-notifications.ts diff --git a/go.mod b/go.mod index 3c78582..2bc0593 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,10 @@ module commander go 1.25.4 +// TEMP: local replace for unpublished squadron-wire OAuth proxy messages. +// Revert + publish a new squadron-wire tag before merging. +replace github.com/mlund01/squadron-wire => ../squadron-wire + require ( github.com/gorilla/websocket v1.5.3 github.com/mlund01/squadron-wire v0.0.40 diff --git a/internal/api/oauth.go b/internal/api/oauth.go new file mode 100644 index 0000000..1ff4683 --- /dev/null +++ b/internal/api/oauth.go @@ -0,0 +1,197 @@ +package api + +import ( + "encoding/json" + "fmt" + "html" + "log" + "net/http" + "time" + + "github.com/mlund01/squadron-wire/protocol" + + "commander/internal/hub" +) + +// HandleOAuthCallback serves GET /oauth/callback, the public URL IdPs +// redirect the user's browser to after authorization. The callback is +// routed to the right squadron instance via the cryptographic `state` +// value (which squadron reserved in advance via OAuthRegisterFlow). +// +// This handler is intentionally unauthenticated — IdPs do not carry +// commander session cookies. Security comes from the state value being +// unguessable and single-use. +func HandleOAuthCallback(h *hub.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + state := q.Get("state") + code := q.Get("code") + idpErr := q.Get("error") + if idpErrDesc := q.Get("error_description"); idpErrDesc != "" && idpErr != "" { + idpErr = idpErr + ": " + idpErrDesc + } + + if state == "" { + writeOAuthErrorPage(w, "callback missing state parameter") + return + } + + flow, ok := h.PendingFlows().Claim(state) + if !ok { + writeOAuthErrorPage(w, "no matching OAuth flow (it may have expired)") + return + } + + // Forward to the originating squadron. + env, err := protocol.NewRequest(protocol.TypeOAuthCallbackDelivery, &protocol.OAuthCallbackDeliveryPayload{ + State: state, + Code: code, + Error: idpErr, + }) + if err != nil { + writeOAuthErrorPage(w, "internal error building delivery: "+err.Error()) + return + } + resp, err := h.SendRequest(flow.InstanceID, env, 30*time.Second) + if err != nil { + writeOAuthErrorPage(w, "failed to deliver callback to squadron: "+err.Error()) + return + } + if resp.Type == protocol.TypeError { + var perr protocol.ErrorPayload + _ = protocol.DecodePayload(resp, &perr) + writeOAuthErrorPage(w, "squadron rejected callback: "+perr.Message) + return + } + + // Notify any open commander tabs for this instance. + success := idpErr == "" && code != "" + noteType := "oauth_completed" + if !success { + noteType = "oauth_failed" + } + h.Notifications().Publish(flow.InstanceID, hub.Notification{ + Type: noteType, + Data: map[string]interface{}{ + "mcpName": flow.McpName, + "error": idpErr, + }, + }) + + if success { + writeOAuthSuccessPage(w, flow.McpName) + } else { + writeOAuthErrorPage(w, idpErr) + } + } +} + +// HandleStartOAuth kicks off a commander-initiated OAuth login for the +// named MCP server on the specified squadron. Returns the authorization URL +// for the browser to open in a new tab. +func HandleStartOAuth(h *hub.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + instanceID := r.PathValue("id") + mcpName := r.PathValue("name") + + env, err := protocol.NewRequest(protocol.TypeStartMCPLogin, &protocol.StartMCPLoginPayload{ + McpName: mcpName, + }) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + resp, err := h.SendRequest(instanceID, env, 30*time.Second) + if err != nil { + writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()}) + return + } + if resp.Type == protocol.TypeError { + var perr protocol.ErrorPayload + _ = protocol.DecodePayload(resp, &perr) + writeJSON(w, http.StatusBadGateway, map[string]string{"error": perr.Message}) + return + } + var ack protocol.StartMCPLoginAckPayload + if err := protocol.DecodePayload(resp, &ack); err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + if !ack.Accepted { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": ack.Reason}) + return + } + writeJSON(w, http.StatusOK, map[string]string{"authUrl": ack.AuthURL}) + } +} + +// HandleNotifications opens an SSE stream of per-instance notifications +// (e.g. oauth_completed). Used by the commander SPA to surface toasts. +func HandleNotifications(h *hub.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + instanceID := r.PathValue("id") + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + ch, cleanup := h.Notifications().Subscribe(instanceID) + defer cleanup() + + // Initial comment line so the connection is flushed immediately. + fmt.Fprint(w, ": connected\n\n") + flusher.Flush() + + keepalive := time.NewTicker(30 * time.Second) + defer keepalive.Stop() + + for { + select { + case <-r.Context().Done(): + return + case note, ok := <-ch: + if !ok { + return + } + data, err := json.Marshal(note) + if err != nil { + log.Printf("notification marshal: %v", err) + continue + } + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + case <-keepalive.C: + fmt.Fprint(w, ": keepalive\n\n") + flusher.Flush() + } + } + } +} + +func writeOAuthSuccessPage(w http.ResponseWriter, mcpName string) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + _, _ = fmt.Fprintf(w, ` +Authorized + +

Authorization complete

+

%s is now connected. You can close this window.

+ +`, html.EscapeString(mcpName)) +} + +func writeOAuthErrorPage(w http.ResponseWriter, msg string) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.WriteHeader(http.StatusBadRequest) + _, _ = fmt.Fprintf(w, ` +Authorization failed + +

Authorization failed

+

%s

+

You can close this window and try again from the command center UI.

+`, html.EscapeString(msg)) +} diff --git a/internal/api/routes.go b/internal/api/routes.go index 2daa36b..fe89a19 100644 --- a/internal/api/routes.go +++ b/internal/api/routes.go @@ -69,6 +69,12 @@ func RegisterRoutes(mux *http.ServeMux, h *hub.Hub, ka *keepalive.KeepAlive) { mux.HandleFunc("GET /api/instances/{id}/agents/{name}/chats", handleChatHistory(h)) mux.HandleFunc("GET /api/instances/{id}/chats/{sessionId}/messages", handleChatMessages(h)) mux.HandleFunc("DELETE /api/instances/{id}/chats/{sessionId}", handleArchiveChat(h)) + + // OAuth proxy: start a login flow, stream completion notifications. + // The public callback endpoint (/oauth/callback) is registered separately + // on the outer mux so IdPs can reach it without auth. + mux.HandleFunc("POST /api/instances/{id}/mcp/{name}/oauth/start", HandleStartOAuth(h)) + mux.HandleFunc("GET /api/instances/{id}/notifications", HandleNotifications(h)) } func handleListInstances(h *hub.Hub) http.HandlerFunc { diff --git a/internal/hub/connection.go b/internal/hub/connection.go index 0049670..b571a06 100644 --- a/internal/hub/connection.go +++ b/internal/hub/connection.go @@ -371,11 +371,44 @@ func (c *Connection) dispatch(env *protocol.Envelope) { c.fanOutChatEvent(env) case protocol.TypeChatComplete: c.fanOutChatComplete(env) + case protocol.TypeOAuthRegisterFlow: + c.handleOAuthRegisterFlow(env) default: log.Printf("Unhandled message type: %s", env.Type) } } +// handleOAuthRegisterFlow records a pending OAuth flow for later callback +// routing. Called when a squadron kicks off an MCP login and asks commander +// to reserve the `state` value. +func (c *Connection) handleOAuthRegisterFlow(env *protocol.Envelope) { + var payload protocol.OAuthRegisterFlowPayload + if err := protocol.DecodePayload(env, &payload); err != nil { + log.Printf("Invalid oauth_register_flow payload: %v", err) + ack, _ := protocol.NewError(env.RequestID, "decode_error", err.Error()) + c.Send(ack) + return + } + if c.instanceID == "" { + ack, _ := protocol.NewError(env.RequestID, "not_registered", "instance not registered yet") + c.Send(ack) + return + } + if payload.State == "" { + ack, _ := protocol.NewResponse(env.RequestID, protocol.TypeOAuthRegisterFlowAck, &protocol.OAuthRegisterFlowAckPayload{ + Accepted: false, + Reason: "state is required", + }) + c.Send(ack) + return + } + c.hub.PendingFlows().Register(payload.State, c.instanceID, payload.McpName) + ack, _ := protocol.NewResponse(env.RequestID, protocol.TypeOAuthRegisterFlowAck, &protocol.OAuthRegisterFlowAckPayload{ + Accepted: true, + }) + c.Send(ack) +} + func (c *Connection) handleRegister(env *protocol.Envelope) { var payload protocol.RegisterPayload if err := protocol.DecodePayload(env, &payload); err != nil { diff --git a/internal/hub/hub.go b/internal/hub/hub.go index 8e01f13..57c4052 100644 --- a/internal/hub/hub.go +++ b/internal/hub/hub.go @@ -8,6 +8,8 @@ import ( "github.com/gorilla/websocket" "github.com/mlund01/squadron-wire/protocol" + + oauthflows "commander/internal/oauth" ) var upgrader = websocket.Upgrader{ @@ -19,6 +21,8 @@ type Hub struct { mu sync.RWMutex connections map[string]*Connection // instanceID → connection registry *Registry + pendingFlows *oauthflows.PendingFlows + notifications *Notifications AllowConfigEdit bool } @@ -27,10 +31,18 @@ func New(allowConfigEdit bool) *Hub { return &Hub{ connections: make(map[string]*Connection), registry: NewRegistry(), + pendingFlows: oauthflows.New(), + notifications: NewNotifications(), AllowConfigEdit: allowConfigEdit, } } +// PendingFlows returns the OAuth flow store. +func (h *Hub) PendingFlows() *oauthflows.PendingFlows { return h.pendingFlows } + +// Notifications returns the per-instance notification fan-out. +func (h *Hub) Notifications() *Notifications { return h.notifications } + // Start initializes background tasks (heartbeat, cleanup, etc.). func (h *Hub) Start() { // TODO: Start heartbeat ticker diff --git a/internal/hub/notifications.go b/internal/hub/notifications.go new file mode 100644 index 0000000..38c2328 --- /dev/null +++ b/internal/hub/notifications.go @@ -0,0 +1,68 @@ +package hub + +import ( + "sync" + "time" +) + +// Notification is a generic per-instance event pushed to any open browser +// tab subscribed to that instance. Initially used to confirm OAuth-proxy +// MCP logins; designed to accept future types without schema churn. +type Notification struct { + Type string `json:"type"` // e.g. "oauth_completed" + Timestamp time.Time `json:"timestamp"` + Data map[string]interface{} `json:"data,omitempty"` +} + +// Notifications fans out per-instance notifications to SSE subscribers. +// Unlike the mission-event fan-out on Connection, notifications are keyed +// by instanceID (not missionID) and have no buffer — they are ephemeral +// hints, not reliable history. Subscribers that aren't listening when an +// event fires will miss it. +type Notifications struct { + mu sync.Mutex + subs map[string][]chan Notification // instanceID → subscribers +} + +// NewNotifications creates an empty fan-out. +func NewNotifications() *Notifications { + return &Notifications{subs: make(map[string][]chan Notification)} +} + +// Subscribe returns a channel for the given instance's notifications and a +// cleanup function to remove the subscription. +func (n *Notifications) Subscribe(instanceID string) (chan Notification, func()) { + ch := make(chan Notification, 16) + n.mu.Lock() + n.subs[instanceID] = append(n.subs[instanceID], ch) + n.mu.Unlock() + return ch, func() { + n.mu.Lock() + defer n.mu.Unlock() + subs := n.subs[instanceID] + for i, s := range subs { + if s == ch { + n.subs[instanceID] = append(subs[:i], subs[i+1:]...) + break + } + } + close(ch) + } +} + +// Publish delivers a notification to all subscribers for the instance. +// Slow subscribers are skipped (no blocking). +func (n *Notifications) Publish(instanceID string, note Notification) { + if note.Timestamp.IsZero() { + note.Timestamp = time.Now() + } + n.mu.Lock() + subs := append([]chan Notification(nil), n.subs[instanceID]...) + n.mu.Unlock() + for _, ch := range subs { + select { + case ch <- note: + default: + } + } +} diff --git a/internal/oauth/flows.go b/internal/oauth/flows.go new file mode 100644 index 0000000..97cf1da --- /dev/null +++ b/internal/oauth/flows.go @@ -0,0 +1,104 @@ +// Package oauth is the command center side of the OAuth proxy. +// +// When a squadron instance wants to authenticate against an MCP server's +// OAuth provider, it asks commander to reserve an entry in the flow store +// keyed by the cryptographic `state` value. When the IdP later redirects +// the user's browser to `/oauth/callback`, commander looks up the +// state, finds the owning instance, and forwards the callback params back +// over the WS bridge. +// +// Flows expire after 10 minutes to keep the store bounded even when users +// abandon the IdP tab. +package oauth + +import ( + "sync" + "time" +) + +// DefaultFlowTTL is how long a reserved flow remains claimable before it's +// evicted by the background sweeper. +const DefaultFlowTTL = 10 * time.Minute + +// PendingFlow is the per-state record kept while a login is in progress. +type PendingFlow struct { + InstanceID string + McpName string + CreatedAt time.Time +} + +// PendingFlows is a thread-safe store of OAuth flows awaiting callback. +type PendingFlows struct { + mu sync.Mutex + ttl time.Duration + now func() time.Time + flows map[string]PendingFlow +} + +// New creates a new PendingFlows store with the default TTL. +func New() *PendingFlows { + return NewWithTTL(DefaultFlowTTL) +} + +// NewWithTTL creates a PendingFlows with a custom TTL (used in tests). +func NewWithTTL(ttl time.Duration) *PendingFlows { + return &PendingFlows{ + ttl: ttl, + now: time.Now, + flows: make(map[string]PendingFlow), + } +} + +// Register stores a flow under the given state value. If a flow for the +// same state already exists it is overwritten (the new request wins — state +// collisions in practice are vanishingly unlikely and indicate a buggy +// client; the latest registrant is the best guess). +func (p *PendingFlows) Register(state, instanceID, mcpName string) { + p.mu.Lock() + defer p.mu.Unlock() + p.flows[state] = PendingFlow{ + InstanceID: instanceID, + McpName: mcpName, + CreatedAt: p.now(), + } +} + +// Claim removes and returns the flow for the given state. The second +// return value is false if no flow is registered or it has expired. +// Callback delivery is one-shot: the entry is removed on successful claim. +func (p *PendingFlows) Claim(state string) (PendingFlow, bool) { + p.mu.Lock() + defer p.mu.Unlock() + f, ok := p.flows[state] + if !ok { + return PendingFlow{}, false + } + delete(p.flows, state) + if p.now().Sub(f.CreatedAt) > p.ttl { + return PendingFlow{}, false + } + return f, true +} + +// Sweep evicts expired flows. Safe to call periodically from a background +// goroutine; the store is also self-cleaning via Claim. +func (p *PendingFlows) Sweep() int { + p.mu.Lock() + defer p.mu.Unlock() + n := 0 + now := p.now() + for state, f := range p.flows { + if now.Sub(f.CreatedAt) > p.ttl { + delete(p.flows, state) + n++ + } + } + return n +} + +// Len returns the current number of registered flows (for tests/metrics). +func (p *PendingFlows) Len() int { + p.mu.Lock() + defer p.mu.Unlock() + return len(p.flows) +} diff --git a/internal/oauth/flows_test.go b/internal/oauth/flows_test.go new file mode 100644 index 0000000..2b03995 --- /dev/null +++ b/internal/oauth/flows_test.go @@ -0,0 +1,54 @@ +package oauth_test + +import ( + "time" + + "commander/internal/oauth" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("PendingFlows", func() { + It("registers and claims a flow", func() { + p := oauth.New() + p.Register("abc", "inst-1", "linear") + f, ok := p.Claim("abc") + Expect(ok).To(BeTrue()) + Expect(f.InstanceID).To(Equal("inst-1")) + Expect(f.McpName).To(Equal("linear")) + }) + + It("returns false for unknown state", func() { + p := oauth.New() + _, ok := p.Claim("does-not-exist") + Expect(ok).To(BeFalse()) + }) + + It("is one-shot — second claim returns false", func() { + p := oauth.New() + p.Register("abc", "inst-1", "linear") + _, ok := p.Claim("abc") + Expect(ok).To(BeTrue()) + _, ok = p.Claim("abc") + Expect(ok).To(BeFalse()) + }) + + It("evicts expired flows via Claim", func() { + p := oauth.NewWithTTL(10 * time.Millisecond) + p.Register("abc", "inst-1", "linear") + time.Sleep(20 * time.Millisecond) + _, ok := p.Claim("abc") + Expect(ok).To(BeFalse()) + }) + + It("Sweep evicts expired flows", func() { + p := oauth.NewWithTTL(10 * time.Millisecond) + p.Register("a", "inst-1", "x") + p.Register("b", "inst-1", "x") + Expect(p.Len()).To(Equal(2)) + time.Sleep(20 * time.Millisecond) + Expect(p.Sweep()).To(Equal(2)) + Expect(p.Len()).To(Equal(0)) + }) +}) diff --git a/internal/oauth/oauth_suite_test.go b/internal/oauth/oauth_suite_test.go new file mode 100644 index 0000000..1c52495 --- /dev/null +++ b/internal/oauth/oauth_suite_test.go @@ -0,0 +1,13 @@ +package oauth_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestOAuth(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "OAuth Flows Suite") +} diff --git a/internal/server/server.go b/internal/server/server.go index 5cf9660..ef03177 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -44,9 +44,13 @@ func New(addr string, webFS fs.FS, allowConfigEdit bool, ka *keepalive.KeepAlive // Outer mux: routes /ws directly to the hub (bypassing auth — it's // machine-to-machine for squadron instances) and forwards everything - // else to the (optionally protected) inner mux. + // else to the (optionally protected) inner mux. /oauth/callback is also + // unauthenticated so OAuth IdPs (which don't carry session cookies) can + // reach it; security comes from the state parameter being unguessable + // and single-use. outerMux := http.NewServeMux() outerMux.HandleFunc("/ws", h.ServeWS) + outerMux.HandleFunc("GET /oauth/callback", api.HandleOAuthCallback(h)) outerMux.Handle("/", protectedHandler) return &Server{ diff --git a/web/src/api/client.ts b/web/src/api/client.ts index 5df4a19..33b14b5 100644 --- a/web/src/api/client.ts +++ b/web/src/api/client.ts @@ -235,6 +235,16 @@ export interface CostSummaryResponse { }>; } +// startMcpOauth asks commander to ask the squadron to kick off an OAuth +// login for the named MCP server. Returns the authorization URL — the +// caller is expected to open it in a new tab. The IdP redirect will land +// on commander's /oauth/callback and complete the flow back over WS. +export async function startMcpOauth(instanceId: string, mcpName: string): Promise<{ authUrl: string }> { + return fetchJSON<{ authUrl: string }>(`/instances/${instanceId}/mcp/${encodeURIComponent(mcpName)}/oauth/start`, { + method: 'POST', + }); +} + export async function getCostSummary(instanceId: string, from?: string, to?: string, groupBy?: string, breakdownField?: string): Promise { const params = new URLSearchParams(); if (from) params.set('from', from); diff --git a/web/src/api/sse.ts b/web/src/api/sse.ts index f783251..561f58f 100644 --- a/web/src/api/sse.ts +++ b/web/src/api/sse.ts @@ -63,3 +63,43 @@ export function subscribeMissionEvents( return { close: () => es.close() }; } + +export interface InstanceNotification { + type: string; // "oauth_completed" | "oauth_failed" | (future) + timestamp?: string; + data?: Record; +} + +export interface InstanceNotificationSource { + close: () => void; +} + +// subscribeInstanceNotifications opens an SSE stream for per-instance +// notifications (OAuth completions, future alert types). Unlike mission +// event streams, notifications are ephemeral — subscribers that aren't +// listening when an event fires will miss it. +export function subscribeInstanceNotifications( + instanceId: string, + onNotification: (note: InstanceNotification) => void, +): InstanceNotificationSource { + const url = `/api/instances/${instanceId}/notifications`; + const es = new EventSource(url); + + es.onmessage = (e) => { + try { + const note: InstanceNotification = JSON.parse(e.data); + onNotification(note); + } catch { + // Skip malformed events + } + }; + + es.onerror = () => { + // Notifications are best-effort; no redirect-on-401 here. + if (es.readyState === EventSource.CLOSED) { + return; + } + }; + + return { close: () => es.close() }; +} diff --git a/web/src/components/AppLayout.tsx b/web/src/components/AppLayout.tsx index 93b3ba9..a315c52 100644 --- a/web/src/components/AppLayout.tsx +++ b/web/src/components/AppLayout.tsx @@ -1,9 +1,14 @@ -import { Outlet } from 'react-router-dom'; +import { Outlet, useParams } from 'react-router-dom'; import { SidebarProvider, SidebarInset } from '@/components/ui/sidebar'; import { TooltipProvider } from '@/components/ui/tooltip'; import { AppSidebar } from './AppSidebar'; +import { useInstanceNotifications } from '@/hooks/use-instance-notifications'; export function AppLayout() { + const { id } = useParams<{ id: string }>(); + // Subscribe once per instance at the shell level so toasts fire regardless + // of which page is mounted. + useInstanceNotifications(id); return ( diff --git a/web/src/hooks/use-instance-notifications.ts b/web/src/hooks/use-instance-notifications.ts new file mode 100644 index 0000000..ae1088d --- /dev/null +++ b/web/src/hooks/use-instance-notifications.ts @@ -0,0 +1,36 @@ +import { useEffect } from 'react'; +import { toast } from 'sonner'; +import { subscribeInstanceNotifications, type InstanceNotification } from '@/api/sse'; + +// useInstanceNotifications subscribes to per-instance notifications and +// surfaces them as toasts. Currently handles OAuth-proxy completion events; +// extend the switch as new notification types are added. +export function useInstanceNotifications(instanceId: string | undefined) { + useEffect(() => { + if (!instanceId) return; + const sub = subscribeInstanceNotifications(instanceId, (note) => { + handleNotification(note); + }); + return () => sub.close(); + }, [instanceId]); +} + +function handleNotification(note: InstanceNotification) { + switch (note.type) { + case 'oauth_completed': { + const name = (note.data?.mcpName as string | undefined) ?? 'MCP server'; + toast.success(`${name} connected`); + break; + } + case 'oauth_failed': { + const name = (note.data?.mcpName as string | undefined) ?? 'MCP server'; + const err = (note.data?.error as string | undefined) ?? 'authorization failed'; + toast.error(`${name}: ${err}`); + break; + } + default: + // Unknown notification types are silently ignored so the backend can + // roll out new types without a simultaneous UI release. + break; + } +} diff --git a/web/src/pages/PluginsPage.tsx b/web/src/pages/PluginsPage.tsx index 5ab8c3c..f399d40 100644 --- a/web/src/pages/PluginsPage.tsx +++ b/web/src/pages/PluginsPage.tsx @@ -1,10 +1,12 @@ import { useState, useMemo } from 'react'; import { useQuery } from '@tanstack/react-query'; import { useParams } from 'react-router-dom'; -import { ChevronRight, Info } from 'lucide-react'; -import { getInstance } from '@/api/client'; +import { toast } from 'sonner'; +import { ChevronRight, Info, KeyRound } from 'lucide-react'; +import { getInstance, startMcpOauth } from '@/api/client'; import { Alert, AlertDescription } from '@/components/ui/alert'; import { Badge } from '@/components/ui/badge'; +import { Button } from '@/components/ui/button'; import { PageStats } from '@/components/page-stats'; import { Dialog, @@ -191,7 +193,12 @@ function PluginRow({ plugin, kind, isOpen, toolCount, onToggle, onToolClick }: P {plugin.version || '—'} - {toolCount} + +
+ {kind === 'mcp' && } + {toolCount} +
+
{isOpen && toolCount > 0 && ( @@ -208,6 +215,42 @@ function PluginRow({ plugin, kind, isOpen, toolCount, onToggle, onToolClick }: P ); } +// McpConnectButton kicks off an OAuth login for the named MCP server by +// asking commander to forward a StartMCPLogin request to the squadron. +// The squadron returns the IdP authorization URL, which we open in a new +// tab; the IdP later redirects to commander's /oauth/callback and the +// flow completes out of band. A toast (from useInstanceNotifications) +// confirms success or failure. +function McpConnectButton({ mcpName }: { mcpName: string }) { + const { id } = useParams<{ id: string }>(); + const [loading, setLoading] = useState(false); + async function handleClick(e: React.MouseEvent) { + e.stopPropagation(); + if (!id) return; + setLoading(true); + try { + const { authUrl } = await startMcpOauth(id, mcpName); + window.open(authUrl, '_blank', 'noopener,noreferrer'); + } catch (err) { + toast.error(`Could not start OAuth: ${err instanceof Error ? err.message : String(err)}`); + } finally { + setLoading(false); + } + } + return ( + + ); +} + function ToolRow({ tool, onClick }: { tool: ToolInfo; onClick: () => void }) { return (