Skip to content

Commit bbf4084

Browse files
fix: async jobs stuck in processing on marshal failure (#1907)
## Summary

Fixes async jobs that become stuck in "processing" state when JSON marshaling fails during job execution. Previously, marshal failures would cause the job to exit without updating its status, leaving it permanently in the processing state.

## Changes

- Added panic recovery mechanism to ensure async jobs always reach a terminal state even if unexpected panics occur
- Introduced `markFailed` helper function to consistently update job status to "failed" with appropriate error details
- Added explicit error handling for JSON marshal failures that now properly transition jobs to "failed" status instead of leaving them stuck
- Jobs that fail during error serialization or result serialization now get marked as failed with descriptive error messages

## Type of change

- [x] Bug fix
- [ ] Feature
- [ ] Refactor
- [ ] Documentation
- [ ] Chore/CI

## Affected areas

- [x] Core (Go)
- [ ] Transports (HTTP)
- [ ] Providers/Integrations
- [ ] Plugins
- [ ] UI (Next.js)
- [ ] Docs

## How to test

Test async job execution with marshal failures to verify jobs transition to failed state:

```sh
# Core/Transports
go version
go test ./...

# Test specific async job scenarios
go test ./framework/logstore -v -run TestAsyncJob
```

## Screenshots/Recordings

N/A

## Breaking changes

- [ ] Yes
- [x] No

## Related issues

N/A

## Security considerations

No security implications. This change improves system reliability by preventing jobs from being stuck in processing state.

## Checklist

- [ ] I read `docs/contributing/README.md` and followed the guidelines
- [ ] I added/updated tests where appropriate
- [ ] I updated documentation where needed
- [ ] I verified builds succeed (Go and UI)
- [ ] I verified the CI pipeline passes locally if applicable
1 parent e3cdcdf commit bbf4084

File tree

2 files changed: +29 additions, -1 deletion

framework/logstore/asyncjob.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,31 @@ func (e *AsyncJobExecutor) SubmitJob(virtualKeyValue *string, resultTTL int, ope
118118
func (e *AsyncJobExecutor) executeJob(jobID string, resultTTL int, operation AsyncOperation) {
119119
ctx := schemas.NewBifrostContext(context.Background(), schemas.NoDeadline)
120120

121+
markFailed := func(msg string) {
122+
now := time.Now().UTC()
123+
expiresAt := now.Add(time.Duration(resultTTL) * time.Second)
124+
errJSON, _ := sonic.Marshal(&schemas.BifrostError{Error: &schemas.ErrorField{Message: msg}})
125+
if err := e.logstore.UpdateAsyncJob(ctx, jobID, map[string]interface{}{
126+
"status": schemas.AsyncJobStatusFailed,
127+
"status_code": fasthttp.StatusInternalServerError,
128+
"error": string(errJSON),
129+
"completed_at": now,
130+
"expires_at": expiresAt,
131+
}); err != nil {
132+
e.logger.Warn("failed to update async job to failed: %v", err)
133+
}
134+
}
135+
136+
// The bifrost execution flow is very stable and panics are not expected.
137+
// This recover is purely defensive to ensure the job always reaches a terminal
138+
// state rather than being stuck in "processing" if an unexpected panic occurs.
139+
defer func() {
140+
if r := recover(); r != nil {
141+
e.logger.Warn("async job %s panicked: %v", jobID, r)
142+
markFailed(fmt.Sprintf("internal error: %v", r))
143+
}
144+
}()
145+
121146
// Mark as processing
122147
if err := e.logstore.UpdateAsyncJob(ctx, jobID, map[string]interface{}{
123148
"status": schemas.AsyncJobStatusProcessing,
@@ -137,6 +162,7 @@ func (e *AsyncJobExecutor) executeJob(jobID string, resultTTL int, operation Asy
137162
errJSON, err := sonic.Marshal(bifrostErr)
138163
if err != nil {
139164
e.logger.Warn("failed to marshal bifrost error: %v", err)
165+
markFailed(fmt.Sprintf("failed to serialize error response: %v", err))
140166
return
141167
}
142168
statusCode := fasthttp.StatusInternalServerError
@@ -158,6 +184,7 @@ func (e *AsyncJobExecutor) executeJob(jobID string, resultTTL int, operation Asy
158184
respJSON, err := sonic.Marshal(resp)
159185
if err != nil {
160186
e.logger.Warn("failed to marshal result: %v", err)
187+
markFailed(fmt.Sprintf("failed to serialize result: %v", err))
161188
return
162189
}
163190
if err := e.logstore.UpdateAsyncJob(ctx, jobID, map[string]interface{}{

transports/changelog.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
- fix: preserve original audio filename in transcription requests
1+
- fix: preserve original audio filename in transcription requests
2+
- fix: async jobs stuck in "processing" on marshal failure now correctly transition to "failed"

0 commit comments

Comments
 (0)