diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index e3c5de5762..3b43560d37 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -12,10 +12,11 @@ use crate::compact::run_inline_auto_compact_task; use crate::compact::should_use_remote_compact_task; use crate::compact_remote::run_inline_remote_auto_compact_task; use crate::features::Feature; +#[cfg(test)] use crate::function_tool::FunctionCallError; use crate::parse_command::parse_command; use crate::parse_turn_item; -use crate::response_processing::process_items; +mod turn_event; use crate::terminal; use crate::truncate::TruncationPolicy; use crate::user_notification::UserNotifier; @@ -38,7 +39,7 @@ use codex_protocol::protocol::TurnContextItem; use codex_rmcp_client::ElicitationResponse; use futures::future::BoxFuture; use futures::prelude::*; -use futures::stream::FuturesOrdered; +use futures::stream::FuturesUnordered; use mcp_types::CallToolResult; use mcp_types::ListResourceTemplatesRequestParams; use mcp_types::ListResourceTemplatesResult; @@ -128,6 +129,7 @@ use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::ContentItem; +#[cfg(test)] use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; @@ -136,6 +138,8 @@ use codex_protocol::protocol::InitialHistory; use codex_protocol::user_input::UserInput; use codex_utils_readiness::Readiness; use codex_utils_readiness::ReadinessFlag; +use turn_event::handle_non_tool_response_item; +use turn_event::handle_output_item_done; /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. @@ -1967,15 +1971,16 @@ pub(crate) async fn run_task( .await { Ok(turn_output) => { - let processed_items = turn_output; + let TurnRunResult { + responses, + last_agent_message: turn_last_agent_message, + } = turn_output; let limit = turn_context .client .get_auto_compact_token_limit() .unwrap_or(i64::MAX); let total_usage_tokens = sess.get_total_token_usage().await; let token_limit_reached = total_usage_tokens >= limit; - let (responses, items_to_record_in_conversation_history) = - process_items(processed_items, &sess, &turn_context).await; // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. if token_limit_reached { @@ -1989,9 +1994,7 @@ pub(crate) async fn run_task( } if responses.is_empty() { - last_agent_message = get_last_assistant_message_from_turn( - &items_to_record_in_conversation_history, - ); + last_agent_message = turn_last_agent_message; sess.notifier() .notify(&UserNotification::AgentTurnComplete { thread_id: sess.conversation_id.to_string(), @@ -2004,10 +2007,7 @@ pub(crate) async fn run_task( } continue; } - Err(CodexErr::TurnAborted { - dangling_artifacts: processed_items, - }) => { - let _ = process_items(processed_items, &sess, &turn_context).await; + Err(CodexErr::TurnAborted) => { // Aborted turn is reported via a different event. break; } @@ -2030,7 +2030,7 @@ async fn run_turn( turn_diff_tracker: SharedTurnDiffTracker, input: Vec, cancellation_token: CancellationToken, -) -> CodexResult> { +) -> CodexResult { let mcp_tools = sess .services .mcp_connection_manager @@ -2095,12 +2095,8 @@ async fn run_turn( .await { Ok(output) => return Ok(output), - Err(CodexErr::TurnAborted { - dangling_artifacts: processed_items, - }) => { - return Err(CodexErr::TurnAborted { - dangling_artifacts: processed_items, - }); + Err(CodexErr::TurnAborted) => { + return Err(CodexErr::TurnAborted); } Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted), Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)), @@ -2151,14 +2147,10 @@ async fn run_turn( } } -/// When the model is prompted, it returns a stream of events. Some of these -/// events map to a `ResponseItem`. A `ResponseItem` may need to be -/// "handled" such that it produces a `ResponseInputItem` that needs to be -/// sent back to the model on the next turn. #[derive(Debug)] -pub struct ProcessedResponseItem { - pub item: ResponseItem, - pub response: Option, +struct TurnRunResult { + responses: Vec, + last_agent_message: Option, } #[allow(clippy::too_many_arguments)] @@ -2169,7 +2161,7 @@ async fn try_run_turn( turn_diff_tracker: SharedTurnDiffTracker, prompt: &Prompt, cancellation_token: CancellationToken, -) -> CodexResult> { +) -> CodexResult { let rollout_item = RolloutItem::TurnContext(TurnContextItem { cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, @@ -2187,235 +2179,168 @@ async fn try_run_turn( .or_cancel(&cancellation_token) .await??; - let tool_runtime = ToolCallRuntime::new( + let tool_runtime = Arc::new(ToolCallRuntime::new( Arc::clone(&router), Arc::clone(&sess), Arc::clone(&turn_context), Arc::clone(&turn_diff_tracker), - ); - let mut output: FuturesOrdered>> = - FuturesOrdered::new(); - + )); + let mut in_flight: FuturesUnordered>> = + FuturesUnordered::new(); + let mut responses: Vec = Vec::new(); + let mut last_agent_message: Option = None; let mut active_item: Option = None; loop { - // Poll the next item from the model stream. We must inspect *both* Ok and Err - // cases so that transient stream failures (e.g., dropped SSE connection before - // `response.completed`) bubble up and trigger the caller's retry logic. - let event = match stream.next().or_cancel(&cancellation_token).await { - Ok(event) => event, - Err(codex_async_utils::CancelErr::Cancelled) => { - let processed_items = output.try_collect().await?; - return Err(CodexErr::TurnAborted { - dangling_artifacts: processed_items, - }); - } - }; - - let event = match event { - Some(res) => res?, - None => { - return Err(CodexErr::Stream( - "stream closed before response.completed".into(), - None, - )); + tokio::select! { + Some(res) = in_flight.next(), if !in_flight.is_empty() => { + let response_input = res?; + responses.push(response_input); } - }; + event = stream.next().or_cancel(&cancellation_token) => { + let event = match event { + Ok(event) => event, + Err(codex_async_utils::CancelErr::Cancelled) => { + while let Some(res) = in_flight.next().await { + let _ = res?; + } + return Err(CodexErr::TurnAborted); + } + }; - let add_completed = &mut |response_item: ProcessedResponseItem| { - output.push_back(future::ready(Ok(response_item)).boxed()); - }; + let event = match event { + Some(res) => res?, + None => { + return Err(CodexErr::Stream( + "stream closed before response.completed".into(), + None, + )); + } + }; - match event { - ResponseEvent::Created => {} - ResponseEvent::OutputItemDone(item) => { - let previously_active_item = active_item.take(); - match ToolRouter::build_tool_call(sess.as_ref(), item.clone()).await { - Ok(Some(call)) => { - let payload_preview = call.payload.log_payload().into_owned(); - tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview); - - let response = - tool_runtime.handle_tool_call(call, cancellation_token.child_token()); - - output.push_back( - async move { - Ok(ProcessedResponseItem { - item, - response: Some(response.await?), - }) - } - .boxed(), - ); + match event { + ResponseEvent::Created => {} + ResponseEvent::OutputItemDone(item) => { + let previously_active_item = active_item.take(); + handle_output_item_done( + &sess, + &turn_context, + Arc::clone(&tool_runtime), + item, + previously_active_item, + &mut in_flight, + &mut responses, + &mut last_agent_message, + cancellation_token.child_token(), + ) + .await?; } - Ok(None) => { + ResponseEvent::OutputItemAdded(item) => { if let Some(turn_item) = handle_non_tool_response_item(&item).await { - if previously_active_item.is_none() { - sess.emit_turn_item_started(&turn_context, &turn_item).await; - } + let tracked_item = turn_item.clone(); + sess.emit_turn_item_started(&turn_context, &turn_item).await; - sess.emit_turn_item_completed(&turn_context, turn_item) - .await; + active_item = Some(tracked_item); } - - add_completed(ProcessedResponseItem { - item, - response: None, - }); } - Err(FunctionCallError::MissingLocalShellCallId) => { - let msg = "LocalShellCall without call_id or id"; - turn_context - .client - .get_otel_event_manager() - .log_tool_failed("local_shell", msg); - error!(msg); - - let response = ResponseInputItem::FunctionCallOutput { - call_id: String::new(), - output: FunctionCallOutputPayload { - content: msg.to_string(), - ..Default::default() - }, - }; - add_completed(ProcessedResponseItem { - item, - response: Some(response), - }); + ResponseEvent::RateLimits(snapshot) => { + // Update internal state with latest rate limits, but defer sending until + // token usage is available to avoid duplicate TokenCount events. + sess.update_rate_limits(&turn_context, snapshot).await; } - Err(FunctionCallError::RespondToModel(message)) - | Err(FunctionCallError::Denied(message)) => { - let response = ResponseInputItem::FunctionCallOutput { - call_id: String::new(), - output: FunctionCallOutputPayload { - content: message, - ..Default::default() - }, + ResponseEvent::Completed { + response_id: _, + token_usage, + } => { + sess.update_token_usage_info(&turn_context, token_usage.as_ref()) + .await; + while let Some(res) = in_flight.next().await { + responses.push(res?); + } + let unified_diff = { + let mut tracker = turn_diff_tracker.lock().await; + tracker.get_unified_diff() }; - add_completed(ProcessedResponseItem { - item, - response: Some(response), + if let Ok(Some(unified_diff)) = unified_diff { + let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff }); + sess.send_event(&turn_context, msg).await; + } + + return Ok(TurnRunResult { + responses, + last_agent_message, }); } - Err(FunctionCallError::Fatal(message)) => { - return Err(CodexErr::Fatal(message)); + ResponseEvent::OutputTextDelta(delta) => { + // In review child threads, suppress assistant text deltas; the + // UI will show a selection popup from the final ReviewOutput. + if let Some(active) = active_item.as_ref() { + let event = AgentMessageContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: active.id(), + delta: delta.clone(), + }; + sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event)) + .await; + } else { + error_or_panic("OutputTextDelta without active item".to_string()); + } } - } - } - ResponseEvent::OutputItemAdded(item) => { - if let Some(turn_item) = handle_non_tool_response_item(&item).await { - let tracked_item = turn_item.clone(); - sess.emit_turn_item_started(&turn_context, &turn_item).await; - - active_item = Some(tracked_item); - } - } - ResponseEvent::RateLimits(snapshot) => { - // Update internal state with latest rate limits, but defer sending until - // token usage is available to avoid duplicate TokenCount events. - sess.update_rate_limits(&turn_context, snapshot).await; - } - ResponseEvent::Completed { - response_id: _, - token_usage, - } => { - sess.update_token_usage_info(&turn_context, token_usage.as_ref()) - .await; - let processed_items = output.try_collect().await?; - let unified_diff = { - let mut tracker = turn_diff_tracker.lock().await; - tracker.get_unified_diff() - }; - if let Ok(Some(unified_diff)) = unified_diff { - let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff }); - sess.send_event(&turn_context, msg).await; - } - - return Ok(processed_items); - } - ResponseEvent::OutputTextDelta(delta) => { - // In review child threads, suppress assistant text deltas; the - // UI will show a selection popup from the final ReviewOutput. - if let Some(active) = active_item.as_ref() { - let event = AgentMessageContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: active.id(), - delta: delta.clone(), - }; - sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event)) - .await; - } else { - error_or_panic("OutputTextDelta without active item".to_string()); - } - } - ResponseEvent::ReasoningSummaryDelta { - delta, - summary_index, - } => { - if let Some(active) = active_item.as_ref() { - let event = ReasoningContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: active.id(), + ResponseEvent::ReasoningSummaryDelta { delta, summary_index, - }; - sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) - .await; - } else { - error_or_panic("ReasoningSummaryDelta without active item".to_string()); - } - } - ResponseEvent::ReasoningSummaryPartAdded { summary_index } => { - if let Some(active) = active_item.as_ref() { - let event = - EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent { - item_id: active.id(), - summary_index, - }); - sess.send_event(&turn_context, event).await; - } else { - error_or_panic("ReasoningSummaryPartAdded without active item".to_string()); - } - } - ResponseEvent::ReasoningContentDelta { - delta, - content_index, - } => { - if let Some(active) = active_item.as_ref() { - let event = ReasoningRawContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: active.id(), + } => { + if let Some(active) = active_item.as_ref() { + let event = ReasoningContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: active.id(), + delta, + summary_index, + }; + sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) + .await; + } else { + error_or_panic("ReasoningSummaryDelta without active item".to_string()); + } + } + ResponseEvent::ReasoningSummaryPartAdded { summary_index } => { + if let Some(active) = active_item.as_ref() { + let event = + EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent { + item_id: active.id(), + summary_index, + }); + sess.send_event(&turn_context, event).await; + } else { + error_or_panic("ReasoningSummaryPartAdded without active item".to_string()); + } + } + ResponseEvent::ReasoningContentDelta { delta, content_index, - }; - sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event)) - .await; - } else { - error_or_panic("ReasoningRawContentDelta without active item".to_string()); + } => { + if let Some(active) = active_item.as_ref() { + let event = ReasoningRawContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: active.id(), + delta, + content_index, + }; + sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event)) + .await; + } else { + error_or_panic("ReasoningRawContentDelta without active item".to_string()); + } + } } } } } } -async fn handle_non_tool_response_item(item: &ResponseItem) -> Option { - debug!(?item, "Output item"); - - match item { - ResponseItem::Message { .. } - | ResponseItem::Reasoning { .. } - | ResponseItem::WebSearchCall { .. } => parse_turn_item(item), - ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => { - debug!("unexpected tool output from stream"); - None - } - _ => None, - } -} - pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option { responses.iter().rev().find_map(|item| { if let ResponseItem::Message { role, content, .. } = item { diff --git a/codex-rs/core/src/codex/turn_event.rs b/codex-rs/core/src/codex/turn_event.rs new file mode 100644 index 0000000000..8b4524b2b5 --- /dev/null +++ b/codex-rs/core/src/codex/turn_event.rs @@ -0,0 +1,186 @@ +use std::pin::Pin; +use std::sync::Arc; + +use futures::stream::FuturesUnordered; +use tokio_util::sync::CancellationToken; + +use super::CodexErr; +use super::CodexResult; +use super::Session; +use super::ToolRouter; +use super::TurnContext; +use super::TurnItem; +use crate::function_tool::FunctionCallError; +use crate::parse_turn_item; +use codex_protocol::models::FunctionCallOutputPayload; +use codex_protocol::models::ResponseInputItem; +use codex_protocol::models::ResponseItem; +use futures::Future; +use tracing::debug; + +/// Handle a completed output item from the model stream, recording it and +/// queuing any tool execution futures. This records items immediately so +/// history and rollout stay in sync even if the turn is later cancelled. +pub(super) type InFlightFuture<'f> = + Pin> + Send + 'f>>; + +#[allow(clippy::too_many_arguments)] +pub(super) async fn handle_output_item_done( + sess: &Arc, + turn_context: &Arc, + tool_runtime: Arc, + item: ResponseItem, + previously_active_item: Option, + in_flight: &mut FuturesUnordered>, + responses: &mut Vec, + last_agent_message: &mut Option, + cancellation_token: CancellationToken, +) -> CodexResult<()> { + match ToolRouter::build_tool_call(sess.as_ref(), item.clone()).await { + Ok(Some(call)) => { + let payload_preview = call.payload.log_payload().into_owned(); + tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview); + + sess.record_conversation_items(turn_context, std::slice::from_ref(&item)) + .await; + + let sess_for_output: Arc = Arc::clone(sess); + let turn_for_output: Arc = Arc::clone(turn_context); + let tool_runtime = Arc::clone(&tool_runtime); + + in_flight.push(Box::pin(async move { + let response_input = tool_runtime + .handle_tool_call(call, cancellation_token) + .await?; + if let Some(response_item) = response_input_to_response_item(&response_input) { + sess_for_output + .record_conversation_items( + turn_for_output.as_ref(), + std::slice::from_ref(&response_item), + ) + .await; + } + Ok(response_input) + })); + } + Ok(None) => { + if let Some(turn_item) = handle_non_tool_response_item(&item).await { + if previously_active_item.is_none() { + sess.emit_turn_item_started(turn_context, &turn_item).await; + } + + sess.emit_turn_item_completed(turn_context, turn_item).await; + } + + sess.record_conversation_items(turn_context, std::slice::from_ref(&item)) + .await; + if let Some(agent_message) = last_assistant_message_from_item(&item) { + *last_agent_message = Some(agent_message); + } + } + Err(FunctionCallError::MissingLocalShellCallId) => { + let msg = "LocalShellCall without call_id or id"; + turn_context + .client + .get_otel_event_manager() + .log_tool_failed("local_shell", msg); + tracing::error!(msg); + + let response = ResponseInputItem::FunctionCallOutput { + call_id: String::new(), + output: FunctionCallOutputPayload { + content: msg.to_string(), + ..Default::default() + }, + }; + sess.record_conversation_items(turn_context, std::slice::from_ref(&item)) + .await; + if let Some(response_item) = response_input_to_response_item(&response) { + sess.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) + .await; + } + responses.push(response); + } + Err(FunctionCallError::RespondToModel(message)) + | Err(FunctionCallError::Denied(message)) => { + let response = ResponseInputItem::FunctionCallOutput { + call_id: String::new(), + output: FunctionCallOutputPayload { + content: message, + ..Default::default() + }, + }; + sess.record_conversation_items(turn_context, std::slice::from_ref(&item)) + .await; + if let Some(response_item) = response_input_to_response_item(&response) { + sess.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) + .await; + } + responses.push(response); + } + Err(FunctionCallError::Fatal(message)) => { + return Err(CodexErr::Fatal(message)); + } + } + + Ok(()) +} + +pub(super) async fn handle_non_tool_response_item(item: &ResponseItem) -> Option { + debug!(?item, "Output item"); + + match item { + ResponseItem::Message { .. } + | ResponseItem::Reasoning { .. } + | ResponseItem::WebSearchCall { .. } => parse_turn_item(item), + ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => { + debug!("unexpected tool output from stream"); + None + } + _ => None, + } +} + +pub(super) fn last_assistant_message_from_item(item: &ResponseItem) -> Option { + if let ResponseItem::Message { role, content, .. } = item + && role == "assistant" + { + return content.iter().rev().find_map(|ci| match ci { + codex_protocol::models::ContentItem::OutputText { text } => Some(text.clone()), + _ => None, + }); + } + None +} + +pub(super) fn response_input_to_response_item(input: &ResponseInputItem) -> Option { + match input { + ResponseInputItem::FunctionCallOutput { call_id, output } => { + Some(ResponseItem::FunctionCallOutput { + call_id: call_id.clone(), + output: output.clone(), + }) + } + ResponseInputItem::CustomToolCallOutput { call_id, output } => { + Some(ResponseItem::CustomToolCallOutput { + call_id: call_id.clone(), + output: output.clone(), + }) + } + ResponseInputItem::McpToolCallOutput { call_id, result } => { + let output = match result { + Ok(call_tool_result) => FunctionCallOutputPayload::from(call_tool_result), + Err(err) => FunctionCallOutputPayload { + content: err.clone(), + success: Some(false), + ..Default::default() + }, + }; + Some(ResponseItem::FunctionCallOutput { + call_id: call_id.clone(), + output, + }) + } + _ => None, + } +} diff --git a/codex-rs/core/src/error.rs b/codex-rs/core/src/error.rs index 9130b40e1e..77129199e5 100644 --- a/codex-rs/core/src/error.rs +++ b/codex-rs/core/src/error.rs @@ -1,4 +1,3 @@ -use crate::codex::ProcessedResponseItem; use crate::exec::ExecToolCallOutput; use crate::token_data::KnownPlan; use crate::token_data::PlanType; @@ -61,9 +60,7 @@ pub enum SandboxErr { pub enum CodexErr { // todo(aibrahim): git rid of this error carrying the dangling artifacts #[error("turn aborted. Something went wrong? Hit `/feedback` to report the issue.")] - TurnAborted { - dangling_artifacts: Vec, - }, + TurnAborted, /// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP /// handshake has succeeded but **before** it finished emitting `response.completed`. @@ -173,9 +170,7 @@ pub enum CodexErr { impl From for CodexErr { fn from(_: CancelErr) -> Self { - CodexErr::TurnAborted { - dangling_artifacts: Vec::new(), - } + CodexErr::TurnAborted } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 7a9440eb28..fab73df9d4 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -40,7 +40,6 @@ mod message_history; mod model_provider_info; pub mod parse_command; pub mod powershell; -mod response_processing; pub mod sandboxing; mod text_encoding; pub mod token_data; diff --git a/codex-rs/core/src/response_processing.rs b/codex-rs/core/src/response_processing.rs deleted file mode 100644 index 458f82526a..0000000000 --- a/codex-rs/core/src/response_processing.rs +++ /dev/null @@ -1,70 +0,0 @@ -use crate::codex::Session; -use crate::codex::TurnContext; -use codex_protocol::models::FunctionCallOutputPayload; -use codex_protocol::models::ResponseInputItem; -use codex_protocol::models::ResponseItem; -use tracing::warn; - -/// Process streamed `ResponseItem`s from the model into the pair of: -/// - items we should record in conversation history; and -/// - `ResponseInputItem`s to send back to the model on the next turn. -pub(crate) async fn process_items( - processed_items: Vec, - sess: &Session, - turn_context: &TurnContext, -) -> (Vec, Vec) { - let mut outputs_to_record = Vec::::new(); - let mut new_inputs_to_record = Vec::::new(); - let mut responses = Vec::::new(); - for processed_response_item in processed_items { - let crate::codex::ProcessedResponseItem { item, response } = processed_response_item; - - if let Some(response) = &response { - responses.push(response.clone()); - } - - match response { - Some(ResponseInputItem::FunctionCallOutput { call_id, output }) => { - new_inputs_to_record.push(ResponseItem::FunctionCallOutput { - call_id: call_id.clone(), - output: output.clone(), - }); - } - - Some(ResponseInputItem::CustomToolCallOutput { call_id, output }) => { - new_inputs_to_record.push(ResponseItem::CustomToolCallOutput { - call_id: call_id.clone(), - output: output.clone(), - }); - } - Some(ResponseInputItem::McpToolCallOutput { call_id, result }) => { - let output = match result { - Ok(call_tool_result) => FunctionCallOutputPayload::from(&call_tool_result), - Err(err) => FunctionCallOutputPayload { - content: err.clone(), - success: Some(false), - ..Default::default() - }, - }; - new_inputs_to_record.push(ResponseItem::FunctionCallOutput { - call_id: call_id.clone(), - output, - }); - } - None => {} - _ => { - warn!("Unexpected response item: {item:?} with response: {response:?}"); - } - }; - - outputs_to_record.push(item); - } - - let all_items_to_record = [outputs_to_record, new_inputs_to_record].concat(); - // Only attempt to take the lock if there is something to record. - if !all_items_to_record.is_empty() { - sess.record_conversation_items(turn_context, &all_items_to_record) - .await; - } - (responses, all_items_to_record) -}