Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ SCRIPTS_SRC = $(shell find . \( -name '*.sh' -o -name '*.py' -o -name '*.mk' -o
-not -path './vendor/*' \
-not -path './idl/*' \
-not -path './jaeger-ui/*' \
-not -path './python-sidecar/*' \
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excluding python-sidecar from SCRIPTS_SRC means these new .py/.md files won't be covered by the repo's formatting/license checks (make fmt / make lint-license). If this code is intended to live in-repo (even as a PoC), consider keeping it in the lint set and adding the standard license headers instead of opting out.

Suggested change
-not -path './python-sidecar/*' \

Copilot uses AI. Check for mistakes.
-type f | \
sort)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) 2026 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package jaegerai

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/coder/acp-go-sdk"
"github.com/gorilla/websocket"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery/querysvc"
)

const endOfTurnMarker = "__END_OF_TURN__"

// ChatRequest is the incoming payload
type ChatRequest struct {
Prompt string `json:"prompt"`
}

// ChatHandler manages the AI gateway requests
type ChatHandler struct {
Logger *zap.Logger
QueryService *querysvc.QueryService
}

func NewChatHandler(logger *zap.Logger, queryService *querysvc.QueryService) *ChatHandler {
return &ChatHandler{Logger: logger, QueryService: queryService}
}

func (h *ChatHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var req ChatRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Bad request", http.StatusBadRequest)
return
}
Comment on lines +43 to +47
Comment on lines +43 to +47
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The request body is decoded without any size limit, so a large POST can cause excessive memory usage. Consider wrapping r.Body with http.MaxBytesReader (and optionally validating req.Prompt length / non-empty) before decoding.

Copilot uses AI. Check for mistakes.

flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

Comment on lines +37 to +58
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduces a new public HTTP endpoint (/api/ai/chat) with streaming behavior and external sidecar dependency, but there are no corresponding tests. Consider adding at least a basic handler test (similar to existing http_handler_test.go patterns) that exercises request validation, sidecar dial failure handling, and streaming output on a mocked websocket/ACP connection.

Copilot uses AI. Check for mistakes.
ctx := r.Context()
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
conn, resp, err := dialer.DialContext(ctx, "ws://localhost:9000", nil)
Comment on lines +60 to +61
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The websocket sidecar endpoint is hard-coded to ws://localhost:9000, which makes the feature unusable in non-local deployments and complicates ops (sidecar on different host/port, TLS, etc.). Please make the sidecar URL configurable (e.g., via QueryOptions / env var) and validate it at startup.

Copilot uses AI. Check for mistakes.
if resp != nil && resp.Body != nil {
defer resp.Body.Close()
}
if err != nil {
h.Logger.Error("Failed to dial ACP sidecar", zap.Error(err))
http.Error(w, "Failed to connect to agent backend", http.StatusBadGateway)
return
Comment on lines +60 to +68
}
defer conn.Close()

adapter := NewWsAdapter(conn)

clientImpl := &streamingClient{
requestCtx: ctx,
w: w,
flusher: flusher,
doneCh: make(chan struct{}),
}
acpConn := acp.NewClientSideConnection(clientImpl, adapter, adapter)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the connection to the sidecar? please add comments like this


acpCtx, cancel := context.WithCancel(ctx)
defer cancel()

_, err = acpConn.Initialize(acpCtx, acp.InitializeRequest{
ProtocolVersion: acp.ProtocolVersionNumber,
ClientCapabilities: acp.ClientCapabilities{
Fs: acp.FileSystemCapability{ReadTextFile: false, WriteTextFile: false},
Terminal: false,
},
ClientInfo: &acp.Implementation{
Name: "jaeger-ai-gateway",
Version: "0.1.0",
},
})
if err != nil {
fmt.Fprintf(w, "Error initializing agent: %v\n", err)
return
}

sess, err := acpConn.NewSession(acpCtx, acp.NewSessionRequest{
Cwd: "/",
McpServers: []acp.McpServer{},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

architecturally what does having MCP servers here mean?

})
if err != nil {
fmt.Fprintf(w, "Error creating session: %v\n", err)
return
}

// This is blocking until the agent finishes processing the prompt
_, err = acpConn.Prompt(acpCtx, acp.PromptRequest{
SessionId: sess.SessionId,
Prompt: []acp.ContentBlock{acp.TextBlock(req.Prompt)},
})
if err != nil {
fmt.Fprintf(w, "Error starting prompt: %v\n", err)
return
}
Comment on lines +96 to +118

// Wait for explicit end-of-turn marker from the sidecar, with timeout fallback.
clientImpl.waitForTurnCompletion(acpCtx, 2*time.Second)
Comment on lines +120 to +121
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a fixed 2s wait for the end-of-turn marker risks truncating responses for longer model/tool runs. Please either wait indefinitely until the marker/ctx cancellation, or make the timeout configurable and large enough for real traces.

Copilot uses AI. Check for mistakes.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) 2026 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package jaegerai

import (
"context"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/coder/acp-go-sdk"
)

// streamingClient implements acp.Client to handle callbacks and streaming text.
type streamingClient struct {
requestCtx context.Context
w http.ResponseWriter
flusher http.Flusher
mu sync.Mutex
closed bool
doneCh chan struct{}
doneOnce sync.Once
}

func (c *streamingClient) signalDone() {
c.doneOnce.Do(func() {
if c.doneCh != nil {
close(c.doneCh)
}
})
}

func (c *streamingClient) writeAndFlush(text string) {
c.mu.Lock()
defer c.mu.Unlock()

if c.closed {
return
}

if c.requestCtx != nil {
select {
case <-c.requestCtx.Done():
c.closed = true
c.signalDone()
return
default:
}
}

defer func() {
if recover() != nil {
c.closed = true
c.signalDone()
}
}()

if _, err := io.WriteString(c.w, text); err != nil {
c.closed = true
c.signalDone()
return
}

c.flusher.Flush()
}

func (c *streamingClient) waitForTurnCompletion(ctx context.Context, maxWait time.Duration) {
if maxWait <= 0 {
return
}

maxTimer := time.NewTimer(maxWait)
defer maxTimer.Stop()

select {
case <-ctx.Done():
return
case <-maxTimer.C:
return
case <-c.doneCh:
return
}
}

func (*streamingClient) RequestPermission(_ context.Context, p acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) {
if len(p.Options) == 0 {
return acp.RequestPermissionResponse{
Outcome: acp.RequestPermissionOutcome{
Cancelled: &acp.RequestPermissionOutcomeCancelled{},
},
}, nil
}
return acp.RequestPermissionResponse{
Outcome: acp.RequestPermissionOutcome{
Selected: &acp.RequestPermissionOutcomeSelected{OptionId: p.Options[0].OptionId},
},
}, nil
}

func (c *streamingClient) SessionUpdate(_ context.Context, n acp.SessionNotification) error {
u := n.Update
if u.AgentMessageChunk != nil {
content := u.AgentMessageChunk.Content
if content.Text != nil {
if content.Text.Text == endOfTurnMarker {
c.signalDone()
} else {
c.writeAndFlush(content.Text.Text)
}
}
}
if u.ToolCall != nil {
c.writeAndFlush(fmt.Sprintf("\n[tool_call] %s\n", u.ToolCall.Title))
}
if u.ToolCallUpdate != nil {
c.writeAndFlush(fmt.Sprintf("\n[tool_result] id=%s status=%s\n", u.ToolCallUpdate.ToolCallId, valueOrUnknown(u.ToolCallUpdate.Status)))
}
return nil
}

func valueOrUnknown(v *acp.ToolCallStatus) string {
if v == nil {
return "unknown"
}
return string(*v)
}

func (*streamingClient) WriteTextFile(_ context.Context, _ acp.WriteTextFileRequest) (acp.WriteTextFileResponse, error) {
return acp.WriteTextFileResponse{}, nil
}

func (*streamingClient) ReadTextFile(_ context.Context, p acp.ReadTextFileRequest) (acp.ReadTextFileResponse, error) {
return acp.ReadTextFileResponse{Content: "unsupported path: " + p.Path}, nil
}

func (*streamingClient) CreateTerminal(_ context.Context, _ acp.CreateTerminalRequest) (acp.CreateTerminalResponse, error) {
return acp.CreateTerminalResponse{TerminalId: "t-1"}, nil
}

func (*streamingClient) KillTerminalCommand(_ context.Context, _ acp.KillTerminalCommandRequest) (acp.KillTerminalCommandResponse, error) {
return acp.KillTerminalCommandResponse{}, nil
}

func (*streamingClient) ReleaseTerminal(_ context.Context, _ acp.ReleaseTerminalRequest) (acp.ReleaseTerminalResponse, error) {
return acp.ReleaseTerminalResponse{}, nil
}

func (*streamingClient) TerminalOutput(_ context.Context, _ acp.TerminalOutputRequest) (acp.TerminalOutputResponse, error) {
return acp.TerminalOutputResponse{Output: "ok", Truncated: false}, nil
}

func (*streamingClient) WaitForTerminalExit(_ context.Context, _ acp.WaitForTerminalExitRequest) (acp.WaitForTerminalExitResponse, error) {
return acp.WaitForTerminalExitResponse{}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2026 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package jaegerai

import (
"fmt"
"io"

"github.com/gorilla/websocket"
)

// WsReadWriteCloser wraps a gorilla websocket to implement io.ReadWriteCloser.
type WsReadWriteCloser struct {
conn *websocket.Conn
r io.Reader
}

func NewWsAdapter(conn *websocket.Conn) *WsReadWriteCloser {
return &WsReadWriteCloser{conn: conn}
}

func (w *WsReadWriteCloser) Read(p []byte) (int, error) {
if w.r == nil {
messageType, r, err := w.conn.NextReader()
if err != nil {
return 0, err
}
if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
return 0, fmt.Errorf("unexpected message type: %d", messageType)
}
w.r = r
}

n, err := w.r.Read(p)
if err == io.EOF {
w.r = nil
if n > 0 {
return n, nil
}
return w.Read(p)
}
return n, err
Comment on lines +24 to +43
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adapter uses recursion on io.EOF (calling Read() again). If the peer sends a sequence of empty frames/messages, this can recurse deeply and risk stack growth. Prefer a loop that resets w.r and continues reading until non-EOF data or a real error.

Suggested change
if w.r == nil {
messageType, r, err := w.conn.NextReader()
if err != nil {
return 0, err
}
if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
return 0, fmt.Errorf("unexpected message type: %d", messageType)
}
w.r = r
}
n, err := w.r.Read(p)
if err == io.EOF {
w.r = nil
if n > 0 {
return n, nil
}
return w.Read(p)
}
return n, err
for {
if w.r == nil {
messageType, r, err := w.conn.NextReader()
if err != nil {
return 0, err
}
if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
return 0, fmt.Errorf("unexpected message type: %d", messageType)
}
w.r = r
}
n, err := w.r.Read(p)
if err == io.EOF {
w.r = nil
if n > 0 {
return n, nil
}
// n == 0: advance to the next message/reader
continue
}
return n, err
}

Copilot uses AI. Check for mistakes.
}

func (w *WsReadWriteCloser) Write(p []byte) (int, error) {
err := w.conn.WriteMessage(websocket.TextMessage, p)
if err != nil {
return 0, err
}
return len(p), nil
}

func (w *WsReadWriteCloser) Close() error {
return w.conn.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery/internal/apiv3"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery/querysvc"
"github.com/jaegertracing/jaeger/internal/auth/bearertoken"
"github.com/jaegertracing/jaeger/internal/proto/api_v3"
Expand Down Expand Up @@ -189,6 +190,14 @@ func initRouter(
if queryOpts.BasePath != "" && queryOpts.BasePath != "/" {
apiNotFoundPattern = queryOpts.BasePath + apiNotFoundPattern
}

// AI Gateway Endpoints
aiHandlerPath := "/api/ai/chat"
if queryOpts.BasePath != "" && queryOpts.BasePath != "/" {
aiHandlerPath = queryOpts.BasePath + aiHandlerPath
}
r.HandleFunc(aiHandlerPath, jaegerai.NewChatHandler(telset.Logger, querySvc).ServeHTTP)

Comment on lines +194 to +200
r.HandleFunc(apiNotFoundPattern, func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "404 page not found", http.StatusNotFound)
})
Expand Down
Loading
Loading