Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit 8d9b1ad

Browse files
[Backport 5.3] Fix panics in streaming completions handler (#60476)
Fix panics in streaming completions handler (#60474) (cherry picked from commit 52dffbb) Co-authored-by: Cezary Bartoszuk <[email protected]>
1 parent 7aaef8d commit 8d9b1ad

File tree

2 files changed

+25
-22
lines changed

2 files changed

+25
-22
lines changed

internal/completions/httpapi/handler.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -226,31 +226,34 @@ func newStreamingResponseHandler(logger log.Logger, db database.DB, feature type
226226
return eventWriter
227227
})
228228

229+
// Isolate writing events.
230+
var mu sync.Mutex
231+
writeEvent := func(name string, data any) error {
232+
mu.Lock()
233+
defer mu.Unlock()
234+
if ev := eventWriter(); ev != nil {
235+
return ev.Event(name, data)
236+
}
237+
return nil
238+
}
239+
229240
// Always send a final done event so clients know the stream is shutting down.
230241
firstEventObserved := false
231242
defer func() {
232243
if firstEventObserved {
233-
if ev := eventWriter(); ev != nil {
234-
_ = ev.Event("done", map[string]any{})
235-
}
244+
_ = writeEvent("done", map[string]any{})
236245
}
237246
}()
238247
start := time.Now()
239248
eventSink := func(e types.CompletionResponse) error {
240-
if w := eventWriter(); w != nil {
241-
return w.Event("completion", e)
242-
}
243-
return nil
249+
return writeEvent("completion", e)
244250
}
245251
attributionErrorLog := func(err error) {
246252
l := trace.Logger(ctx, logger)
247-
ev := eventWriter()
248-
if ev != nil {
249-
if err := ev.Event("attribution-error", map[string]string{"error": err.Error()}); err != nil {
250-
l.Error("error reporting attribution error", log.Error(err))
251-
} else {
252-
return
253-
}
253+
if err := writeEvent("attribution-error", map[string]string{"error": err.Error()}); err != nil {
254+
l.Error("error reporting attribution error", log.Error(err))
255+
} else {
256+
return
254257
}
255258
l.Error("attribution error", log.Error(err))
256259
}
@@ -325,20 +328,16 @@ func newStreamingResponseHandler(logger log.Logger, db database.DB, feature type
325328
firstEventObserved = true
326329
timeToFirstEventMetrics.Observe(time.Since(start).Seconds(), 1, nil, requestParams.Model)
327330
}
328-
if ev := eventWriter(); ev != nil {
329-
if err := ev.Event("error", map[string]string{"error": err.Error()}); err != nil {
330-
l.Error("error reporting streaming completion error", log.Error(err))
331-
}
331+
if err := writeEvent("error", map[string]string{"error": err.Error()}); err != nil {
332+
l.Error("error reporting streaming completion error", log.Error(err))
332333
}
333334
return
334335
}
335336
if f != nil { // if autocomplete-attribution enabled
336337
if err := f.WaitDone(ctx); err != nil {
337338
l := trace.Logger(ctx, logger)
338-
if ev := eventWriter(); ev != nil {
339-
if err := ev.Event("error", map[string]string{"error": err.Error()}); err != nil {
340-
l.Error("error reporting streaming completion error", log.Error(err))
341-
}
339+
if err := writeEvent("error", map[string]string{"error": err.Error()}); err != nil {
340+
l.Error("error reporting streaming completion error", log.Error(err))
342341
}
343342
}
344343
}

internal/guardrails/attribution_filter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ func (a *completionsFilter) Send(ctx context.Context, e types.CompletionResponse
115115
// is called and it will wait for remaining attribution search if context
116116
// time limits permit.
117117
func (a *completionsFilter) WaitDone(ctx context.Context) error {
118+
// If attribution never run, we're done.
119+
a.attributionRun.Do(func() {
120+
close(a.attributionFinished)
121+
})
118122
select {
119123
case <-ctx.Done():
120124
a.blockSending()

0 commit comments

Comments
 (0)