-
Notifications
You must be signed in to change notification settings - Fork 285
feat: migrate Bedrock Claude models to native Anthropic Messages API #1924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,3 @@ | ||
| - feat: add Filename field to TranscriptionInput schema to carry original filename through the request pipeline | ||
| - fix: add AudioFilenameFromBytes utility to detect audio format from file headers with mp3 fallback | ||
| - fix: add AudioFilenameFromBytes utility to detect audio format from file headers with mp3 fallback | ||
| - feat: shifted to anthropic native API in bedrock provider for claude models |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,132 @@ | ||||||||||||||||||||||||||||||||||
| package bedrock | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| "github.com/bytedance/sonic" | ||||||||||||||||||||||||||||||||||
| "github.com/maximhq/bifrost/core/providers/anthropic" | ||||||||||||||||||||||||||||||||||
| providerUtils "github.com/maximhq/bifrost/core/providers/utils" | ||||||||||||||||||||||||||||||||||
| schemas "github.com/maximhq/bifrost/core/schemas" | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // getBedrockAnthropicChatRequestBody prepares the Anthropic Messages API-compatible request body | ||||||||||||||||||||||||||||||||||
| // for Bedrock's InvokeModel endpoint. It adds the required anthropic_version body field and | ||||||||||||||||||||||||||||||||||
| // removes the model field (which is specified in the URL path, not the body). | ||||||||||||||||||||||||||||||||||
| // Note: streaming is determined by the URL endpoint (invoke vs invoke-with-response-stream), | ||||||||||||||||||||||||||||||||||
| // NOT by a "stream" field in the request body — so isStreaming only affects caller routing. | ||||||||||||||||||||||||||||||||||
| func getBedrockAnthropicChatRequestBody(ctx *schemas.BifrostContext, request *schemas.BifrostChatRequest, deployment string, providerName schemas.ModelProvider) ([]byte, *schemas.BifrostError) { | ||||||||||||||||||||||||||||||||||
| // Handle raw request body passthrough | ||||||||||||||||||||||||||||||||||
| if rawBody, ok := ctx.Value(schemas.BifrostContextKeyUseRawRequestBody).(bool); ok && rawBody { | ||||||||||||||||||||||||||||||||||
| rawJSON := request.GetRawRequestBody() | ||||||||||||||||||||||||||||||||||
| var requestBody map[string]interface{} | ||||||||||||||||||||||||||||||||||
| if err := sonic.Unmarshal(rawJSON, &requestBody); err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrRequestBodyConversion, fmt.Errorf("failed to unmarshal request body: %w", err), providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| if _, exists := requestBody["max_tokens"]; !exists { | ||||||||||||||||||||||||||||||||||
| requestBody["max_tokens"] = anthropic.AnthropicDefaultMaxTokens | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| if _, exists := requestBody["anthropic_version"]; !exists { | ||||||||||||||||||||||||||||||||||
| requestBody["anthropic_version"] = DefaultBedrockAnthropicVersion | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| delete(requestBody, "model") | ||||||||||||||||||||||||||||||||||
| delete(requestBody, "fallbacks") | ||||||||||||||||||||||||||||||||||
| // Do NOT add "stream" to the body — Bedrock uses the endpoint path for streaming | ||||||||||||||||||||||||||||||||||
| delete(requestBody, "stream") | ||||||||||||||||||||||||||||||||||
| jsonBody, err := sonic.Marshal(requestBody) | ||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrProviderRequestMarshal, err, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| return jsonBody, nil | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| reqBody, err := anthropic.ToAnthropicChatRequest(ctx, request) | ||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrRequestBodyConversion, err, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| if reqBody == nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError("request body is not provided", nil, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| reqBody.Model = deployment | ||||||||||||||||||||||||||||||||||
| // Do NOT set Stream — Bedrock uses the endpoint path for streaming | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| return marshalBedrockAnthropicBody(reqBody, reqBody.GetExtraParams(), ctx, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // getBedrockAnthropicResponsesRequestBody prepares the Anthropic Messages API-compatible request body | ||||||||||||||||||||||||||||||||||
| // for Bedrock's InvokeModel endpoint when handling Responses API requests. | ||||||||||||||||||||||||||||||||||
| // Note: streaming is determined by the URL endpoint, NOT a "stream" body field. | ||||||||||||||||||||||||||||||||||
| func getBedrockAnthropicResponsesRequestBody(ctx *schemas.BifrostContext, request *schemas.BifrostResponsesRequest, deployment string, providerName schemas.ModelProvider) ([]byte, *schemas.BifrostError) { | ||||||||||||||||||||||||||||||||||
| // Handle raw request body passthrough | ||||||||||||||||||||||||||||||||||
| if rawBody, ok := ctx.Value(schemas.BifrostContextKeyUseRawRequestBody).(bool); ok && rawBody { | ||||||||||||||||||||||||||||||||||
| rawJSON := request.GetRawRequestBody() | ||||||||||||||||||||||||||||||||||
| var requestBody map[string]interface{} | ||||||||||||||||||||||||||||||||||
| if err := sonic.Unmarshal(rawJSON, &requestBody); err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrRequestBodyConversion, fmt.Errorf("failed to unmarshal request body: %w", err), providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| if _, exists := requestBody["max_tokens"]; !exists { | ||||||||||||||||||||||||||||||||||
| requestBody["max_tokens"] = anthropic.AnthropicDefaultMaxTokens | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| if _, exists := requestBody["anthropic_version"]; !exists { | ||||||||||||||||||||||||||||||||||
| requestBody["anthropic_version"] = DefaultBedrockAnthropicVersion | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| delete(requestBody, "model") | ||||||||||||||||||||||||||||||||||
| delete(requestBody, "fallbacks") | ||||||||||||||||||||||||||||||||||
| // Do NOT add "stream" to the body — Bedrock uses the endpoint path for streaming | ||||||||||||||||||||||||||||||||||
| delete(requestBody, "stream") | ||||||||||||||||||||||||||||||||||
| jsonBody, err := sonic.Marshal(requestBody) | ||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrProviderRequestMarshal, err, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| return jsonBody, nil | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Mutate the model before conversion so converters see the resolved deployment name | ||||||||||||||||||||||||||||||||||
| request.Model = deployment | ||||||||||||||||||||||||||||||||||
| reqBody, err := anthropic.ToAnthropicResponsesRequest(ctx, request) | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+83
to
+85
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid mutating the inbound Responses request model.
✅ Proposed fix- // Mutate the model before conversion so converters see the resolved deployment name
- request.Model = deployment
reqBody, err := anthropic.ToAnthropicResponsesRequest(ctx, request)
if err != nil {
return nil, providerUtils.NewBifrostOperationError(schemas.ErrRequestBodyConversion, err, providerName)
}
if reqBody == nil {
return nil, providerUtils.NewBifrostOperationError("request body is not provided", nil, providerName)
}
+ reqBody.Model = deployment📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrRequestBodyConversion, err, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| if reqBody == nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError("request body is not provided", nil, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| // Do NOT set Stream — Bedrock uses the endpoint path for streaming | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| return marshalBedrockAnthropicBody(reqBody, reqBody.GetExtraParams(), ctx, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // marshalBedrockAnthropicBody converts an AnthropicMessageRequest to JSON suitable for | ||||||||||||||||||||||||||||||||||
| // Bedrock's InvokeModel endpoint. It adds anthropic_version, removes the model field | ||||||||||||||||||||||||||||||||||
| // (specified in the URL path), and merges extra params if passthrough is enabled. | ||||||||||||||||||||||||||||||||||
| func marshalBedrockAnthropicBody(reqBody *anthropic.AnthropicMessageRequest, extraParams map[string]interface{}, ctx *schemas.BifrostContext, providerName schemas.ModelProvider) ([]byte, *schemas.BifrostError) { | ||||||||||||||||||||||||||||||||||
| reqBytes, err := sonic.Marshal(reqBody) | ||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrProviderRequestMarshal, err, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| var requestBody map[string]interface{} | ||||||||||||||||||||||||||||||||||
| if err := sonic.Unmarshal(reqBytes, &requestBody); err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrProviderRequestMarshal, err, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Add Bedrock-specific anthropic_version if not already present | ||||||||||||||||||||||||||||||||||
| if _, exists := requestBody["anthropic_version"]; !exists { | ||||||||||||||||||||||||||||||||||
| requestBody["anthropic_version"] = DefaultBedrockAnthropicVersion | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Remove model and stream — model is in URL path; streaming is via endpoint path, not body field | ||||||||||||||||||||||||||||||||||
| delete(requestBody, "model") | ||||||||||||||||||||||||||||||||||
| delete(requestBody, "stream") | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Merge extra params if passthrough is enabled | ||||||||||||||||||||||||||||||||||
| if ctx.Value(schemas.BifrostContextKeyPassthroughExtraParams) != nil && ctx.Value(schemas.BifrostContextKeyPassthroughExtraParams) == true { | ||||||||||||||||||||||||||||||||||
| if len(extraParams) > 0 { | ||||||||||||||||||||||||||||||||||
| providerUtils.MergeExtraParams(requestBody, extraParams) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+120
to
+125
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re-sanitize reserved fields after passthrough merge.
✅ Proposed fix // Merge extra params if passthrough is enabled
if ctx.Value(schemas.BifrostContextKeyPassthroughExtraParams) != nil && ctx.Value(schemas.BifrostContextKeyPassthroughExtraParams) == true {
if len(extraParams) > 0 {
providerUtils.MergeExtraParams(requestBody, extraParams)
+ // Keep Bedrock Anthropic invariants intact after merge
+ delete(requestBody, "model")
+ delete(requestBody, "stream")
+ delete(requestBody, "fallbacks")
}
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| jsonBody, err := sonic.Marshal(requestBody) | ||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
| return nil, providerUtils.NewBifrostOperationError(schemas.ErrProviderRequestMarshal, err, providerName) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| return jsonBody, nil | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Release wrapper now exposes a pool reset policy violation.
Line 161 delegates to a release path that does not reset all pooled fields to zero values before
pool.Put()(for example,CreatedAtis set to current time and maps are reinitialized instead of zeroed). With this API exported, the risk footprint grows.🔧 Suggested fix (zero on release, initialize on acquire)
func (state *AnthropicResponsesStreamState) flush() { state.ChunkIndex = nil state.AccumulatedJSON = "" state.ComputerToolID = nil state.WebSearchToolID = nil state.WebSearchOutputIndex = nil state.WebSearchResult = nil - state.ContentIndexToOutputIndex = make(map[int]int) - state.ContentIndexToBlockType = make(map[int]AnthropicContentBlockType) - state.ToolArgumentBuffers = make(map[int]string) - state.MCPCallOutputIndices = make(map[int]bool) - state.ItemIDs = make(map[int]string) - state.ReasoningSignatures = make(map[int]string) - state.TextContentIndices = make(map[int]bool) - state.ReasoningContentIndices = make(map[int]bool) - state.CompactionContentIndices = make(map[int]*schemas.CacheControl) + state.ContentIndexToOutputIndex = nil + state.ContentIndexToBlockType = nil + state.ToolArgumentBuffers = nil + state.MCPCallOutputIndices = nil + state.ItemIDs = nil + state.ReasoningSignatures = nil + state.TextContentIndices = nil + state.ReasoningContentIndices = nil + state.CompactionContentIndices = nil state.CurrentOutputIndex = 0 state.MessageID = nil state.StopReason = nil state.Model = nil - state.CreatedAt = int(time.Now().Unix()) + state.CreatedAt = 0 state.HasEmittedCreated = false state.HasEmittedInProgress = false state.StructuredOutputToolName = "" state.StructuredOutputIndex = nil }As per coding guidelines "Always reset all fields of pooled objects to their zero values before calling pool.Put(). Stale data from previous requests can leak to the next user of the pooled object."
🤖 Prompt for AI Agents