diff --git a/apps/gateway/src/anthropic/anthropic.ts b/apps/gateway/src/anthropic/anthropic.ts index 31347833ff..21ae53c341 100644 --- a/apps/gateway/src/anthropic/anthropic.ts +++ b/apps/gateway/src/anthropic/anthropic.ts @@ -143,6 +143,12 @@ const anthropicResponseSchema = z.object({ usage: z.object({ input_tokens: z.number(), output_tokens: z.number(), + // Anthropic emits these on caching-supported models, but we keep them + // optional with a 0 default so the schema doesn't fail validation if an + // older Claude model, a beta endpoint, or a future API change ever omits + // them. The downstream conversion code already handles 0 correctly. + cache_creation_input_tokens: z.number().optional().default(0), + cache_read_input_tokens: z.number().optional().default(0), }), }); @@ -207,21 +213,40 @@ anthropic.openapi(messages, async (c) => { // Transform Anthropic request to OpenAI format const openaiMessages: Array> = []; - // Add system message if provided + // Add system message if provided. + // When the caller supplies cache_control on any text block, preserve the + // per-block array form so the inner /v1/chat/completions path can forward + // cache_control markers verbatim to Anthropic. Otherwise, join with " " to + // preserve the legacy behavior (and matching token counts) for callers + // that pass array-form system without caching opt-in. 
if (anthropicRequest.system) { - let systemContent: string; if (typeof anthropicRequest.system === "string") { - systemContent = anthropicRequest.system; + openaiMessages.push({ + role: "system", + content: anthropicRequest.system, + }); } else { - // Handle array format - concatenate all text blocks - systemContent = anthropicRequest.system - .map((block) => block.text) - .join(" "); + const hasAnyCacheControl = anthropicRequest.system.some( + (block) => block.cache_control, + ); + if (hasAnyCacheControl) { + openaiMessages.push({ + role: "system", + content: anthropicRequest.system.map((block) => ({ + type: "text", + text: block.text, + ...(block.cache_control && { + cache_control: block.cache_control, + }), + })), + }); + } else { + openaiMessages.push({ + role: "system", + content: anthropicRequest.system.map((block) => block.text).join(" "), + }); + } } - openaiMessages.push({ - role: "system", - content: systemContent, - }); } // Transform messages using the approach from claude-code-proxy @@ -372,9 +397,13 @@ anthropic.openapi(messages, async (c) => { const hasOnlyText = message.content.every( (block) => block.type === "text", ); + const hasAnyCacheControl = message.content.some( + (block) => block.type === "text" && block.cache_control, + ); - if (hasOnlyText) { - // For text-only content, flatten to a simple string to avoid content type issues + if (hasOnlyText && !hasAnyCacheControl) { + // For text-only content with no cache markers, flatten to a simple + // string to avoid content type issues. const textContent = message.content .filter((block) => block.type === "text") .map((block) => block.text) @@ -385,10 +414,18 @@ anthropic.openapi(messages, async (c) => { content: textContent, }); } else { - // For true multi-modal content, transform blocks + // For multi-modal content, or text content with cache_control markers, + // transform blocks while preserving cache_control so the inner + // completions path can forward it to Anthropic. 
const content = message.content.map((block) => { if (block.type === "text" && block.text) { - return { type: "text", text: block.text }; + return { + type: "text", + text: block.text, + ...(block.cache_control && { + cache_control: block.cache_control, + }), + }; } if (block.type === "image" && block.source) { return { @@ -498,7 +535,17 @@ anthropic.openapi(messages, async (c) => { name?: string; input?: string; }> = []; - let usage = { input_tokens: 0, output_tokens: 0 }; + let usage: { + input_tokens: number; + output_tokens: number; + cache_creation_input_tokens: number; + cache_read_input_tokens: number; + } = { + input_tokens: 0, + output_tokens: 0, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + }; let currentTextBlockIndex: number | null = null; const toolCallBlockIndex = new Map(); @@ -694,9 +741,23 @@ anthropic.openapi(messages, async (c) => { // Update usage if available if (chunk.usage) { + const promptDetails = + chunk.usage.prompt_tokens_details ?? {}; + const cacheRead: number = promptDetails.cached_tokens ?? 0; + const cacheCreation: number = + promptDetails.cache_creation_tokens ?? 0; + const totalPrompt: number = chunk.usage.prompt_tokens ?? 0; + const nonCachedInput = Math.max( + 0, + totalPrompt - cacheRead - cacheCreation, + ); usage = { - input_tokens: chunk.usage.prompt_tokens ?? 0, + input_tokens: nonCachedInput, output_tokens: chunk.usage.completion_tokens ?? 0, + // Match Anthropic's API and always emit both fields + // (set to 0 when inapplicable). + cache_creation_input_tokens: cacheCreation, + cache_read_input_tokens: cacheRead, }; } @@ -787,6 +848,15 @@ anthropic.openapi(messages, async (c) => { } } + const usageDetails = openaiResponse.usage?.prompt_tokens_details ?? {}; + const cachedTokens: number = usageDetails.cached_tokens ?? 0; + const cacheCreationTokens: number = usageDetails.cache_creation_tokens ?? 0; + const totalPromptTokens: number = openaiResponse.usage?.prompt_tokens ?? 
0; + const nonCachedInputTokens = Math.max( + 0, + totalPromptTokens - cachedTokens - cacheCreationTokens, + ); + const anthropicResponse = { id: openaiResponse.id, type: "message" as const, @@ -798,8 +868,13 @@ anthropic.openapi(messages, async (c) => { ), stop_sequence: null, usage: { - input_tokens: openaiResponse.usage?.prompt_tokens ?? 0, + input_tokens: nonCachedInputTokens, output_tokens: openaiResponse.usage?.completion_tokens ?? 0, + // Match Anthropic's actual API: always emit both fields (set to 0 + // when inapplicable) so SDK clients with strict typing can read them + // without optionality checks. + cache_creation_input_tokens: cacheCreationTokens, + cache_read_input_tokens: cachedTokens, }, }; diff --git a/apps/gateway/src/chat-prompt-caching.e2e.ts b/apps/gateway/src/chat-prompt-caching.e2e.ts index bdf01d7302..a8d6abc98c 100644 --- a/apps/gateway/src/chat-prompt-caching.e2e.ts +++ b/apps/gateway/src/chat-prompt-caching.e2e.ts @@ -167,32 +167,59 @@ describe("e2e prompt caching", getConcurrentTestOptions(), () => { }); } - // Second request - should read from cache - const secondRequestId = generateTestRequestId(); - const secondRes = await app.request("/v1/chat/completions", { - method: "POST", - headers: { - "Content-Type": "application/json", - "x-request-id": secondRequestId, - Authorization: `Bearer real-token`, - }, - body: JSON.stringify({ - model: model, - messages: [ - { - role: "system", - content: longSystemPrompt, - }, - { - role: "user", - content: - "Just reply with 'OK' to confirm you received the context.", - }, - ], - }), - }); - - const secondJson = await secondRes.json(); + // Second request - should read from cache. + // Anthropic prompt cache writes are eventually consistent, so a + // back-to-back request can occasionally miss. Retry with backoff + // until we observe a cache read or run out of attempts. 
+ const sendCacheRequest = async () => { + const secondRequestId = generateTestRequestId(); + const res = await app.request("/v1/chat/completions", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-request-id": secondRequestId, + Authorization: `Bearer real-token`, + }, + body: JSON.stringify({ + model: model, + messages: [ + { + role: "system", + content: longSystemPrompt, + }, + { + role: "user", + content: + "Just reply with 'OK' to confirm you received the context.", + }, + ], + }), + }); + const json = await res.json(); + return { res, json, secondRequestId }; + }; + + let attempt = 0; + const maxAttempts = 4; + let secondRes: Response; + let secondJson: any; + let secondRequestId: string; + do { + attempt++; + ({ + res: secondRes, + json: secondJson, + secondRequestId, + } = await sendCacheRequest()); + const cached = + secondJson?.usage?.prompt_tokens_details?.cached_tokens ?? 0; + if (secondRes.status !== 200 || cached > 0) { + break; + } + if (attempt < maxAttempts) { + await new Promise((r) => setTimeout(r, 750 * attempt)); + } + } while (attempt < maxAttempts); if (logMode) { console.log("Second response:", JSON.stringify(secondJson, null, 2)); } diff --git a/apps/gateway/src/chat/chat.ts b/apps/gateway/src/chat/chat.ts index de20ea3a5d..2890b750d5 100644 --- a/apps/gateway/src/chat/chat.ts +++ b/apps/gateway/src/chat/chat.ts @@ -5403,6 +5403,7 @@ chat.openapi(completions, async (c) => { let totalTokens = null; let reasoningTokens = null; let cachedTokens = null; + let cacheCreationTokens: number | null = null; let streamingToolCalls = null; let imageByteSize = 0; // Track total image data size for token estimation let outputImageCount = 0; // Track number of output images for cost calculation @@ -6570,6 +6571,9 @@ chat.openapi(completions, async (c) => { if (usage.cachedTokens !== null) { cachedTokens = usage.cachedTokens; } + if (usage.cacheCreationTokens !== null) { + cacheCreationTokens = usage.cacheCreationTokens; + } // 
Estimate tokens if not provided and we have a finish reason if (finishReason && (!promptTokens || !completionTokens)) { @@ -7103,9 +7107,15 @@ chat.openapi(completions, async (c) => { 1, Math.round(adjPrompt + adjCompletion), ), - ...(cachedTokens !== null && { + ...((cachedTokens !== null || + (cacheCreationTokens !== null && + cacheCreationTokens > 0)) && { prompt_tokens_details: { - cached_tokens: cachedTokens, + cached_tokens: cachedTokens ?? 0, + ...(cacheCreationTokens !== null && + cacheCreationTokens > 0 && { + cache_creation_tokens: cacheCreationTokens, + }), }, }), cost_usd_total: streamingCostsEarly.totalCost, @@ -8720,6 +8730,7 @@ chat.openapi(completions, async (c) => { completionTokens, reasoningTokens, cachedTokens, + cacheCreationTokens, toolResults, images, annotations, @@ -8897,6 +8908,7 @@ chat.openapi(completions, async (c) => { routingAttempts.length > 0 ? routingAttempts : null, requestId, usedRegion, + cacheCreationTokens, ); // Extract plugin IDs for logging diff --git a/apps/gateway/src/chat/schemas/completions.ts b/apps/gateway/src/chat/schemas/completions.ts index c451c254b5..da4718d215 100644 --- a/apps/gateway/src/chat/schemas/completions.ts +++ b/apps/gateway/src/chat/schemas/completions.ts @@ -19,6 +19,11 @@ export const completionsRequestSchema = z.object({ z.object({ type: z.literal("text"), text: z.string(), + cache_control: z + .object({ + type: z.literal("ephemeral"), + }) + .optional(), }), z.object({ type: z.literal("image_url"), diff --git a/apps/gateway/src/chat/tools/extract-token-usage.ts b/apps/gateway/src/chat/tools/extract-token-usage.ts index 780619a6af..c1ac12f8e0 100644 --- a/apps/gateway/src/chat/tools/extract-token-usage.ts +++ b/apps/gateway/src/chat/tools/extract-token-usage.ts @@ -45,6 +45,7 @@ export function extractTokenUsage( let totalTokens = null; let reasoningTokens = null; let cachedTokens = null; + let cacheCreationTokens = null; switch (provider) { case "google-ai-studio": @@ -110,6 +111,7 @@ export 
function extractTokenUsage( completionTokens = data.usage.outputTokens ?? null; // Cached tokens are the tokens read from cache (discount applies to these) cachedTokens = cacheReadTokens; + cacheCreationTokens = cacheWriteTokens; totalTokens = data.usage.totalTokens ?? null; } break; @@ -118,15 +120,16 @@ export function extractTokenUsage( // For Anthropic: input_tokens are the non-cached tokens // We need to add cache_creation_input_tokens to get total input tokens const inputTokens = data.usage.input_tokens ?? 0; - const cacheCreationTokens = data.usage.cache_creation_input_tokens ?? 0; + const cacheCreation = data.usage.cache_creation_input_tokens ?? 0; const cacheReadTokens = data.usage.cache_read_input_tokens ?? 0; // Total prompt tokens = non-cached + cache creation + cache read - promptTokens = inputTokens + cacheCreationTokens + cacheReadTokens; + promptTokens = inputTokens + cacheCreation + cacheReadTokens; completionTokens = data.usage.output_tokens ?? null; reasoningTokens = data.usage.reasoning_output_tokens ?? null; // Cached tokens are the tokens read from cache (discount applies to these) cachedTokens = cacheReadTokens; + cacheCreationTokens = cacheCreation; totalTokens = (promptTokens ?? 0) + (completionTokens ?? 
0); } break; @@ -157,5 +160,6 @@ export function extractTokenUsage( totalTokens, reasoningTokens, cachedTokens, + cacheCreationTokens, }; } diff --git a/apps/gateway/src/chat/tools/parse-provider-response.ts b/apps/gateway/src/chat/tools/parse-provider-response.ts index bd7e506822..d933af78af 100644 --- a/apps/gateway/src/chat/tools/parse-provider-response.ts +++ b/apps/gateway/src/chat/tools/parse-provider-response.ts @@ -30,6 +30,7 @@ export function parseProviderResponse( let totalTokens = null; let reasoningTokens = null; let cachedTokens = null; + let cacheCreationTokens = null; let toolResults = null; let images: ImageObject[] = []; const annotations: Annotation[] = []; @@ -106,6 +107,7 @@ export function parseProviderResponse( totalTokens = json.usage.totalTokens ?? null; // Cached tokens are the tokens read from cache (discount applies to these) cachedTokens = cacheReadTokens; + cacheCreationTokens = cacheWriteTokens; } // Extract tool calls if present @@ -185,15 +187,16 @@ export function parseProviderResponse( // We need to add cache_creation_input_tokens to get total input tokens if (json.usage) { const inputTokens = json.usage.input_tokens ?? 0; - const cacheCreationTokens = json.usage.cache_creation_input_tokens ?? 0; + const cacheCreation = json.usage.cache_creation_input_tokens ?? 0; const cacheReadTokens = json.usage.cache_read_input_tokens ?? 0; // Total prompt tokens = non-cached + cache creation + cache read - promptTokens = inputTokens + cacheCreationTokens + cacheReadTokens; + promptTokens = inputTokens + cacheCreation + cacheReadTokens; completionTokens = json.usage.output_tokens ?? null; reasoningTokens = json.usage.reasoning_output_tokens ?? null; // Cached tokens are the tokens read from cache (discount applies to these) cachedTokens = cacheReadTokens; + cacheCreationTokens = cacheCreation; totalTokens = promptTokens && completionTokens ? 
promptTokens + completionTokens @@ -841,6 +844,7 @@ export function parseProviderResponse( totalTokens, reasoningTokens, cachedTokens, + cacheCreationTokens, toolResults, images, annotations: annotations.length > 0 ? annotations : null, diff --git a/apps/gateway/src/chat/tools/transform-response-to-openai.ts b/apps/gateway/src/chat/tools/transform-response-to-openai.ts index eec83ddbdd..133cbe4cbc 100644 --- a/apps/gateway/src/chat/tools/transform-response-to-openai.ts +++ b/apps/gateway/src/chat/tools/transform-response-to-openai.ts @@ -107,7 +107,11 @@ function buildUsageObject( cachedTokens: number | null, costs: CostData | null, showUpgradeMessage = false, + cacheCreationTokens: number | null = null, ) { + const hasCacheRead = cachedTokens !== null; + const hasCacheCreation = + cacheCreationTokens !== null && cacheCreationTokens > 0; return { prompt_tokens: Math.max(1, promptTokens ?? 1), completion_tokens: completionTokens ?? 0, @@ -119,9 +123,12 @@ function buildUsageObject( ...(reasoningTokens !== null && { reasoning_tokens: reasoningTokens, }), - ...(cachedTokens !== null && { + ...((hasCacheRead || hasCacheCreation) && { prompt_tokens_details: { - cached_tokens: cachedTokens, + cached_tokens: cachedTokens ?? 
0, + ...(hasCacheCreation && { + cache_creation_tokens: cacheCreationTokens, + }), }, }), ...(costs !== null && { @@ -166,6 +173,7 @@ export function transformResponseToOpenai( routing: RoutingAttempt[] | null = null, requestId = "", usedRegion?: string | undefined, + cacheCreationTokens: number | null = null, ) { let transformedResponse = json; @@ -225,6 +233,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, @@ -275,6 +284,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, @@ -319,6 +329,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, @@ -413,6 +424,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, @@ -455,6 +467,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, @@ -553,6 +566,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, @@ -652,6 +666,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, @@ -742,6 +757,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, @@ -833,6 +849,7 @@ export function transformResponseToOpenai( cachedTokens, costs, showUpgradeMessage, + cacheCreationTokens, ), metadata: buildMetadata( requestedModel, diff --git a/apps/gateway/src/chat/tools/transform-streaming-to-openai.ts 
b/apps/gateway/src/chat/tools/transform-streaming-to-openai.ts index 69f98acd26..bbc5f838b2 100644 --- a/apps/gateway/src/chat/tools/transform-streaming-to-openai.ts +++ b/apps/gateway/src/chat/tools/transform-streaming-to-openai.ts @@ -9,6 +9,29 @@ import { transformOpenaiStreaming } from "./transform-openai-streaming.js"; import type { Annotation, StreamingDelta } from "./types.js"; import type { Provider } from "@llmgateway/models"; +function normalizeAnthropicUsage(usage: any): any { + if (!usage || typeof usage !== "object") { + return null; + } + const inputTokens = usage.input_tokens ?? 0; + const cacheCreation = usage.cache_creation_input_tokens ?? 0; + const cacheRead = usage.cache_read_input_tokens ?? 0; + const outputTokens = usage.output_tokens ?? 0; + const promptTokens = inputTokens + cacheCreation + cacheRead; + const hasCacheInfo = cacheRead > 0 || cacheCreation > 0; + return { + prompt_tokens: promptTokens, + completion_tokens: outputTokens, + total_tokens: promptTokens + outputTokens, + ...(hasCacheInfo && { + prompt_tokens_details: { + cached_tokens: cacheRead, + ...(cacheCreation > 0 && { cache_creation_tokens: cacheCreation }), + }, + }), + }; +} + export function transformStreamingToOpenai( usedProvider: Provider, usedModel: string, @@ -58,7 +81,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } else if ( data.type === "content_block_delta" && @@ -80,7 +103,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } else if ( data.type === "content_block_start" && @@ -106,7 +129,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? 
null, + usage: normalizeAnthropicUsage(data.usage), }; } else if ( data.type === "content_block_start" && @@ -137,7 +160,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } else if ( data.type === "content_block_delta" && @@ -163,7 +186,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } else { transformedData = { @@ -188,7 +211,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } } else if ( @@ -224,7 +247,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } else if (data.type === "message_delta" && data.delta?.stop_reason) { const stopReason = data.delta.stop_reason; @@ -249,7 +272,7 @@ export function transformStreamingToOpenai( : "stop", }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } else if (data.type === "message_stop" || data.stop_reason) { const stopReason = data.stop_reason ?? "end_turn"; @@ -274,7 +297,7 @@ export function transformStreamingToOpenai( : "stop", }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } else if (data.delta?.text) { transformedData = { @@ -292,7 +315,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? null, + usage: normalizeAnthropicUsage(data.usage), }; } else { logger.warn("[streaming] Unrecognized Anthropic chunk", { @@ -316,7 +339,7 @@ export function transformStreamingToOpenai( finish_reason: null, }, ], - usage: data.usage ?? 
null, + usage: normalizeAnthropicUsage(data.usage), }; } break; diff --git a/apps/gateway/src/native-anthropic-cache.e2e.ts b/apps/gateway/src/native-anthropic-cache.e2e.ts new file mode 100644 index 0000000000..b564d82bc2 --- /dev/null +++ b/apps/gateway/src/native-anthropic-cache.e2e.ts @@ -0,0 +1,531 @@ +import "dotenv/config"; +import { beforeAll, beforeEach, describe, expect, test } from "vitest"; + +import { + beforeAllHook, + beforeEachHook, + generateTestRequestId, + getConcurrentTestOptions, + getTestOptions, + logMode, +} from "@/chat-helpers.e2e.js"; + +import { app } from "./app.js"; + +// Generate a system prompt long enough to cross Anthropic's minimum cacheable +// threshold for Haiku 4.5 (>= ~2k tokens). 500 repeats produces ~6.5k tokens. +function buildLongSystemPrompt(): string { + return ( + "You are a helpful AI assistant. " + + "This is detailed context information that should be cached for optimal efficiency. ".repeat( + 500, + ) + + "Please analyze carefully." + ); +} + +// Anthropic's prompt cache writes are eventually consistent — back-to-back +// requests sometimes miss the cache the first time. Retry with a short backoff. +async function sendUntilCacheRead( + send: () => Promise<{ status: number; json: any }>, + maxAttempts = 4, +): Promise<{ status: number; json: any; attempts: number }> { + let last: { status: number; json: any } = { status: 0, json: null }; + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + last = await send(); + if (last.status !== 200) { + return { ...last, attempts: attempt }; + } + const usage = last.json?.usage; + const cacheRead = + usage?.cache_read_input_tokens ?? + usage?.prompt_tokens_details?.cached_tokens ?? 
+ 0; + if (cacheRead > 0) { + return { ...last, attempts: attempt }; + } + if (attempt < maxAttempts) { + await new Promise((r) => setTimeout(r, 500 * attempt)); + } + } + return { ...last, attempts: maxAttempts }; +} + +const hasAnthropicKey = !!process.env.LLM_ANTHROPIC_API_KEY; +const hasBedrockKey = !!process.env.LLM_AWS_BEDROCK_API_KEY; + +// Anthropic prompt cache reads are billed at ~10% of normal input price. +// Within a single response that has both cached and uncached input tokens, +// the per-token cached cost MUST be strictly less than the per-token uncached +// cost — otherwise the gateway is overbilling the cache discount. +// +// We compare ratios within ONE response (rather than across two requests) +// because the upstream provider's cache is warm across test runs, so a +// "first call uncached / second call cached" assertion is unreliable. +function assertCacheDiscountApplied(usage: any) { + const cachedTokens = usage?.prompt_tokens_details?.cached_tokens ?? 0; + const promptTokens = usage?.prompt_tokens ?? 0; + const uncachedTokens = promptTokens - cachedTokens; + const inputCost = usage?.cost_usd_input; + const cachedInputCost = usage?.cost_usd_cached_input; + if ( + typeof inputCost !== "number" || + typeof cachedInputCost !== "number" || + cachedTokens === 0 || + uncachedTokens === 0 + ) { + // Without both cached and uncached tokens we can't compare per-token + // rates. Skip rather than fail — the test that primes the cache will + // still verify cached_tokens > 0 separately. 
+ return; + } + const uncachedPerToken = inputCost / uncachedTokens; + const cachedPerToken = cachedInputCost / cachedTokens; + expect( + cachedPerToken, + `expected per-token cached cost (${cachedPerToken}) to be less than per-token uncached cost (${uncachedPerToken})`, + ).toBeLessThan(uncachedPerToken); +} + +async function readSseChunks(stream: ReadableStream | null) { + if (!stream) { + return [] as any[]; + } + const reader = stream.getReader(); + const chunks: any[] = []; + let buffer = ""; + const decoder = new TextDecoder(); + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + for (const line of lines) { + if (!line.startsWith("data: ")) { + continue; + } + const payload = line.slice(6).trim(); + if (!payload || payload === "[DONE]") { + continue; + } + try { + chunks.push(JSON.parse(payload)); + } catch { + // ignore non-JSON keepalives + } + } + } + return chunks; +} + +describe("e2e native /v1/messages cache", getConcurrentTestOptions(), () => { + beforeAll(beforeAllHook); + beforeEach(beforeEachHook); + + test("empty", () => { + expect(true).toBe(true); + }); + + (hasAnthropicKey ? 
test : test.skip)( + "native messages forwards explicit cache_control and surfaces cache token usage", + getTestOptions(), + async () => { + const longText = buildLongSystemPrompt(); + const body = { + model: "anthropic/claude-haiku-4-5", + max_tokens: 50, + system: [ + { + type: "text" as const, + text: longText, + cache_control: { type: "ephemeral" as const }, + }, + ], + messages: [ + { + role: "user" as const, + content: "Just reply OK.", + }, + ], + }; + + const send = async () => { + const requestId = generateTestRequestId(); + const res = await app.request("/v1/messages", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-request-id": requestId, + Authorization: `Bearer real-token`, + }, + body: JSON.stringify(body), + }); + const json = await res.json(); + if (logMode) { + console.log( + "native /v1/messages", + requestId, + "status", + res.status, + "usage", + JSON.stringify(json.usage), + ); + } + return { status: res.status, json }; + }; + + // Prime the cache (first call may write, second should read). + const first = await send(); + expect(first.status).toBe(200); + expect(first.json.usage).toBeDefined(); + expect(typeof first.json.usage.input_tokens).toBe("number"); + expect(typeof first.json.usage.output_tokens).toBe("number"); + + // Retry the second call until Anthropic reports a cache read, to + // avoid flakiness from cache-write propagation latency. + const second = await sendUntilCacheRead(send); + expect(second.status).toBe(200); + expect( + second.json.usage.cache_read_input_tokens, + `expected cache_read_input_tokens > 0 after ${second.attempts} attempts`, + ).toBeGreaterThan(0); + + // Sanity: input_tokens should be the *non-cached* input tokens, not + // the total. The cached portion lives in cache_read_input_tokens. + expect(second.json.usage.input_tokens).toBeLessThan( + second.json.usage.cache_read_input_tokens, + ); + + assertCacheDiscountApplied(second.json.usage); + }, + ); + + (hasAnthropicKey ? 
test : test.skip)( + "openai-compat /v1/chat/completions surfaces cached tokens for anthropic", + getTestOptions(), + async () => { + const longText = buildLongSystemPrompt(); + const body = { + model: "anthropic/claude-haiku-4-5", + messages: [ + { role: "system", content: longText }, + { role: "user", content: "Just reply OK." }, + ], + }; + + const send = async () => { + const requestId = generateTestRequestId(); + const res = await app.request("/v1/chat/completions", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-request-id": requestId, + Authorization: `Bearer real-token`, + }, + body: JSON.stringify(body), + }); + const json = await res.json(); + if (logMode) { + console.log( + "openai-compat", + requestId, + "status", + res.status, + "usage", + JSON.stringify(json.usage), + ); + } + return { status: res.status, json }; + }; + + const first = await send(); + expect(first.status).toBe(200); + + const second = await sendUntilCacheRead(send); + expect(second.status).toBe(200); + expect(second.json.usage.prompt_tokens_details).toBeDefined(); + expect( + second.json.usage.prompt_tokens_details.cached_tokens, + `expected cached_tokens > 0 after ${second.attempts} attempts`, + ).toBeGreaterThan(0); + + assertCacheDiscountApplied(second.json.usage); + }, + ); + + // Bedrock pass-through: same surface as the openai-compat anthropic test + // above, but routed through AWS Bedrock so we exercise the cachePoint path + // in prepare-request-body and the bedrock streaming usage extraction. + (hasBedrockKey ? test : test.skip)( + "openai-compat /v1/chat/completions surfaces cached tokens for bedrock", + getTestOptions(), + async () => { + const longText = buildLongSystemPrompt(); + const body = { + model: "aws-bedrock/claude-haiku-4-5", + messages: [ + { role: "system", content: longText }, + { role: "user", content: "Just reply OK." 
}, + ], + }; + + const send = async () => { + const requestId = generateTestRequestId(); + const res = await app.request("/v1/chat/completions", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-request-id": requestId, + Authorization: `Bearer real-token`, + }, + body: JSON.stringify(body), + }); + const json = await res.json(); + if (logMode) { + console.log( + "openai-compat bedrock", + requestId, + "status", + res.status, + "usage", + JSON.stringify(json.usage), + ); + } + return { status: res.status, json }; + }; + + const first = await send(); + expect(first.status).toBe(200); + + const second = await sendUntilCacheRead(send); + expect(second.status).toBe(200); + expect(second.json.usage.prompt_tokens_details).toBeDefined(); + expect( + second.json.usage.prompt_tokens_details.cached_tokens, + `expected cached_tokens > 0 after ${second.attempts} attempts`, + ).toBeGreaterThan(0); + + assertCacheDiscountApplied(second.json.usage); + }, + ); + + // Streaming Anthropic: verifies normalizeAnthropicUsage in + // transform-streaming-to-openai surfaces cache token usage in streamed + // chunks. Without this, billing/observability for streaming clients is + // silently broken on Anthropic prompt cache reads. + (hasAnthropicKey ? test : test.skip)( + "streaming /v1/chat/completions surfaces cached tokens for anthropic", + getTestOptions(), + async () => { + const longText = buildLongSystemPrompt(); + const body = { + model: "anthropic/claude-haiku-4-5", + stream: true, + messages: [ + { role: "system", content: longText }, + { role: "user", content: "Just reply OK." 
}, + ], + }; + + const send = async () => { + const requestId = generateTestRequestId(); + const res = await app.request("/v1/chat/completions", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-request-id": requestId, + Authorization: `Bearer real-token`, + }, + body: JSON.stringify(body), + }); + if (res.status !== 200) { + return { status: res.status, json: await res.json() }; + } + const chunks = await readSseChunks(res.body); + let cachedTokens = 0; + let usageChunk: any = null; + for (const chunk of chunks) { + const cached = + chunk?.usage?.prompt_tokens_details?.cached_tokens ?? 0; + if (cached > cachedTokens) { + cachedTokens = cached; + } + if (chunk?.usage) { + usageChunk = chunk; + } + } + if (logMode) { + console.log( + "streaming anthropic cache", + requestId, + "final usage", + JSON.stringify(usageChunk?.usage), + ); + } + // Synthesize a json shape with .usage so the retry helper sees it. + return { + status: res.status, + json: { + usage: { + prompt_tokens_details: { cached_tokens: cachedTokens }, + ...usageChunk?.usage, + }, + }, + }; + }; + + const first = await send(); + expect(first.status).toBe(200); + + const second = await sendUntilCacheRead(send); + expect(second.status).toBe(200); + expect( + second.json.usage.prompt_tokens_details.cached_tokens, + `expected streaming cached_tokens > 0 after ${second.attempts} attempts`, + ).toBeGreaterThan(0); + }, + ); + + // Streaming Bedrock: same as above but routed through Bedrock to exercise + // extract-token-usage's bedrock branch that surfaces cacheWriteTokens / + // cacheReadTokens through the streaming pipeline. + (hasBedrockKey ? test : test.skip)( + "streaming /v1/chat/completions surfaces cached tokens for bedrock", + getTestOptions(), + async () => { + const longText = buildLongSystemPrompt(); + const body = { + model: "aws-bedrock/claude-haiku-4-5", + stream: true, + messages: [ + { role: "system", content: longText }, + { role: "user", content: "Just reply OK." 
}, + ], + }; + + const send = async () => { + const requestId = generateTestRequestId(); + const res = await app.request("/v1/chat/completions", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-request-id": requestId, + Authorization: `Bearer real-token`, + }, + body: JSON.stringify(body), + }); + if (res.status !== 200) { + return { status: res.status, json: await res.json() }; + } + const chunks = await readSseChunks(res.body); + let cachedTokens = 0; + let usageChunk: any = null; + for (const chunk of chunks) { + const cached = + chunk?.usage?.prompt_tokens_details?.cached_tokens ?? 0; + if (cached > cachedTokens) { + cachedTokens = cached; + } + if (chunk?.usage) { + usageChunk = chunk; + } + } + if (logMode) { + console.log( + "streaming bedrock cache", + requestId, + "final usage", + JSON.stringify(usageChunk?.usage), + ); + } + return { + status: res.status, + json: { + usage: { + prompt_tokens_details: { cached_tokens: cachedTokens }, + ...usageChunk?.usage, + }, + }, + }; + }; + + const first = await send(); + expect(first.status).toBe(200); + + const second = await sendUntilCacheRead(send); + expect(second.status).toBe(200); + expect( + second.json.usage.prompt_tokens_details.cached_tokens, + `expected streaming cached_tokens > 0 after ${second.attempts} attempts`, + ).toBeGreaterThan(0); + }, + ); + + // Explicit cache_control on /v1/chat/completions: exercises the new + // completions schema field that lets clients pass per-text-block + // cache_control markers (Anthropic-style) through the OpenAI-compat + // endpoint. Without this path, callers can't opt into caching from a + // shorter-than-heuristic prompt or override the heuristic for placement. + (hasAnthropicKey ? 
test : test.skip)( + "openai-compat /v1/chat/completions honors explicit cache_control", + getTestOptions(), + async () => { + const longText = buildLongSystemPrompt(); + const body = { + model: "anthropic/claude-haiku-4-5", + messages: [ + { + role: "system", + content: [ + { + type: "text" as const, + text: longText, + cache_control: { type: "ephemeral" as const }, + }, + ], + }, + { role: "user", content: "Just reply OK." }, + ], + }; + + const send = async () => { + const requestId = generateTestRequestId(); + const res = await app.request("/v1/chat/completions", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-request-id": requestId, + Authorization: `Bearer real-token`, + }, + body: JSON.stringify(body), + }); + const json = await res.json(); + if (logMode) { + console.log( + "explicit cache_control", + requestId, + "status", + res.status, + "usage", + JSON.stringify(json.usage), + ); + } + return { status: res.status, json }; + }; + + const first = await send(); + expect(first.status).toBe(200); + expect(first.json.usage.prompt_tokens_details).toBeDefined(); + + const second = await sendUntilCacheRead(send); + expect(second.status).toBe(200); + expect( + second.json.usage.prompt_tokens_details.cached_tokens, + `expected cached_tokens > 0 after ${second.attempts} attempts`, + ).toBeGreaterThan(0); + }, + ); +}); diff --git a/packages/actions/src/prepare-request-body.spec.ts b/packages/actions/src/prepare-request-body.spec.ts index 9b3f1d6eda..43303edf1c 100644 --- a/packages/actions/src/prepare-request-body.spec.ts +++ b/packages/actions/src/prepare-request-body.spec.ts @@ -856,7 +856,8 @@ describe("prepareRequestBody - AWS Bedrock", () => { content: [{ text: "What is the weather and time in Berlin?" 
}], }); expect(requestBody.messages[1].role).toBe("assistant"); - expect(requestBody.messages[1].content).toHaveLength(2); + // 2 toolUse blocks + 1 turn-boundary cachePoint + expect(requestBody.messages[1].content).toHaveLength(3); expect(requestBody.messages[1].content[0]).toEqual({ toolUse: { toolUseId: "tool_1", @@ -871,6 +872,9 @@ describe("prepareRequestBody - AWS Bedrock", () => { input: { city: "Berlin" }, }, }); + expect(requestBody.messages[1].content[2]).toEqual({ + cachePoint: { type: "default" }, + }); expect(requestBody.messages[2]).toEqual({ role: "user", content: [ diff --git a/packages/actions/src/prepare-request-body.ts b/packages/actions/src/prepare-request-body.ts index 723aa4e0c7..6b5962cf58 100644 --- a/packages/actions/src/prepare-request-body.ts +++ b/packages/actions/src/prepare-request-body.ts @@ -5,6 +5,7 @@ import { type ProviderId, type BaseMessage, type FunctionParameter, + isTextContent, type OpenAIFunctionToolInput, type OpenAIRequestBody, type OpenAIResponsesRequestBody, @@ -772,6 +773,38 @@ export async function prepareRequestBody( processedMessages = transformMessagesForNoSystemRole(messages); } + // Strip Anthropic-style cache_control markers from text content parts when + // the resolved provider doesn't natively understand them. The Anthropic and + // AWS Bedrock branches below transform/forward cache_control on their own; + // every other provider receives the raw `processedMessages` and would + // otherwise pass an unknown field through to OpenAI/Google/etc., risking a + // 400 from strict providers and confusing logs from lenient ones. 
+ const providerHandlesCacheControl = + usedProvider === "anthropic" || usedProvider === "aws-bedrock"; + if (!providerHandlesCacheControl) { + processedMessages = processedMessages.map((m) => { + if (!Array.isArray(m.content)) { + return m; + } + let mutated = false; + const newContent = m.content.map((part) => { + const asRecord = part as unknown as Record<string, unknown>; + if ( + asRecord && + typeof asRecord === "object" && + asRecord.type === "text" && + asRecord.cache_control !== undefined + ) { + mutated = true; + const { cache_control: _ignored, ...rest } = asRecord; + return rest as unknown as typeof part; + } + return part; + }); + return mutated ? { ...m, content: newContent } : m; + }); + } + // Start with a base structure that can be modified for each provider const requestBody: any = { model: usedModel, @@ -1134,41 +1167,84 @@ export async function prepareRequestBody( cache_control?: { type: "ephemeral" }; }> = []; - for (const sysMsg of systemMessages) { - let text: string; - if (typeof sysMsg.content === "string") { - text = sysMsg.content; - } else if (Array.isArray(sysMsg.content)) { - // Concatenate text from array content - text = sysMsg.content - .filter((c) => c.type === "text" && "text" in c) - .map((c) => (c as { type: "text"; text: string }).text) - .join(""); - } else { - continue; + // Detect whether any text block in the incoming system messages has + // a caller-supplied cache_control marker. If so, we preserve the + // per-block structure so we can forward markers verbatim. Otherwise + // we fall back to the legacy behavior of concatenating each system + // message's text into a single block (and applying the length-based + // heuristic per concatenated block).
+ const callerSetCacheControl = systemMessages.some((sysMsg) => { + if (!Array.isArray(sysMsg.content)) { + return false; } + return sysMsg.content.some( + (c) => isTextContent(c) && !!c.cache_control, + ); + }); - if (!text || text.trim() === "") { - continue; + if (callerSetCacheControl) { + for (const sysMsg of systemMessages) { + if (typeof sysMsg.content === "string") { + if (!sysMsg.content.trim()) { + continue; + } + systemContent.push({ type: "text", text: sysMsg.content }); + } else if (Array.isArray(sysMsg.content)) { + for (const part of sysMsg.content) { + if (!isTextContent(part) || !part.text || !part.text.trim()) { + continue; + } + const explicit = part.cache_control; + if (explicit) { + if (systemCacheControlCount < maxCacheControlBlocks) { + systemCacheControlCount++; + systemContent.push({ + type: "text", + text: part.text, + cache_control: explicit, + }); + } else { + systemContent.push({ type: "text", text: part.text }); + } + } else { + systemContent.push({ type: "text", text: part.text }); + } + } + } } + } else { + for (const sysMsg of systemMessages) { + let text: string; + if (typeof sysMsg.content === "string") { + text = sysMsg.content; + } else if (Array.isArray(sysMsg.content)) { + // Concatenate text from array content (legacy behavior). 
+ text = sysMsg.content + .filter((c) => c.type === "text" && "text" in c) + .map((c) => (c as { type: "text"; text: string }).text) + .join(""); + } else { + continue; + } - // Add cache_control for text blocks exceeding the model's minimum cacheable threshold - const shouldCache = - text.length >= minCacheableChars && - systemCacheControlCount < maxCacheControlBlocks; - - if (shouldCache) { - systemCacheControlCount++; - systemContent.push({ - type: "text", - text, - cache_control: { type: "ephemeral" }, - }); - } else { - systemContent.push({ - type: "text", - text, - }); + if (!text || text.trim() === "") { + continue; + } + + const shouldCache = + text.length >= minCacheableChars && + systemCacheControlCount < maxCacheControlBlocks; + + if (shouldCache) { + systemCacheControlCount++; + systemContent.push({ + type: "text", + text, + cache_control: { type: "ephemeral" }, + }); + } else { + systemContent.push({ type: "text", text }); + } } } @@ -1322,39 +1398,62 @@ export async function prepareRequestBody( (m) => m.role !== "system", ); - // Build the system field with cachePoint for long prompts - // AWS Bedrock uses "cachePoint" (not "cacheControl") as a SEPARATE content block after the text block + // Build the system field with cachePoint for long prompts. + // AWS Bedrock uses "cachePoint" (not "cacheControl") as a SEPARATE + // content block after the text block. Honor caller-supplied + // cache_control markers (Anthropic format) by mapping them to + // cachePoint, and fall back to a length heuristic when nothing was + // explicitly opted in. 
if (bedrockSystemMessages.length > 0) { const systemContent: Array< { text: string } | { cachePoint: { type: "default" } } > = []; + const collectedBedrockBlocks: Array<{ + text: string; + hasExplicitCacheControl: boolean; + }> = []; for (const sysMsg of bedrockSystemMessages) { - let text: string; if (typeof sysMsg.content === "string") { - text = sysMsg.content; + if (sysMsg.content.trim()) { + collectedBedrockBlocks.push({ + text: sysMsg.content, + hasExplicitCacheControl: false, + }); + } } else if (Array.isArray(sysMsg.content)) { - text = sysMsg.content - .filter((c: any) => c.type === "text" && "text" in c) - .map((c: any) => c.text) - .join(""); - } else { - continue; + for (const part of sysMsg.content as any[]) { + if (part.type === "text" && part.text && part.text.trim()) { + collectedBedrockBlocks.push({ + text: part.text, + hasExplicitCacheControl: !!part.cache_control, + }); + } + } } + } + + const callerSetBedrockCacheControl = collectedBedrockBlocks.some( + (b) => b.hasExplicitCacheControl, + ); + + for (const block of collectedBedrockBlocks) { + systemContent.push({ text: block.text }); - if (!text || text.trim() === "") { + if (block.hasExplicitCacheControl) { + if (bedrockCacheControlCount < bedrockMaxCacheControlBlocks) { + bedrockCacheControlCount++; + systemContent.push({ cachePoint: { type: "default" } }); + } continue; } - // Add text block first - systemContent.push({ text }); - - // Add cachePoint as separate block for long text (model-specific threshold) - const shouldCache = - text.length >= bedrockMinCacheableChars && + const shouldHeuristicCache = + !callerSetBedrockCacheControl && + block.text.length >= bedrockMinCacheableChars && bedrockCacheControlCount < bedrockMaxCacheControlBlocks; - if (shouldCache) { + if (shouldHeuristicCache) { bedrockCacheControlCount++; systemContent.push({ cachePoint: { type: "default" } }); } @@ -1472,16 +1571,26 @@ export async function prepareRequestBody( text: part.text, }); - // Add cachePoint as 
separate block for long text parts (model-specific threshold) - const shouldCache = - part.text.length >= bedrockMinCacheableChars && - bedrockCacheControlCount < bedrockMaxCacheControlBlocks; - - if (shouldCache) { - bedrockCacheControlCount++; - bedrockMessage.content.push({ - cachePoint: { type: "default" }, - }); + if (part.cache_control) { + if (bedrockCacheControlCount < bedrockMaxCacheControlBlocks) { + bedrockCacheControlCount++; + bedrockMessage.content.push({ + cachePoint: { type: "default" }, + }); + } + } else { + // Add cachePoint as separate block for long text parts + // (model-specific threshold) + const shouldCache = + part.text.length >= bedrockMinCacheableChars && + bedrockCacheControlCount < bedrockMaxCacheControlBlocks; + + if (shouldCache) { + bedrockCacheControlCount++; + bedrockMessage.content.push({ + cachePoint: { type: "default" }, + }); + } } } } else if (part.type === "image_url") { @@ -1496,6 +1605,44 @@ export async function prepareRequestBody( } flushPendingToolResults(); + + // Turn-boundary caching: place a cachePoint after the last content + // block of the message just before the final user turn. This caches + // the entire conversation prefix (all prior turns) so only the + // newest user message is uncached. This mirrors the Anthropic + // turn-boundary logic in transformAnthropicMessages. + if (bedrockMessages.length >= 3) { + let lastUserIdx = -1; + for (let i = bedrockMessages.length - 1; i >= 0; i--) { + if (bedrockMessages[i].role === "user") { + lastUserIdx = i; + break; + } + } + + const boundaryIdx = lastUserIdx > 0 ? lastUserIdx - 1 : -1; + if ( + boundaryIdx >= 0 && + bedrockCacheControlCount < bedrockMaxCacheControlBlocks + ) { + const boundaryMsg = bedrockMessages[boundaryIdx]; + if ( + Array.isArray(boundaryMsg.content) && + boundaryMsg.content.length > 0 + ) { + const lastBlock = + boundaryMsg.content[boundaryMsg.content.length - 1]; + // Only add if the last block isn't already a cachePoint. 
+ if (!lastBlock.cachePoint) { + boundaryMsg.content.push({ + cachePoint: { type: "default" }, + }); + bedrockCacheControlCount++; + } + } + } + } + requestBody.messages = bedrockMessages; // Transform tools from OpenAI format to Bedrock format diff --git a/packages/actions/src/transform-anthropic-messages.ts b/packages/actions/src/transform-anthropic-messages.ts index 09dd3d0da5..8a04020834 100644 --- a/packages/actions/src/transform-anthropic-messages.ts +++ b/packages/actions/src/transform-anthropic-messages.ts @@ -287,5 +287,51 @@ export async function transformAnthropicMessages( role: anthropicRole, }); } + // Turn-boundary caching: in a multi-turn conversation the entire prefix + // (everything before the last user message) is identical between requests. + // Placing cache_control on the last content block of the message just before + // the final user turn lets Anthropic cache the entire prefix, dramatically + // improving the cache hit ratio for long conversations (e.g. Claude Code + // sessions with 100k+ token context). + if (shouldApplyCacheControl && results.length >= 3) { + // Find the last user message index — that's the "new" turn. + let lastUserIdx = -1; + for (let i = results.length - 1; i >= 0; i--) { + if (results[i]!.role === "user") { + lastUserIdx = i; + break; + } + } + + // The turn boundary is the message right before the last user message. + const boundaryIdx = lastUserIdx > 0 ? lastUserIdx - 1 : -1; + if (boundaryIdx >= 0 && cacheControlCount < maxCacheControlBlocks) { + const boundaryMsg = results[boundaryIdx]!; + if ( + Array.isArray(boundaryMsg.content) && + boundaryMsg.content.length > 0 + ) { + // Find the last text content block in the boundary message. 
+ let lastTextIdx = -1; + for (let i = boundaryMsg.content.length - 1; i >= 0; i--) { + const part = boundaryMsg.content[i]; + if (part && isTextContent(part as MessageContent)) { + lastTextIdx = i; + break; + } + } + if (lastTextIdx >= 0) { + const target = boundaryMsg.content[lastTextIdx] as TextContent; + if (!target.cache_control) { + (boundaryMsg.content[lastTextIdx] as TextContent).cache_control = { + type: "ephemeral", + }; + cacheControlCount++; + } + } + } + } + } + return results; }