Skip to content

Commit b838fe1

Browse files
feat: responses added to logs
1 parent 18785e9 commit b838fe1

File tree

12 files changed

+1104
-298
lines changed

12 files changed

+1104
-298
lines changed

framework/logstore/tables.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,11 @@ type Log struct {
7676
Model string `gorm:"type:varchar(255);index;not null" json:"model"`
7777
InputHistory string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ChatMessage
7878
OutputMessage string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ChatMessage
79+
ResponsesOutput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ResponsesMessage
7980
EmbeddingOutput string `gorm:"type:text" json:"-"` // JSON serialized [][]float32
8081
Params string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ModelParameters
8182
Tools string `gorm:"type:text" json:"-"` // JSON serialized []schemas.Tool
82-
ToolCalls string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ToolCall
83+
ToolCalls string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ToolCall (For backward compatibility, tool calls are now in the content)
8384
SpeechInput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.SpeechInput
8485
TranscriptionInput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.TranscriptionInput
8586
SpeechOutput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.BifrostSpeech
@@ -104,10 +105,11 @@ type Log struct {
104105
// Virtual fields for JSON output - these will be populated when needed
105106
InputHistoryParsed []schemas.ChatMessage `gorm:"-" json:"input_history,omitempty"`
106107
OutputMessageParsed *schemas.ChatMessage `gorm:"-" json:"output_message,omitempty"`
108+
ResponsesOutputParsed []schemas.ResponsesMessage `gorm:"-" json:"responses_output,omitempty"`
107109
EmbeddingOutputParsed []schemas.BifrostEmbedding `gorm:"-" json:"embedding_output,omitempty"`
108110
ParamsParsed interface{} `gorm:"-" json:"params,omitempty"`
109111
ToolsParsed []schemas.ChatTool `gorm:"-" json:"tools,omitempty"`
110-
ToolCallsParsed []schemas.ChatAssistantMessageToolCall `gorm:"-" json:"tool_calls,omitempty"`
112+
ToolCallsParsed []schemas.ChatAssistantMessageToolCall `gorm:"-" json:"tool_calls,omitempty"` // For backward compatibility, tool calls are now in the content
111113
TokenUsageParsed *schemas.LLMUsage `gorm:"-" json:"token_usage,omitempty"`
112114
ErrorDetailsParsed *schemas.BifrostError `gorm:"-" json:"error_details,omitempty"`
113115
SpeechInputParsed *schemas.SpeechInput `gorm:"-" json:"speech_input,omitempty"`
@@ -158,6 +160,14 @@ func (l *Log) SerializeFields() error {
158160
}
159161
}
160162

163+
if l.ResponsesOutputParsed != nil {
164+
if data, err := json.Marshal(l.ResponsesOutputParsed); err != nil {
165+
return err
166+
} else {
167+
l.ResponsesOutput = string(data)
168+
}
169+
}
170+
161171
if l.EmbeddingOutputParsed != nil {
162172
if data, err := json.Marshal(l.EmbeddingOutputParsed); err != nil {
163173
return err
@@ -273,6 +283,13 @@ func (l *Log) DeserializeFields() error {
273283
}
274284
}
275285

286+
if l.ResponsesOutput != "" {
287+
if err := json.Unmarshal([]byte(l.ResponsesOutput), &l.ResponsesOutputParsed); err != nil {
288+
// Log error but don't fail the operation - initialize as nil
289+
l.ResponsesOutputParsed = []schemas.ResponsesMessage{}
290+
}
291+
}
292+
276293
if l.EmbeddingOutput != "" {
277294
if err := json.Unmarshal([]byte(l.EmbeddingOutput), &l.EmbeddingOutputParsed); err != nil {
278295
// Log error but don't fail the operation - initialize as nil
@@ -396,6 +413,30 @@ func (l *Log) BuildContentSummary() string {
396413
}
397414
}
398415

416+
// Add responses output content
417+
if l.ResponsesOutputParsed != nil {
418+
for _, msg := range l.ResponsesOutputParsed {
419+
if msg.Content != nil {
420+
if msg.Content.ContentStr != nil && *msg.Content.ContentStr != "" {
421+
parts = append(parts, *msg.Content.ContentStr)
422+
}
423+
// If content blocks exist, extract text from them
424+
if msg.Content.ContentBlocks != nil {
425+
for _, block := range msg.Content.ContentBlocks {
426+
if block.Text != nil && *block.Text != "" {
427+
parts = append(parts, *block.Text)
428+
}
429+
}
430+
}
431+
}
432+
if msg.ResponsesReasoning != nil {
433+
for _, summary := range msg.ResponsesReasoning.Summary {
434+
parts = append(parts, summary.Text)
435+
}
436+
}
437+
}
438+
}
439+
399440
// Add speech input content
400441
if l.SpeechInputParsed != nil && l.SpeechInputParsed.Input != "" {
401442
parts = append(parts, l.SpeechInputParsed.Input)

plugins/logging/main.go

Lines changed: 20 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ package logging
55

66
import (
77
"context"
8-
"errors"
98
"fmt"
10-
"strings"
119
"sync"
1210
"sync/atomic"
1311
"time"
@@ -38,20 +36,19 @@ const (
3836
// Context keys for logging optimization
3937
const (
4038
DroppedCreateContextKey ContextKey = "logging-dropped"
41-
CreatedTimestampKey ContextKey = "logging-created-timestamp"
4239
)
4340

4441
// UpdateLogData contains data for log entry updates
4542
type UpdateLogData struct {
4643
Status string
4744
TokenUsage *schemas.LLMUsage
4845
Cost *float64 // Cost in dollars from pricing plugin
49-
OutputMessage *schemas.ChatMessage
46+
ChatOutput *schemas.ChatMessage
47+
ResponsesOutput []schemas.ResponsesMessage
5048
EmbeddingOutput []schemas.BifrostEmbedding
51-
ToolCalls []schemas.ChatAssistantMessageToolCall
5249
ErrorDetails *schemas.BifrostError
53-
Model string // May be different from request
54-
Object string // May be different from request
50+
Model string
51+
Object string
5552
SpeechOutput *schemas.BifrostSpeech // For non-streaming speech responses
5653
TranscriptionOutput *schemas.BifrostTranscribe // For non-streaming transcription responses
5754
RawResponse interface{}
@@ -63,6 +60,7 @@ type LogMessage struct {
6360
RequestID string // Unique ID for the request
6461
ParentRequestID string // Unique ID for the parent request
6562
Timestamp time.Time // Of the preHook/postHook call
63+
Latency int64 // For latency updates
6664
InitialData *InitialLogData // For create operations
6765
SemanticCacheDebug *schemas.BifrostCacheDebug // For semantic cache operations
6866
UpdateData *UpdateLogData // For update operations
@@ -101,39 +99,6 @@ type LoggerPlugin struct {
10199
accumulator *streaming.Accumulator // Accumulator for streaming chunks
102100
}
103101

104-
// retryOnNotFound retries a function up to 3 times with 1-second delays if it returns logstore.ErrNotFound
105-
func retryOnNotFound(ctx context.Context, operation func() error) error {
106-
const maxRetries = 3
107-
const retryDelay = time.Second
108-
109-
var lastErr error
110-
for attempt := 0; attempt < maxRetries; attempt++ {
111-
err := operation()
112-
if err == nil {
113-
return nil
114-
}
115-
116-
// Check if the error is logstore.ErrNotFound
117-
if !errors.Is(err, logstore.ErrNotFound) {
118-
return err
119-
}
120-
121-
lastErr = err
122-
123-
// Don't wait after the last attempt
124-
if attempt < maxRetries-1 {
125-
select {
126-
case <-ctx.Done():
127-
return ctx.Err()
128-
case <-time.After(retryDelay):
129-
// Continue to next retry
130-
}
131-
}
132-
}
133-
134-
return lastErr
135-
}
136-
137102
// Init creates new logger plugin with given log store
138103
func Init(ctx context.Context, logger schemas.Logger, logsStore logstore.LogStore, pricingManager *pricing.PricingManager) (*LoggerPlugin, error) {
139104
if logsStore == nil {
@@ -270,7 +235,7 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest
270235
initialData.Params = req.TranscriptionRequest.Params
271236
initialData.TranscriptionInput = req.TranscriptionRequest.Input
272237
}
273-
*ctx = context.WithValue(*ctx, CreatedTimestampKey, createdTimestamp)
238+
274239
// Queue the log creation message (non-blocking) - Using sync.Pool
275240
logMsg := p.getLogMessage()
276241
logMsg.Operation = LogOperationCreate
@@ -345,7 +310,13 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
345310
// Queue the log update message (non-blocking) - use same pattern for both streaming and regular
346311
logMsg := p.getLogMessage()
347312
logMsg.RequestID = requestID
348-
logMsg.Timestamp = time.Now()
313+
314+
if result != nil {
315+
logMsg.Latency = result.ExtraFields.Latency
316+
} else {
317+
logMsg.Latency = 0
318+
}
319+
349320
// If response is nil, and there is an error, we update log with error
350321
if result == nil && bifrostErr != nil {
351322
// If request type is streaming, then we trigger cleanup as well
@@ -358,7 +329,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
358329
ErrorDetails: bifrostErr,
359330
}
360331
processingErr := retryOnNotFound(p.ctx, func() error {
361-
return p.updateLogEntry(p.ctx, logMsg.RequestID, logMsg.Timestamp, logMsg.SemanticCacheDebug, logMsg.UpdateData)
332+
return p.updateLogEntry(p.ctx, logMsg.RequestID, logMsg.Latency, logMsg.SemanticCacheDebug, logMsg.UpdateData)
362333
})
363334
if processingErr != nil {
364335
p.logger.Error("failed to process log update for request %s: %v", logMsg.RequestID, processingErr)
@@ -390,7 +361,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
390361
go func() {
391362
defer p.putLogMessage(logMsg) // Return to pool when done
392363
processingErr := retryOnNotFound(p.ctx, func() error {
393-
return p.updateStreamingLogEntry(p.ctx, logMsg.RequestID, logMsg.Timestamp, logMsg.SemanticCacheDebug, logMsg.StreamResponse, streamResponse.Type == streaming.StreamResponseTypeFinal)
364+
return p.updateStreamingLogEntry(p.ctx, logMsg.RequestID, logMsg.SemanticCacheDebug, logMsg.StreamResponse, streamResponse.Type == streaming.StreamResponseTypeFinal)
394365
})
395366
if processingErr != nil {
396367
p.logger.Error("failed to process stream update for request %s: %v", logMsg.RequestID, processingErr)
@@ -421,7 +392,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
421392
// Success case
422393
updateData.Status = "success"
423394
if result.Model != "" {
424-
updateData.Model = result.Model
395+
updateData.Model = result.ExtraFields.ModelRequested
425396
}
426397
// Update object type if available
427398
if result.Object != "" {
@@ -438,7 +409,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
438409
if len(result.Choices) > 0 {
439410
choice := result.Choices[0]
440411
if choice.BifrostTextCompletionResponseChoice != nil {
441-
updateData.OutputMessage = &schemas.ChatMessage{
412+
updateData.ChatOutput = &schemas.ChatMessage{
442413
Role: schemas.ChatMessageRoleAssistant,
443414
Content: &schemas.ChatMessageContent{
444415
ContentStr: choice.BifrostTextCompletionResponseChoice.Text,
@@ -447,29 +418,11 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
447418
}
448419
// Check if this is a non-stream response choice
449420
if choice.BifrostNonStreamResponseChoice != nil {
450-
updateData.OutputMessage = choice.BifrostNonStreamResponseChoice.Message
451-
// Extract tool calls if present
452-
if choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage != nil &&
453-
choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage.ToolCalls != nil {
454-
updateData.ToolCalls = choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage.ToolCalls
455-
}
421+
updateData.ChatOutput = choice.BifrostNonStreamResponseChoice.Message
456422
}
457423
}
458424
if result.ResponsesResponse != nil {
459-
outputMessages := result.ResponsesResponse.Output
460-
if len(outputMessages) > 0 {
461-
chatMessages := schemas.ToChatMessages(outputMessages)
462-
if len(chatMessages) > 0 {
463-
lastMessage := chatMessages[len(chatMessages)-1]
464-
updateData.OutputMessage = &lastMessage
465-
466-
// Extract tool calls if present
467-
if lastMessage.ChatAssistantMessage != nil &&
468-
lastMessage.ChatAssistantMessage.ToolCalls != nil {
469-
updateData.ToolCalls = lastMessage.ChatAssistantMessage.ToolCalls
470-
}
471-
}
472-
}
425+
updateData.ResponsesOutput = result.ResponsesResponse.Output
473426
}
474427
if result.Data != nil {
475428
updateData.EmbeddingOutput = result.Data
@@ -523,7 +476,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
523476
}
524477
// Here we pass plugin level context for background processing to avoid context cancellation
525478
processingErr := retryOnNotFound(p.ctx, func() error {
526-
return p.updateLogEntry(p.ctx, logMsg.RequestID, logMsg.Timestamp, logMsg.SemanticCacheDebug, logMsg.UpdateData)
479+
return p.updateLogEntry(p.ctx, logMsg.RequestID, 0, logMsg.SemanticCacheDebug, logMsg.UpdateData)
527480
})
528481
if processingErr != nil {
529482
p.logger.Error("failed to process log update for request %s: %v", logMsg.RequestID, processingErr)
@@ -557,92 +510,3 @@ func (p *LoggerPlugin) Cleanup() error {
557510
// GORM handles connection cleanup automatically
558511
return nil
559512
}
560-
561-
// Helper methods
562-
563-
// determineObjectType determines the object type from request input
564-
func (p *LoggerPlugin) determineObjectType(requestType schemas.RequestType) string {
565-
switch requestType {
566-
case schemas.TextCompletionRequest, schemas.TextCompletionStreamRequest:
567-
return "text.completion"
568-
case schemas.ChatCompletionRequest:
569-
return "chat.completion"
570-
case schemas.ChatCompletionStreamRequest:
571-
return "chat.completion.chunk"
572-
case schemas.ResponsesRequest:
573-
return "response"
574-
case schemas.ResponsesStreamRequest:
575-
return "response.completion.chunk"
576-
case schemas.EmbeddingRequest:
577-
return "list"
578-
case schemas.SpeechRequest:
579-
return "audio.speech"
580-
case schemas.SpeechStreamRequest:
581-
return "audio.speech.chunk"
582-
case schemas.TranscriptionRequest:
583-
return "audio.transcription"
584-
case schemas.TranscriptionStreamRequest:
585-
return "audio.transcription.chunk"
586-
}
587-
return "unknown"
588-
}
589-
590-
// extractInputHistory extracts input history from request input
591-
// extractInputHistory extracts input history from request input
592-
func (p *LoggerPlugin) extractInputHistory(request *schemas.BifrostRequest) []schemas.ChatMessage {
593-
if request.ChatRequest != nil {
594-
return request.ChatRequest.Input
595-
}
596-
if request.ResponsesRequest != nil {
597-
messages := schemas.ToChatMessages(request.ResponsesRequest.Input)
598-
if len(messages) > 0 {
599-
return messages
600-
}
601-
}
602-
if request.TextCompletionRequest != nil {
603-
var text string
604-
if request.TextCompletionRequest.Input.PromptStr != nil {
605-
text = *request.TextCompletionRequest.Input.PromptStr
606-
} else {
607-
var stringBuilder strings.Builder
608-
for _, prompt := range request.TextCompletionRequest.Input.PromptArray {
609-
stringBuilder.WriteString(prompt)
610-
}
611-
text = stringBuilder.String()
612-
}
613-
return []schemas.ChatMessage{
614-
{
615-
Role: schemas.ChatMessageRoleUser,
616-
Content: &schemas.ChatMessageContent{
617-
ContentStr: &text,
618-
},
619-
},
620-
}
621-
}
622-
if request.EmbeddingRequest != nil {
623-
texts := request.EmbeddingRequest.Input.Texts
624-
625-
if len(texts) == 0 && request.EmbeddingRequest.Input.Text != nil {
626-
texts = []string{*request.EmbeddingRequest.Input.Text}
627-
}
628-
629-
contentBlocks := make([]schemas.ChatContentBlock, len(texts))
630-
for i, text := range texts {
631-
// Create a per-iteration copy to avoid reusing the same memory address
632-
t := text
633-
contentBlocks[i] = schemas.ChatContentBlock{
634-
Type: schemas.ChatContentBlockTypeText,
635-
Text: &t,
636-
}
637-
}
638-
return []schemas.ChatMessage{
639-
{
640-
Role: schemas.ChatMessageRoleUser,
641-
Content: &schemas.ChatMessageContent{
642-
ContentBlocks: contentBlocks,
643-
},
644-
},
645-
}
646-
}
647-
return []schemas.ChatMessage{}
648-
}

0 commit comments

Comments
 (0)