Skip to content

Commit 5745bcb

Browse files
feat: responses added to logs
1 parent 290c61b commit 5745bcb

File tree

13 files changed

+1157
-302
lines changed

13 files changed

+1157
-302
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ PORT ?= 8080
66
APP_DIR ?=
77
PROMETHEUS_LABELS ?=
88
LOG_STYLE ?= json
9-
LOG_LEVEL ?= debug
9+
LOG_LEVEL ?= info
1010

1111
# Colors for output
1212
RED=\033[0;31m

framework/logstore/tables.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,11 @@ type Log struct {
7676
Model string `gorm:"type:varchar(255);index;not null" json:"model"`
7777
InputHistory string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ChatMessage
7878
OutputMessage string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ChatMessage
79+
ResponsesOutput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ResponsesMessage
7980
EmbeddingOutput string `gorm:"type:text" json:"-"` // JSON serialized [][]float32
8081
Params string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ModelParameters
8182
Tools string `gorm:"type:text" json:"-"` // JSON serialized []schemas.Tool
82-
ToolCalls string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ToolCall
83+
ToolCalls string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ToolCall (For backward compatibility, tool calls are now in the content)
8384
SpeechInput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.SpeechInput
8485
TranscriptionInput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.TranscriptionInput
8586
SpeechOutput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.BifrostSpeech
@@ -103,10 +104,11 @@ type Log struct {
103104
// Virtual fields for JSON output - these will be populated when needed
104105
InputHistoryParsed []schemas.ChatMessage `gorm:"-" json:"input_history,omitempty"`
105106
OutputMessageParsed *schemas.ChatMessage `gorm:"-" json:"output_message,omitempty"`
107+
ResponsesOutputParsed []schemas.ResponsesMessage `gorm:"-" json:"responses_output,omitempty"`
106108
EmbeddingOutputParsed []schemas.BifrostEmbedding `gorm:"-" json:"embedding_output,omitempty"`
107109
ParamsParsed interface{} `gorm:"-" json:"params,omitempty"`
108110
ToolsParsed []schemas.ChatTool `gorm:"-" json:"tools,omitempty"`
109-
ToolCallsParsed []schemas.ChatAssistantMessageToolCall `gorm:"-" json:"tool_calls,omitempty"`
111+
ToolCallsParsed []schemas.ChatAssistantMessageToolCall `gorm:"-" json:"tool_calls,omitempty"` // For backward compatibility, tool calls are now in the content
110112
TokenUsageParsed *schemas.LLMUsage `gorm:"-" json:"token_usage,omitempty"`
111113
ErrorDetailsParsed *schemas.BifrostError `gorm:"-" json:"error_details,omitempty"`
112114
SpeechInputParsed *schemas.SpeechInput `gorm:"-" json:"speech_input,omitempty"`
@@ -157,6 +159,14 @@ func (l *Log) SerializeFields() error {
157159
}
158160
}
159161

162+
if l.ResponsesOutputParsed != nil {
163+
if data, err := json.Marshal(l.ResponsesOutputParsed); err != nil {
164+
return err
165+
} else {
166+
l.ResponsesOutput = string(data)
167+
}
168+
}
169+
160170
if l.EmbeddingOutputParsed != nil {
161171
if data, err := json.Marshal(l.EmbeddingOutputParsed); err != nil {
162172
return err
@@ -272,6 +282,13 @@ func (l *Log) DeserializeFields() error {
272282
}
273283
}
274284

285+
if l.ResponsesOutput != "" {
286+
if err := json.Unmarshal([]byte(l.ResponsesOutput), &l.ResponsesOutputParsed); err != nil {
287+
// Log error but don't fail the operation - initialize as nil
288+
l.ResponsesOutputParsed = []schemas.ResponsesMessage{}
289+
}
290+
}
291+
275292
if l.EmbeddingOutput != "" {
276293
if err := json.Unmarshal([]byte(l.EmbeddingOutput), &l.EmbeddingOutputParsed); err != nil {
277294
// Log error but don't fail the operation - initialize as nil
@@ -393,6 +410,30 @@ func (l *Log) BuildContentSummary() string {
393410
}
394411
}
395412

413+
// Add responses output content
414+
if l.ResponsesOutputParsed != nil {
415+
for _, msg := range l.ResponsesOutputParsed {
416+
if msg.Content != nil {
417+
if msg.Content.ContentStr != nil && *msg.Content.ContentStr != "" {
418+
parts = append(parts, *msg.Content.ContentStr)
419+
}
420+
// If content blocks exist, extract text from them
421+
if msg.Content.ContentBlocks != nil {
422+
for _, block := range msg.Content.ContentBlocks {
423+
if block.Text != nil && *block.Text != "" {
424+
parts = append(parts, *block.Text)
425+
}
426+
}
427+
}
428+
}
429+
if msg.ResponsesReasoning != nil {
430+
for _, summary := range msg.ResponsesReasoning.Summary {
431+
parts = append(parts, summary.Text)
432+
}
433+
}
434+
}
435+
}
436+
396437
// Add speech input content
397438
if l.SpeechInputParsed != nil && l.SpeechInputParsed.Input != "" {
398439
parts = append(parts, l.SpeechInputParsed.Input)

plugins/logging/main.go

Lines changed: 18 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ package logging
55

66
import (
77
"context"
8-
"errors"
98
"fmt"
10-
"strings"
119
"sync"
1210
"sync/atomic"
1311
"time"
@@ -38,20 +36,19 @@ const (
3836
// Context keys for logging optimization
3937
const (
4038
DroppedCreateContextKey ContextKey = "logging-dropped"
41-
CreatedTimestampKey ContextKey = "logging-created-timestamp"
4239
)
4340

4441
// UpdateLogData contains data for log entry updates
4542
type UpdateLogData struct {
4643
Status string
4744
TokenUsage *schemas.LLMUsage
4845
Cost *float64 // Cost in dollars from pricing plugin
49-
OutputMessage *schemas.ChatMessage
46+
ChatOutput *schemas.ChatMessage
47+
ResponsesOutput []schemas.ResponsesMessage
5048
EmbeddingOutput []schemas.BifrostEmbedding
51-
ToolCalls []schemas.ChatAssistantMessageToolCall
5249
ErrorDetails *schemas.BifrostError
53-
Model string // May be different from request
54-
Object string // May be different from request
50+
Model string
51+
Object string
5552
SpeechOutput *schemas.BifrostSpeech // For non-streaming speech responses
5653
TranscriptionOutput *schemas.BifrostTranscribe // For non-streaming transcription responses
5754
}
@@ -62,6 +59,7 @@ type LogMessage struct {
6259
RequestID string // Unique ID for the request
6360
ParentRequestID string // Unique ID for the parent request
6461
Timestamp time.Time // Of the preHook/postHook call
62+
Latency int64 // For latency updates
6563
InitialData *InitialLogData // For create operations
6664
SemanticCacheDebug *schemas.BifrostCacheDebug // For semantic cache operations
6765
UpdateData *UpdateLogData // For update operations
@@ -100,39 +98,6 @@ type LoggerPlugin struct {
10098
accumulator *streaming.Accumulator // Accumulator for streaming chunks
10199
}
102100

103-
// retryOnNotFound retries a function up to 3 times with 1-second delays if it returns logstore.ErrNotFound
104-
func retryOnNotFound(ctx context.Context, operation func() error) error {
105-
const maxRetries = 3
106-
const retryDelay = time.Second
107-
108-
var lastErr error
109-
for attempt := 0; attempt < maxRetries; attempt++ {
110-
err := operation()
111-
if err == nil {
112-
return nil
113-
}
114-
115-
// Check if the error is logstore.ErrNotFound
116-
if !errors.Is(err, logstore.ErrNotFound) {
117-
return err
118-
}
119-
120-
lastErr = err
121-
122-
// Don't wait after the last attempt
123-
if attempt < maxRetries-1 {
124-
select {
125-
case <-ctx.Done():
126-
return ctx.Err()
127-
case <-time.After(retryDelay):
128-
// Continue to next retry
129-
}
130-
}
131-
}
132-
133-
return lastErr
134-
}
135-
136101
// Init creates new logger plugin with given log store
137102
func Init(ctx context.Context, logger schemas.Logger, logsStore logstore.LogStore, pricingManager *pricing.PricingManager) (*LoggerPlugin, error) {
138103
if logsStore == nil {
@@ -269,7 +234,7 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest
269234
initialData.Params = req.TranscriptionRequest.Params
270235
initialData.TranscriptionInput = req.TranscriptionRequest.Input
271236
}
272-
*ctx = context.WithValue(*ctx, CreatedTimestampKey, createdTimestamp)
237+
273238
// Queue the log creation message (non-blocking) - Using sync.Pool
274239
logMsg := p.getLogMessage()
275240
logMsg.Operation = LogOperationCreate
@@ -319,7 +284,6 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest
319284

320285
// PostHook is called after a response is received - FULLY ASYNC, NO DATABASE I/O
321286
func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError, error) {
322-
p.logger.Debug("[logging] PostHook called")
323287
if ctx == nil {
324288
// Log error but don't fail the request
325289
p.logger.Error("context is nil in PostHook")
@@ -344,7 +308,12 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
344308
// Queue the log update message (non-blocking) - use same pattern for both streaming and regular
345309
logMsg := p.getLogMessage()
346310
logMsg.RequestID = requestID
347-
logMsg.Timestamp = time.Now()
311+
logMsg.Latency = 0
312+
313+
if result != nil {
314+
logMsg.Latency = result.ExtraFields.Latency
315+
}
316+
348317
if bifrost.IsStreamRequestType(requestType) {
349318
p.logger.Debug("[logging] processing streaming response")
350319
streamResponse, err := p.accumulator.ProcessStreamingResponse(ctx, result, bifrostErr)
@@ -359,7 +328,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
359328
go func() {
360329
defer p.putLogMessage(logMsg) // Return to pool when done
361330
processingErr := retryOnNotFound(p.ctx, func() error {
362-
return p.updateStreamingLogEntry(p.ctx, logMsg.RequestID, logMsg.Timestamp, logMsg.SemanticCacheDebug, logMsg.StreamResponse, streamResponse.Type == streaming.StreamResponseTypeFinal)
331+
return p.updateStreamingLogEntry(p.ctx, logMsg.RequestID, logMsg.SemanticCacheDebug, logMsg.StreamResponse, streamResponse.Type == streaming.StreamResponseTypeFinal)
363332
})
364333
if processingErr != nil {
365334
p.logger.Error("failed to process stream update for request %s: %v", logMsg.RequestID, processingErr)
@@ -389,7 +358,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
389358
// Success case
390359
updateData.Status = "success"
391360
if result.Model != "" {
392-
updateData.Model = result.Model
361+
updateData.Model = result.ExtraFields.ModelRequested
393362
}
394363
// Update object type if available
395364
if result.Object != "" {
@@ -403,7 +372,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
403372
if len(result.Choices) > 0 {
404373
choice := result.Choices[0]
405374
if choice.BifrostTextCompletionResponseChoice != nil {
406-
updateData.OutputMessage = &schemas.ChatMessage{
375+
updateData.ChatOutput = &schemas.ChatMessage{
407376
Role: schemas.ChatMessageRoleAssistant,
408377
Content: schemas.ChatMessageContent{
409378
ContentStr: choice.BifrostTextCompletionResponseChoice.Text,
@@ -412,29 +381,11 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
412381
}
413382
// Check if this is a non-stream response choice
414383
if choice.BifrostNonStreamResponseChoice != nil {
415-
updateData.OutputMessage = &choice.BifrostNonStreamResponseChoice.Message
416-
// Extract tool calls if present
417-
if choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage != nil &&
418-
choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage.ToolCalls != nil {
419-
updateData.ToolCalls = choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage.ToolCalls
420-
}
384+
updateData.ChatOutput = &choice.BifrostNonStreamResponseChoice.Message
421385
}
422386
}
423387
if result.ResponsesResponse != nil {
424-
outputMessages := result.ResponsesResponse.Output
425-
if len(outputMessages) > 0 {
426-
chatMessages := schemas.ToChatMessages(outputMessages)
427-
if len(chatMessages) > 0 {
428-
lastMessage := chatMessages[len(chatMessages)-1]
429-
updateData.OutputMessage = &lastMessage
430-
431-
// Extract tool calls if present
432-
if lastMessage.ChatAssistantMessage != nil &&
433-
lastMessage.ChatAssistantMessage.ToolCalls != nil {
434-
updateData.ToolCalls = lastMessage.ChatAssistantMessage.ToolCalls
435-
}
436-
}
437-
}
388+
updateData.ResponsesOutput = result.ResponsesResponse.Output
438389
}
439390
if result.Data != nil {
440391
updateData.EmbeddingOutput = result.Data
@@ -488,7 +439,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
488439
}
489440
// Here we pass plugin level context for background processing to avoid context cancellation
490441
processingErr := retryOnNotFound(p.ctx, func() error {
491-
return p.updateLogEntry(p.ctx, logMsg.RequestID, logMsg.Timestamp, logMsg.SemanticCacheDebug, logMsg.UpdateData)
442+
return p.updateLogEntry(p.ctx, logMsg.RequestID, 0, logMsg.SemanticCacheDebug, logMsg.UpdateData)
492443
})
493444
if processingErr != nil {
494445
p.logger.Error("failed to process log update for request %s: %v", logMsg.RequestID, processingErr)
@@ -522,92 +473,3 @@ func (p *LoggerPlugin) Cleanup() error {
522473
// GORM handles connection cleanup automatically
523474
return nil
524475
}
525-
526-
// Helper methods
527-
528-
// determineObjectType determines the object type from request input
529-
func (p *LoggerPlugin) determineObjectType(requestType schemas.RequestType) string {
530-
switch requestType {
531-
case schemas.TextCompletionRequest, schemas.TextCompletionStreamRequest:
532-
return "text.completion"
533-
case schemas.ChatCompletionRequest:
534-
return "chat.completion"
535-
case schemas.ChatCompletionStreamRequest:
536-
return "chat.completion.chunk"
537-
case schemas.ResponsesRequest:
538-
return "response"
539-
case schemas.ResponsesStreamRequest:
540-
return "response.completion.chunk"
541-
case schemas.EmbeddingRequest:
542-
return "list"
543-
case schemas.SpeechRequest:
544-
return "audio.speech"
545-
case schemas.SpeechStreamRequest:
546-
return "audio.speech.chunk"
547-
case schemas.TranscriptionRequest:
548-
return "audio.transcription"
549-
case schemas.TranscriptionStreamRequest:
550-
return "audio.transcription.chunk"
551-
}
552-
return "unknown"
553-
}
554-
555-
// extractInputHistory extracts input history from request input
556-
// extractInputHistory extracts input history from request input
557-
func (p *LoggerPlugin) extractInputHistory(request *schemas.BifrostRequest) []schemas.ChatMessage {
558-
if request.ChatRequest != nil {
559-
return request.ChatRequest.Input
560-
}
561-
if request.ResponsesRequest != nil {
562-
messages := schemas.ToChatMessages(request.ResponsesRequest.Input)
563-
if len(messages) > 0 {
564-
return messages
565-
}
566-
}
567-
if request.TextCompletionRequest != nil {
568-
var text string
569-
if request.TextCompletionRequest.Input.PromptStr != nil {
570-
text = *request.TextCompletionRequest.Input.PromptStr
571-
} else {
572-
var stringBuilder strings.Builder
573-
for _, prompt := range request.TextCompletionRequest.Input.PromptArray {
574-
stringBuilder.WriteString(prompt)
575-
}
576-
text = stringBuilder.String()
577-
}
578-
return []schemas.ChatMessage{
579-
{
580-
Role: schemas.ChatMessageRoleUser,
581-
Content: schemas.ChatMessageContent{
582-
ContentStr: &text,
583-
},
584-
},
585-
}
586-
}
587-
if request.EmbeddingRequest != nil {
588-
texts := request.EmbeddingRequest.Input.Texts
589-
590-
if len(texts) == 0 && request.EmbeddingRequest.Input.Text != nil {
591-
texts = []string{*request.EmbeddingRequest.Input.Text}
592-
}
593-
594-
contentBlocks := make([]schemas.ChatContentBlock, len(texts))
595-
for i, text := range texts {
596-
// Create a per-iteration copy to avoid reusing the same memory address
597-
t := text
598-
contentBlocks[i] = schemas.ChatContentBlock{
599-
Type: schemas.ChatContentBlockTypeText,
600-
Text: &t,
601-
}
602-
}
603-
return []schemas.ChatMessage{
604-
{
605-
Role: schemas.ChatMessageRoleUser,
606-
Content: schemas.ChatMessageContent{
607-
ContentBlocks: contentBlocks,
608-
},
609-
},
610-
}
611-
}
612-
return []schemas.ChatMessage{}
613-
}

0 commit comments

Comments
 (0)