Skip to content

Commit fbce5d1

Browse files
Pratham-Mishra04 authored and TejasGhatte committed
feat: responses added to logs
1 parent b42310e commit fbce5d1

File tree

20 files changed

+1564
-332
lines changed

20 files changed

+1564
-332
lines changed

core/schemas/mux.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,7 @@ func (br *BifrostResponse) ToResponsesStream() {
910910
delta := choice.BifrostStreamResponseChoice.Delta
911911
streamResp := &ResponsesStreamResponse{
912912
SequenceNumber: br.ExtraFields.ChunkIndex,
913+
ContentIndex: Ptr(0),
913914
OutputIndex: &choice.Index,
914915
}
915916

core/schemas/providers/anthropic/responses.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ func (chunk *AnthropicStreamEvent) ToBifrostResponsesStream(sequenceNumber int)
389389
SequenceNumber: sequenceNumber,
390390
OutputIndex: schemas.Ptr(0),
391391
ContentIndex: chunk.Index,
392-
Arguments: chunk.Delta.PartialJSON,
392+
Delta: chunk.Delta.PartialJSON,
393393
},
394394
}, nil, false
395395
}

core/schemas/providers/cohere/responses.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
448448
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
449449
Type: schemas.ResponsesStreamResponseTypeOutputItemAdded,
450450
SequenceNumber: sequenceNumber,
451+
OutputIndex: schemas.Ptr(0),
451452
Item: item,
452453
},
453454
}, nil, false
@@ -478,6 +479,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
478479
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
479480
Type: schemas.ResponsesStreamResponseTypeContentPartAdded,
480481
SequenceNumber: sequenceNumber,
482+
OutputIndex: schemas.Ptr(0),
481483
ContentIndex: chunk.Index,
482484
Part: part,
483485
},
@@ -492,6 +494,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
492494
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
493495
Type: schemas.ResponsesStreamResponseTypeOutputTextDelta,
494496
SequenceNumber: sequenceNumber,
497+
OutputIndex: schemas.Ptr(0),
495498
ContentIndex: chunk.Index,
496499
Delta: chunk.Delta.Message.Content.Text,
497500
},
@@ -506,6 +509,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
506509
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
507510
Type: schemas.ResponsesStreamResponseTypeContentPartDone,
508511
SequenceNumber: sequenceNumber,
512+
OutputIndex: schemas.Ptr(0),
509513
ContentIndex: chunk.Index,
510514
},
511515
}, nil, false
@@ -517,6 +521,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
517521
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
518522
Type: schemas.ResponsesStreamResponseTypeReasoningSummaryTextDelta,
519523
SequenceNumber: sequenceNumber,
524+
OutputIndex: schemas.Ptr(0),
520525
ContentIndex: schemas.Ptr(0), // Tool plan is typically at index 0
521526
Delta: chunk.Delta.Message.ToolPlan,
522527
},
@@ -544,6 +549,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
544549
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
545550
Type: schemas.ResponsesStreamResponseTypeOutputItemAdded,
546551
SequenceNumber: sequenceNumber,
552+
OutputIndex: schemas.Ptr(0),
547553
Item: item,
548554
},
549555
}, nil, false
@@ -557,10 +563,11 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
557563
if toolCall.Function != nil {
558564
return &schemas.BifrostResponse{
559565
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
560-
Type: schemas.ResponsesStreamResponseTypeFunctionCallArgumentsAdded,
566+
Type: schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDelta,
561567
SequenceNumber: sequenceNumber,
562568
ContentIndex: chunk.Index,
563-
Arguments: schemas.Ptr(toolCall.Function.Arguments),
569+
OutputIndex: schemas.Ptr(0),
570+
Delta: schemas.Ptr(toolCall.Function.Arguments),
564571
},
565572
}, nil, false
566573
}
@@ -573,6 +580,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
573580
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
574581
Type: schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDone,
575582
SequenceNumber: sequenceNumber,
583+
OutputIndex: schemas.Ptr(0),
576584
ContentIndex: chunk.Index,
577585
},
578586
}, nil, false
@@ -618,6 +626,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
618626
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
619627
Type: schemas.ResponsesStreamResponseTypeOutputTextAnnotationAdded,
620628
SequenceNumber: sequenceNumber,
629+
OutputIndex: schemas.Ptr(0),
621630
ContentIndex: schemas.Ptr(citation.ContentIndex),
622631
Annotation: annotation,
623632
AnnotationIndex: chunk.Index,
@@ -632,6 +641,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
632641
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
633642
Type: schemas.ResponsesStreamResponseTypeOutputTextAnnotationAdded,
634643
SequenceNumber: sequenceNumber,
644+
OutputIndex: schemas.Ptr(0),
635645
ContentIndex: chunk.Index,
636646
AnnotationIndex: chunk.Index,
637647
},
@@ -643,6 +653,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s
643653
ResponsesStreamResponse: &schemas.ResponsesStreamResponse{
644654
Type: schemas.ResponsesStreamResponseTypeCompleted,
645655
SequenceNumber: sequenceNumber,
656+
OutputIndex: schemas.Ptr(0),
646657
Response: &schemas.ResponsesStreamResponseStruct{}, // Initialize Response field
647658
},
648659
}

framework/logstore/tables.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,11 @@ type Log struct {
7676
Model string `gorm:"type:varchar(255);index;not null" json:"model"`
7777
InputHistory string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ChatMessage
7878
OutputMessage string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ChatMessage
79+
ResponsesOutput string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ResponsesMessage
7980
EmbeddingOutput string `gorm:"type:text" json:"-"` // JSON serialized [][]float32
8081
Params string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ModelParameters
8182
Tools string `gorm:"type:text" json:"-"` // JSON serialized []schemas.Tool
82-
ToolCalls string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ToolCall
83+
ToolCalls string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ToolCall (For backward compatibility, tool calls are now in the content)
8384
SpeechInput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.SpeechInput
8485
TranscriptionInput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.TranscriptionInput
8586
SpeechOutput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.BifrostSpeech
@@ -104,10 +105,11 @@ type Log struct {
104105
// Virtual fields for JSON output - these will be populated when needed
105106
InputHistoryParsed []schemas.ChatMessage `gorm:"-" json:"input_history,omitempty"`
106107
OutputMessageParsed *schemas.ChatMessage `gorm:"-" json:"output_message,omitempty"`
108+
ResponsesOutputParsed []schemas.ResponsesMessage `gorm:"-" json:"responses_output,omitempty"`
107109
EmbeddingOutputParsed []schemas.BifrostEmbedding `gorm:"-" json:"embedding_output,omitempty"`
108110
ParamsParsed interface{} `gorm:"-" json:"params,omitempty"`
109111
ToolsParsed []schemas.ChatTool `gorm:"-" json:"tools,omitempty"`
110-
ToolCallsParsed []schemas.ChatAssistantMessageToolCall `gorm:"-" json:"tool_calls,omitempty"`
112+
ToolCallsParsed []schemas.ChatAssistantMessageToolCall `gorm:"-" json:"tool_calls,omitempty"` // For backward compatibility, tool calls are now in the content
111113
TokenUsageParsed *schemas.LLMUsage `gorm:"-" json:"token_usage,omitempty"`
112114
ErrorDetailsParsed *schemas.BifrostError `gorm:"-" json:"error_details,omitempty"`
113115
SpeechInputParsed *schemas.SpeechInput `gorm:"-" json:"speech_input,omitempty"`
@@ -158,6 +160,14 @@ func (l *Log) SerializeFields() error {
158160
}
159161
}
160162

163+
if l.ResponsesOutputParsed != nil {
164+
if data, err := json.Marshal(l.ResponsesOutputParsed); err != nil {
165+
return err
166+
} else {
167+
l.ResponsesOutput = string(data)
168+
}
169+
}
170+
161171
if l.EmbeddingOutputParsed != nil {
162172
if data, err := json.Marshal(l.EmbeddingOutputParsed); err != nil {
163173
return err
@@ -273,6 +283,13 @@ func (l *Log) DeserializeFields() error {
273283
}
274284
}
275285

286+
if l.ResponsesOutput != "" {
287+
if err := json.Unmarshal([]byte(l.ResponsesOutput), &l.ResponsesOutputParsed); err != nil {
288+
// Ignore the unmarshal error rather than failing the operation - fall back to an empty slice
289+
l.ResponsesOutputParsed = []schemas.ResponsesMessage{}
290+
}
291+
}
292+
276293
if l.EmbeddingOutput != "" {
277294
if err := json.Unmarshal([]byte(l.EmbeddingOutput), &l.EmbeddingOutputParsed); err != nil {
278295
// Log error but don't fail the operation - initialize as nil
@@ -398,6 +415,30 @@ func (l *Log) BuildContentSummary() string {
398415
}
399416
}
400417

418+
// Add responses output content
419+
if l.ResponsesOutputParsed != nil {
420+
for _, msg := range l.ResponsesOutputParsed {
421+
if msg.Content != nil {
422+
if msg.Content.ContentStr != nil && *msg.Content.ContentStr != "" {
423+
parts = append(parts, *msg.Content.ContentStr)
424+
}
425+
// If content blocks exist, extract text from them
426+
if msg.Content.ContentBlocks != nil {
427+
for _, block := range msg.Content.ContentBlocks {
428+
if block.Text != nil && *block.Text != "" {
429+
parts = append(parts, *block.Text)
430+
}
431+
}
432+
}
433+
}
434+
if msg.ResponsesReasoning != nil {
435+
for _, summary := range msg.ResponsesReasoning.Summary {
436+
parts = append(parts, summary.Text)
437+
}
438+
}
439+
}
440+
}
441+
401442
// Add speech input content
402443
if l.SpeechInputParsed != nil && l.SpeechInputParsed.Input != "" {
403444
parts = append(parts, l.SpeechInputParsed.Input)

framework/pricing/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ func (pm *PricingManager) CalculateCost(result *schemas.BifrostResponse) float64
166166
if result.Transcribe.Usage.InputTokenDetails != nil {
167167
audioTokenDetails = result.Transcribe.Usage.InputTokenDetails
168168
}
169+
} else if result.ResponsesStreamResponse != nil && result.ResponsesStreamResponse.Response != nil && result.ResponsesStreamResponse.Response.Usage != nil {
170+
usage = &schemas.LLMUsage{
171+
ResponsesExtendedResponseUsage: result.ResponsesStreamResponse.Response.Usage.ResponsesExtendedResponseUsage,
172+
TotalTokens: result.ResponsesStreamResponse.Response.Usage.TotalTokens,
173+
}
169174
}
170175

171176
cost := 0.0

framework/streaming/accumulator.go

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Accumulator struct {
1818
streamAccumulators sync.Map // Track accumulators by request ID (atomic)
1919

2020
chatStreamChunkPool sync.Pool // Pool for reusing StreamChunk structs
21+
responsesStreamChunkPool sync.Pool // Pool for reusing ResponsesStreamChunk structs
2122
audioStreamChunkPool sync.Pool // Pool for reusing AudioStreamChunk structs
2223
transcriptionStreamChunkPool sync.Pool // Pool for reusing TranscriptionStreamChunk structs
2324

@@ -80,14 +81,32 @@ func (a *Accumulator) putTranscriptionStreamChunk(chunk *TranscriptionStreamChun
8081
a.transcriptionStreamChunkPool.Put(chunk)
8182
}
8283

84+
// getResponsesStreamChunk takes a *ResponsesStreamChunk from responsesStreamChunkPool.
// The unchecked type assertion panics if the pool ever yields a value of another type.
// NOTE(review): Cleanup puts chunks back into this pool without resetting them, so a
// returned chunk may carry stale field values - confirm callers overwrite every field.
85+
func (a *Accumulator) getResponsesStreamChunk() *ResponsesStreamChunk {
86+
return a.responsesStreamChunkPool.Get().(*ResponsesStreamChunk)
87+
}
88+
89+
// putResponsesStreamChunk clears the chunk's Timestamp, StreamResponse, Cost,
// SemanticCacheDebug, ErrorDetails, FinishReason and TokenUsage fields and then
// returns the chunk to responsesStreamChunkPool, so a pooled chunk does not keep
// those values alive. chunk must be non-nil: the field writes dereference it.
90+
func (a *Accumulator) putResponsesStreamChunk(chunk *ResponsesStreamChunk) {
91+
chunk.Timestamp = time.Time{}
92+
chunk.StreamResponse = nil
93+
chunk.Cost = nil
94+
chunk.SemanticCacheDebug = nil
95+
chunk.ErrorDetails = nil
96+
chunk.FinishReason = nil
97+
chunk.TokenUsage = nil
98+
a.responsesStreamChunkPool.Put(chunk)
99+
}
100+
83101
// CreateStreamAccumulator creates a new stream accumulator for a request
84102
func (a *Accumulator) createStreamAccumulator(requestID string) *StreamAccumulator {
85103
sc := &StreamAccumulator{
86-
RequestID: requestID,
87-
ChatStreamChunks: make([]*ChatStreamChunk, 0),
88-
IsComplete: false,
89-
Timestamp: time.Now(),
90-
Object: "",
104+
RequestID: requestID,
105+
ChatStreamChunks: make([]*ChatStreamChunk, 0),
106+
ResponsesStreamChunks: make([]*ResponsesStreamChunk, 0),
107+
IsComplete: false,
108+
Timestamp: time.Now(),
109+
Object: "",
91110
}
92111
a.streamAccumulators.Store(requestID, sc)
93112
return sc
@@ -174,6 +193,30 @@ func (a *Accumulator) addAudioStreamChunk(requestID string, chunk *AudioStreamCh
174193
return nil
175194
}
176195

196+
// addResponsesStreamChunk appends a responses stream chunk to the stream accumulator
// for requestID, which is obtained through getOrCreateStreamAccumulator.
// object is remembered as the accumulator's Object the first time it is non-empty;
// isFinalChunk tells the function that this chunk ends the stream.
// It currently always returns nil.
197+
func (a *Accumulator) addResponsesStreamChunk(requestID string, chunk *ResponsesStreamChunk, object string, isFinalChunk bool) error {
198+
accumulator := a.getOrCreateStreamAccumulator(requestID)
199+
// Hold the accumulator's mutex for the rest of the function
200+
accumulator.mu.Lock()
201+
defer accumulator.mu.Unlock()
202+
// The first chunk seen for this accumulator sets StartTimestamp
if accumulator.StartTimestamp.IsZero() {
203+
accumulator.StartTimestamp = chunk.Timestamp
204+
}
205+
// Store the object type once: the first non-empty value wins
206+
if accumulator.Object == "" && object != "" {
207+
accumulator.Object = object
208+
}
209+
// Append in arrival order; no reordering is done here, so callers are assumed to deliver chunks in order
210+
accumulator.ResponsesStreamChunks = append(accumulator.ResponsesStreamChunks, chunk)
211+
// Check if this is the final chunk
212+
// Finality is decided by the caller through isFinalChunk; this function does not
213+
// inspect FinishReason or token usage itself. The final chunk's Timestamp becomes FinalTimestamp.
214+
if isFinalChunk {
215+
accumulator.FinalTimestamp = chunk.Timestamp
216+
}
217+
return nil
218+
}
219+
177220
// cleanupStreamAccumulator removes the stream accumulator for a request
178221
func (a *Accumulator) cleanupStreamAccumulator(requestID string) {
179222
if accumulator, exists := a.streamAccumulators.Load(requestID); exists {
@@ -182,6 +225,9 @@ func (a *Accumulator) cleanupStreamAccumulator(requestID string) {
182225
for _, chunk := range acc.ChatStreamChunks {
183226
a.putChatStreamChunk(chunk)
184227
}
228+
for _, chunk := range acc.ResponsesStreamChunks {
229+
a.putResponsesStreamChunk(chunk)
230+
}
185231
for _, chunk := range acc.AudioStreamChunks {
186232
a.putAudioStreamChunk(chunk)
187233
}
@@ -263,7 +309,7 @@ func (a *Accumulator) appendContentToMessage(message *schemas.ChatMessage, newCo
263309
}
264310

265311
// ProcessStreamingResponse processes a streaming response
266-
// It handles both audio and chat streaming responses
312+
// It handles chat, audio, and responses streaming responses
267313
func (a *Accumulator) ProcessStreamingResponse(ctx *context.Context, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*ProcessedStreamResponse, error) {
268314
// Check if this is a streaming response
269315
if result == nil {
@@ -272,6 +318,8 @@ func (a *Accumulator) ProcessStreamingResponse(ctx *context.Context, result *sch
272318
requestType := result.ExtraFields.RequestType
273319
isAudioStreaming := requestType == schemas.SpeechStreamRequest || requestType == schemas.TranscriptionStreamRequest
274320
isChatStreaming := requestType == schemas.ChatCompletionStreamRequest || requestType == schemas.TextCompletionStreamRequest
321+
isResponsesStreaming := requestType == schemas.ResponsesStreamRequest
322+
275323
if isChatStreaming {
276324
// Handle text-based streaming with ordered accumulation
277325
return a.processChatStreamingResponse(ctx, result, bifrostErr)
@@ -283,6 +331,9 @@ func (a *Accumulator) ProcessStreamingResponse(ctx *context.Context, result *sch
283331
if requestType == schemas.SpeechStreamRequest {
284332
return a.processAudioStreamingResponse(ctx, result, bifrostErr)
285333
}
334+
} else if isResponsesStreaming {
335+
// Handle responses streaming with responses accumulation
336+
return a.processResponsesStreamingResponse(ctx, result, bifrostErr)
286337
}
287338
return nil, fmt.Errorf("request type missing/invalid for accumulator")
288339
}
@@ -295,6 +346,9 @@ func (a *Accumulator) Cleanup() {
295346
for _, chunk := range accumulator.ChatStreamChunks {
296347
a.chatStreamChunkPool.Put(chunk)
297348
}
349+
for _, chunk := range accumulator.ResponsesStreamChunks {
350+
a.responsesStreamChunkPool.Put(chunk)
351+
}
298352
for _, chunk := range accumulator.TranscriptionStreamChunks {
299353
a.transcriptionStreamChunkPool.Put(chunk)
300354
}
@@ -360,6 +414,11 @@ func NewAccumulator(pricingManager *pricing.PricingManager, logger schemas.Logge
360414
return &ChatStreamChunk{}
361415
},
362416
},
417+
responsesStreamChunkPool: sync.Pool{
418+
New: func() any {
419+
return &ResponsesStreamChunk{}
420+
},
421+
},
363422
audioStreamChunkPool: sync.Pool{
364423
New: func() any {
365424
return &AudioStreamChunk{}
@@ -381,6 +440,7 @@ func NewAccumulator(pricingManager *pricing.PricingManager, logger schemas.Logge
381440
// Prewarm the pools for better performance at startup
382441
for range 1000 {
383442
a.chatStreamChunkPool.Put(&ChatStreamChunk{})
443+
a.responsesStreamChunkPool.Put(&ResponsesStreamChunk{})
384444
a.audioStreamChunkPool.Put(&AudioStreamChunk{})
385445
a.transcriptionStreamChunkPool.Put(&TranscriptionStreamChunk{})
386446
}

0 commit comments

Comments
 (0)