@@ -5,9 +5,7 @@ package logging
55
66import (
77 "context"
8- "errors"
98 "fmt"
10- "strings"
119 "sync"
1210 "sync/atomic"
1311 "time"
@@ -38,20 +36,19 @@ const (
3836// Context keys for logging optimization
3937const (
4038 DroppedCreateContextKey ContextKey = "logging-dropped"
41- CreatedTimestampKey ContextKey = "logging-created-timestamp"
4239)
4340
4441// UpdateLogData contains data for log entry updates
4542type UpdateLogData struct {
4643 Status string
4744 TokenUsage * schemas.LLMUsage
4845 Cost * float64 // Cost in dollars from pricing plugin
49- OutputMessage * schemas.ChatMessage
46+ ChatOutput * schemas.ChatMessage
47+ ResponsesOutput []schemas.ResponsesMessage
5048 EmbeddingOutput []schemas.BifrostEmbedding
51- ToolCalls []schemas.ChatAssistantMessageToolCall
5249 ErrorDetails * schemas.BifrostError
53- Model string // May be different from request
54- Object string // May be different from request
50+ Model string
51+ Object string
5552 SpeechOutput * schemas.BifrostSpeech // For non-streaming speech responses
5653 TranscriptionOutput * schemas.BifrostTranscribe // For non-streaming transcription responses
5754}
@@ -62,6 +59,7 @@ type LogMessage struct {
6259 RequestID string // Unique ID for the request
6360 ParentRequestID string // Unique ID for the parent request
6461 Timestamp time.Time // Of the preHook/postHook call
62+ Latency int64 // For latency updates
6563 InitialData * InitialLogData // For create operations
6664 SemanticCacheDebug * schemas.BifrostCacheDebug // For semantic cache operations
6765 UpdateData * UpdateLogData // For update operations
@@ -100,39 +98,6 @@ type LoggerPlugin struct {
10098 accumulator * streaming.Accumulator // Accumulator for streaming chunks
10199}
102100
103- // retryOnNotFound retries a function up to 3 times with 1-second delays if it returns logstore.ErrNotFound
104- func retryOnNotFound (ctx context.Context , operation func () error ) error {
105- const maxRetries = 3
106- const retryDelay = time .Second
107-
108- var lastErr error
109- for attempt := 0 ; attempt < maxRetries ; attempt ++ {
110- err := operation ()
111- if err == nil {
112- return nil
113- }
114-
115- // Check if the error is logstore.ErrNotFound
116- if ! errors .Is (err , logstore .ErrNotFound ) {
117- return err
118- }
119-
120- lastErr = err
121-
122- // Don't wait after the last attempt
123- if attempt < maxRetries - 1 {
124- select {
125- case <- ctx .Done ():
126- return ctx .Err ()
127- case <- time .After (retryDelay ):
128- // Continue to next retry
129- }
130- }
131- }
132-
133- return lastErr
134- }
135-
136101// Init creates new logger plugin with given log store
137102func Init (ctx context.Context , logger schemas.Logger , logsStore logstore.LogStore , pricingManager * pricing.PricingManager ) (* LoggerPlugin , error ) {
138103 if logsStore == nil {
@@ -269,7 +234,7 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest
269234 initialData .Params = req .TranscriptionRequest .Params
270235 initialData .TranscriptionInput = req .TranscriptionRequest .Input
271236 }
272- * ctx = context . WithValue ( * ctx , CreatedTimestampKey , createdTimestamp )
237+
273238 // Queue the log creation message (non-blocking) - Using sync.Pool
274239 logMsg := p .getLogMessage ()
275240 logMsg .Operation = LogOperationCreate
@@ -344,7 +309,13 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
344309 // Queue the log update message (non-blocking) - use same pattern for both streaming and regular
345310 logMsg := p .getLogMessage ()
346311 logMsg .RequestID = requestID
347- logMsg .Timestamp = time .Now ()
312+
313+ if result != nil {
314+ logMsg .Latency = result .ExtraFields .Latency
315+ } else {
316+ logMsg .Latency = 0
317+ }
318+
348319 // If response is nil, and there is an error, we update log with error
349320 if result == nil && bifrostErr != nil {
350321 // If request type is streaming, then we trigger cleanup as well
@@ -357,7 +328,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
357328 ErrorDetails : bifrostErr ,
358329 }
359330 processingErr := retryOnNotFound (p .ctx , func () error {
360- return p .updateLogEntry (p .ctx , logMsg .RequestID , logMsg .Timestamp , logMsg .SemanticCacheDebug , logMsg .UpdateData )
331+ return p .updateLogEntry (p .ctx , logMsg .RequestID , logMsg .Latency , logMsg .SemanticCacheDebug , logMsg .UpdateData )
361332 })
362333 if processingErr != nil {
363334 p .logger .Error ("failed to process log update for request %s: %v" , logMsg .RequestID , processingErr )
@@ -389,7 +360,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
389360 go func () {
390361 defer p .putLogMessage (logMsg ) // Return to pool when done
391362 processingErr := retryOnNotFound (p .ctx , func () error {
392- return p .updateStreamingLogEntry (p .ctx , logMsg .RequestID , logMsg .Timestamp , logMsg . SemanticCacheDebug , logMsg .StreamResponse , streamResponse .Type == streaming .StreamResponseTypeFinal )
363+ return p .updateStreamingLogEntry (p .ctx , logMsg .RequestID , logMsg .SemanticCacheDebug , logMsg .StreamResponse , streamResponse .Type == streaming .StreamResponseTypeFinal )
393364 })
394365 if processingErr != nil {
395366 p .logger .Error ("failed to process stream update for request %s: %v" , logMsg .RequestID , processingErr )
@@ -420,7 +391,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
420391 // Success case
421392 updateData .Status = "success"
422393 if result .Model != "" {
423- updateData .Model = result .Model
394+ updateData .Model = result .ExtraFields . ModelRequested
424395 }
425396 // Update object type if available
426397 if result .Object != "" {
@@ -434,7 +405,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
434405 if len (result .Choices ) > 0 {
435406 choice := result .Choices [0 ]
436407 if choice .BifrostTextCompletionResponseChoice != nil {
437- updateData .OutputMessage = & schemas.ChatMessage {
408+ updateData .ChatOutput = & schemas.ChatMessage {
438409 Role : schemas .ChatMessageRoleAssistant ,
439410 Content : schemas.ChatMessageContent {
440411 ContentStr : choice .BifrostTextCompletionResponseChoice .Text ,
@@ -443,29 +414,11 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
443414 }
444415 // Check if this is a non-stream response choice
445416 if choice .BifrostNonStreamResponseChoice != nil {
446- updateData .OutputMessage = & choice .BifrostNonStreamResponseChoice .Message
447- // Extract tool calls if present
448- if choice .BifrostNonStreamResponseChoice .Message .ChatAssistantMessage != nil &&
449- choice .BifrostNonStreamResponseChoice .Message .ChatAssistantMessage .ToolCalls != nil {
450- updateData .ToolCalls = choice .BifrostNonStreamResponseChoice .Message .ChatAssistantMessage .ToolCalls
451- }
417+ updateData .ChatOutput = & choice .BifrostNonStreamResponseChoice .Message
452418 }
453419 }
454420 if result .ResponsesResponse != nil {
455- outputMessages := result .ResponsesResponse .Output
456- if len (outputMessages ) > 0 {
457- chatMessages := schemas .ToChatMessages (outputMessages )
458- if len (chatMessages ) > 0 {
459- lastMessage := chatMessages [len (chatMessages )- 1 ]
460- updateData .OutputMessage = & lastMessage
461-
462- // Extract tool calls if present
463- if lastMessage .ChatAssistantMessage != nil &&
464- lastMessage .ChatAssistantMessage .ToolCalls != nil {
465- updateData .ToolCalls = lastMessage .ChatAssistantMessage .ToolCalls
466- }
467- }
468- }
421+ updateData .ResponsesOutput = result .ResponsesResponse .Output
469422 }
470423 if result .Data != nil {
471424 updateData .EmbeddingOutput = result .Data
@@ -519,7 +472,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
519472 }
520473 // Here we pass plugin level context for background processing to avoid context cancellation
521474 processingErr := retryOnNotFound (p .ctx , func () error {
522- return p .updateLogEntry (p .ctx , logMsg .RequestID , logMsg . Timestamp , logMsg .SemanticCacheDebug , logMsg .UpdateData )
475+ return p .updateLogEntry (p .ctx , logMsg .RequestID , 0 , logMsg .SemanticCacheDebug , logMsg .UpdateData )
523476 })
524477 if processingErr != nil {
525478 p .logger .Error ("failed to process log update for request %s: %v" , logMsg .RequestID , processingErr )
@@ -553,92 +506,3 @@ func (p *LoggerPlugin) Cleanup() error {
553506 // GORM handles connection cleanup automatically
554507 return nil
555508}
556-
557- // Helper methods
558-
559- // determineObjectType determines the object type from request input
560- func (p * LoggerPlugin ) determineObjectType (requestType schemas.RequestType ) string {
561- switch requestType {
562- case schemas .TextCompletionRequest , schemas .TextCompletionStreamRequest :
563- return "text.completion"
564- case schemas .ChatCompletionRequest :
565- return "chat.completion"
566- case schemas .ChatCompletionStreamRequest :
567- return "chat.completion.chunk"
568- case schemas .ResponsesRequest :
569- return "response"
570- case schemas .ResponsesStreamRequest :
571- return "response.completion.chunk"
572- case schemas .EmbeddingRequest :
573- return "list"
574- case schemas .SpeechRequest :
575- return "audio.speech"
576- case schemas .SpeechStreamRequest :
577- return "audio.speech.chunk"
578- case schemas .TranscriptionRequest :
579- return "audio.transcription"
580- case schemas .TranscriptionStreamRequest :
581- return "audio.transcription.chunk"
582- }
583- return "unknown"
584- }
585-
586- // extractInputHistory extracts input history from request input
587- // extractInputHistory extracts input history from request input
588- func (p * LoggerPlugin ) extractInputHistory (request * schemas.BifrostRequest ) []schemas.ChatMessage {
589- if request .ChatRequest != nil {
590- return request .ChatRequest .Input
591- }
592- if request .ResponsesRequest != nil {
593- messages := schemas .ToChatMessages (request .ResponsesRequest .Input )
594- if len (messages ) > 0 {
595- return messages
596- }
597- }
598- if request .TextCompletionRequest != nil {
599- var text string
600- if request .TextCompletionRequest .Input .PromptStr != nil {
601- text = * request .TextCompletionRequest .Input .PromptStr
602- } else {
603- var stringBuilder strings.Builder
604- for _ , prompt := range request .TextCompletionRequest .Input .PromptArray {
605- stringBuilder .WriteString (prompt )
606- }
607- text = stringBuilder .String ()
608- }
609- return []schemas.ChatMessage {
610- {
611- Role : schemas .ChatMessageRoleUser ,
612- Content : schemas.ChatMessageContent {
613- ContentStr : & text ,
614- },
615- },
616- }
617- }
618- if request .EmbeddingRequest != nil {
619- texts := request .EmbeddingRequest .Input .Texts
620-
621- if len (texts ) == 0 && request .EmbeddingRequest .Input .Text != nil {
622- texts = []string {* request .EmbeddingRequest .Input .Text }
623- }
624-
625- contentBlocks := make ([]schemas.ChatContentBlock , len (texts ))
626- for i , text := range texts {
627- // Create a per-iteration copy to avoid reusing the same memory address
628- t := text
629- contentBlocks [i ] = schemas.ChatContentBlock {
630- Type : schemas .ChatContentBlockTypeText ,
631- Text : & t ,
632- }
633- }
634- return []schemas.ChatMessage {
635- {
636- Role : schemas .ChatMessageRoleUser ,
637- Content : schemas.ChatMessageContent {
638- ContentBlocks : contentBlocks ,
639- },
640- },
641- }
642- }
643- return []schemas.ChatMessage {}
644- }
0 commit comments