diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index bc7141f008..2eade0376a 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -506,6 +506,7 @@ server_notification_definitions! { TurnStarted => "turn/started" (v2::TurnStartedNotification), TurnCompleted => "turn/completed" (v2::TurnCompletedNotification), TurnDiffUpdated => "turn/diff/updated" (v2::TurnDiffUpdatedNotification), + TurnPlanUpdated => "turn/plan/updated" (v2::TurnPlanUpdatedNotification), ItemStarted => "item/started" (v2::ItemStartedNotification), ItemCompleted => "item/completed" (v2::ItemCompletedNotification), AgentMessageDelta => "item/agentMessage/delta" (v2::AgentMessageDeltaNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 3ee502ebd5..0192e1e82f 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -11,6 +11,8 @@ use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent; use codex_protocol::items::TurnItem as CoreTurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::parse_command::ParsedCommand as CoreParsedCommand; +use codex_protocol::plan_tool::PlanItemArg as CorePlanItemArg; +use codex_protocol::plan_tool::StepStatus as CorePlanStepStatus; use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo; use codex_protocol::protocol::CreditsSnapshot as CoreCreditsSnapshot; use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; @@ -1210,6 +1212,51 @@ pub struct TurnDiffUpdatedNotification { pub diff: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct TurnPlanUpdatedNotification { + pub turn_id: String, + pub explanation: Option, + pub plan: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct TurnPlanStep { + pub step: String, + pub status: TurnPlanStepStatus, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub enum TurnPlanStepStatus { + Pending, + InProgress, + Completed, +} + +impl From for TurnPlanStep { + fn from(value: CorePlanItemArg) -> Self { + Self { + step: value.step, + status: value.status.into(), + } + } +} + +impl From for TurnPlanStepStatus { + fn from(value: CorePlanStepStatus) -> Self { + match value { + CorePlanStepStatus::Pending => Self::Pending, + CorePlanStepStatus::InProgress => Self::InProgress, + CorePlanStepStatus::Completed => Self::Completed, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 176cd27339..59b67ab899 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -244,6 +244,7 @@ The app-server streams JSON-RPC notifications while a turn is running. Each turn - `turn/started` — `{ turn }` with the turn id, empty `items`, and `status: "inProgress"`. - `turn/completed` — `{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo? } }`. +- `turn/plan/updated` — `{ turnId, explanation?, plan }` whenever the agent shares or changes its plan; each `plan` entry is `{ step, status }` with `status` in `pending`, `inProgress`, or `completed`. Today both notifications carry an empty `items` array even when item events were streamed; rely on `item/*` notifications for the canonical item list until this is fixed. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 4b4e40d0c6..3789843614 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -43,6 +43,8 @@ use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnDiffUpdatedNotification; use codex_app_server_protocol::TurnError; use codex_app_server_protocol::TurnInterruptResponse; +use codex_app_server_protocol::TurnPlanStep; +use codex_app_server_protocol::TurnPlanUpdatedNotification; use codex_app_server_protocol::TurnStatus; use codex_core::CodexConversation; use codex_core::parse_command::shlex_join; @@ -60,6 +62,7 @@ use codex_core::protocol::TokenCountEvent; use codex_core::protocol::TurnDiffEvent; use codex_core::review_format::format_review_findings_block; use codex_protocol::ConversationId; +use codex_protocol::plan_tool::UpdatePlanArgs; use codex_protocol::protocol::ReviewOutputEvent; use std::collections::HashMap; use std::convert::TryFrom; @@ -570,6 +573,15 @@ pub(crate) async fn apply_bespoke_event_handling( ) .await; } + EventMsg::PlanUpdate(plan_update_event) => { + handle_turn_plan_update( + &event_turn_id, + plan_update_event, + api_version, + outgoing.as_ref(), + ) + .await; + } _ => {} } @@ -592,6 +604,28 @@ async fn handle_turn_diff( } } +async fn handle_turn_plan_update( + event_turn_id: &str, + plan_update_event: UpdatePlanArgs, + api_version: ApiVersion, + outgoing: &OutgoingMessageSender, +) { + if let ApiVersion::V2 = api_version { + let notification = TurnPlanUpdatedNotification { + turn_id: event_turn_id.to_string(), + explanation: plan_update_event.explanation, + plan: plan_update_event + .plan + .into_iter() + .map(TurnPlanStep::from) + .collect(), + }; + outgoing + .send_server_notification(ServerNotification::TurnPlanUpdated(notification)) + .await; + } +} + async fn emit_turn_completed_with_status( conversation_id: ConversationId, event_turn_id: String, @@ -1108,12 +1142,15 @@ mod tests { use anyhow::Result; use anyhow::anyhow; use anyhow::bail; + use codex_app_server_protocol::TurnPlanStepStatus; use codex_core::protocol::CreditsSnapshot; use codex_core::protocol::McpInvocation; use codex_core::protocol::RateLimitSnapshot; use codex_core::protocol::RateLimitWindow; use codex_core::protocol::TokenUsage; use codex_core::protocol::TokenUsageInfo; + use codex_protocol::plan_tool::PlanItemArg; + use codex_protocol::plan_tool::StepStatus; use mcp_types::CallToolResult; use mcp_types::ContentBlock; use mcp_types::TextContent; @@ -1273,6 +1310,46 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> { + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = OutgoingMessageSender::new(tx); + let update = UpdatePlanArgs { + explanation: Some("need plan".to_string()), + plan: vec![ + PlanItemArg { + step: "first".to_string(), + status: StepStatus::Pending, + }, + PlanItemArg { + step: "second".to_string(), + status: StepStatus::Completed, + }, + ], + }; + + handle_turn_plan_update("turn-123", update, ApiVersion::V2, &outgoing).await; + + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => { + assert_eq!(n.turn_id, "turn-123"); + assert_eq!(n.explanation.as_deref(), Some("need plan")); + assert_eq!(n.plan.len(), 2); + assert_eq!(n.plan[0].step, "first"); + assert_eq!(n.plan[0].status, TurnPlanStepStatus::Pending); + assert_eq!(n.plan[1].step, "second"); + assert_eq!(n.plan[1].status, TurnPlanStepStatus::Completed); + } + other => bail!("unexpected message: {other:?}"), + } + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + #[tokio::test] async fn test_handle_token_count_event_emits_usage_and_rate_limits() -> Result<()> { let conversation_id = ConversationId::new();