diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 88f42c0f6c..39a2711f22 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -472,6 +472,7 @@ impl Session { turn_context.sandbox_policy.clone(), turn_context.cwd.clone(), config.codex_linux_sandbox_exe.clone(), + None, )), }; @@ -1073,16 +1074,6 @@ impl Session { .map_err(FunctionCallError::RespondToModel) } - pub(crate) async fn run_unified_exec_request( - &self, - request: crate::unified_exec::UnifiedExecRequest<'_>, - ) -> Result { - self.services - .unified_exec_manager - .handle_request(request) - .await - } - pub async fn interrupt_task(self: &Arc) { info!("interrupt received: abort current task, if any"); self.abort_all_tasks(TurnAbortReason::Interrupted).await; @@ -2771,6 +2762,7 @@ mod tests { turn_context.sandbox_policy.clone(), turn_context.cwd.clone(), None, + None, )), }; let session = Session { @@ -2839,6 +2831,7 @@ mod tests { config.sandbox_policy.clone(), config.cwd.clone(), None, + None, )), }; let session = Arc::new(Session { diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index 22d1deeb1c..e28a202647 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -18,13 +18,14 @@ use tokio::process::Child; use crate::error::CodexErr; use crate::error::Result; use crate::error::SandboxErr; -use crate::landlock::spawn_command_under_linux_sandbox; +use crate::executor::SandboxLaunch; +use crate::executor::SandboxLaunchError; +use crate::executor::build_launch_for_sandbox; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::ExecCommandOutputDeltaEvent; use crate::protocol::ExecOutputStream; use crate::protocol::SandboxPolicy; -use crate::seatbelt::spawn_command_under_seatbelt; use crate::spawn::StdioPolicy; use crate::spawn::spawn_child_async; @@ -87,57 +88,35 @@ pub async fn process_exec_tool_call( codex_linux_sandbox_exe: &Option, stdout_stream: Option, ) -> Result { - let start = Instant::now(); - - let timeout_duration = params.timeout_duration(); + let launch = build_launch_for_sandbox( + sandbox_type, + ¶ms.command, + sandbox_policy, + sandbox_cwd, + codex_linux_sandbox_exe.as_ref(), + ) + .map_err(CodexErr::from)?; + execute_sandbox_launch(params, launch, sandbox_type, sandbox_policy, stdout_stream).await +} - let raw_output_result: std::result::Result = match sandbox_type - { - SandboxType::None => exec(params, sandbox_policy, stdout_stream.clone()).await, - SandboxType::MacosSeatbelt => { - let ExecParams { - command, - cwd: command_cwd, - env, - .. - } = params; - let child = spawn_command_under_seatbelt( - command, - command_cwd, - sandbox_policy, - sandbox_cwd, - StdioPolicy::RedirectForShellTool, - env, - ) - .await?; - consume_truncated_output(child, timeout_duration, stdout_stream.clone()).await - } - SandboxType::LinuxSeccomp => { - let ExecParams { - command, - cwd: command_cwd, - env, - .. - } = params; - - let codex_linux_sandbox_exe = codex_linux_sandbox_exe - .as_ref() - .ok_or(CodexErr::LandlockSandboxExecutableNotProvided)?; - let child = spawn_command_under_linux_sandbox( - codex_linux_sandbox_exe, - command, - command_cwd, - sandbox_policy, - sandbox_cwd, - StdioPolicy::RedirectForShellTool, - env, - ) - .await?; - - consume_truncated_output(child, timeout_duration, stdout_stream).await - } - }; +pub(crate) async fn execute_sandbox_launch( + params: ExecParams, + launch: SandboxLaunch, + sandbox_type: SandboxType, + sandbox_policy: &SandboxPolicy, + stdout_stream: Option, +) -> Result { + let start = Instant::now(); + let raw_output_result = spawn_with_launch(params, launch, sandbox_policy, stdout_stream).await; let duration = start.elapsed(); + finalize_exec_result(raw_output_result, sandbox_type, duration) +} + +fn finalize_exec_result( + raw_output_result: std::result::Result, + sandbox_type: SandboxType, + duration: Duration, +) -> Result { match raw_output_result { Ok(raw_output) => { #[allow(unused_mut)] @@ -192,12 +171,73 @@ pub async fn process_exec_tool_call( } } +async fn spawn_with_launch( + params: ExecParams, + launch: SandboxLaunch, + sandbox_policy: &SandboxPolicy, + stdout_stream: Option, +) -> std::result::Result { + let ExecParams { + command: _, + cwd, + timeout_ms, + mut env, + with_escalated_permissions, + justification, + } = params; + + let SandboxLaunch { + program, + args, + env: launch_env, + .. + } = launch; + env.extend(launch_env); + + let mut command = Vec::with_capacity(1 + args.len()); + command.push(program); + command.extend(args); + + let updated_params = ExecParams { + command, + cwd, + timeout_ms, + env, + with_escalated_permissions, + justification, + }; + + exec(updated_params, sandbox_policy, stdout_stream).await +} + +pub(crate) mod errors { + use super::CodexErr; + use super::SandboxLaunchError; + + impl From for CodexErr { + fn from(err: SandboxLaunchError) -> Self { + match err { + SandboxLaunchError::MissingCommandLine => CodexErr::Io(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "command args are empty", + )), + SandboxLaunchError::MissingLinuxSandboxExecutable => { + CodexErr::LandlockSandboxExecutableNotProvided + } + } + } + } +} + /// We don't have a fully deterministic way to tell if our command failed /// because of the sandbox - a command in the user's zshrc file might hit an /// error, but the command itself might fail or succeed for other reasons. /// For now, we conservatively check for well known command failure exit codes and /// also look for common sandbox denial keywords in the command output. -fn is_likely_sandbox_denied(sandbox_type: SandboxType, exec_output: &ExecToolCallOutput) -> bool { +pub(crate) fn is_likely_sandbox_denied( + sandbox_type: SandboxType, + exec_output: &ExecToolCallOutput, +) -> bool { if sandbox_type == SandboxType::None || exec_output.exit_code == 0 { return false; } diff --git a/codex-rs/core/src/exec_command/exec_command_session.rs b/codex-rs/core/src/exec_command/exec_command_session.rs index 31b3c929b2..786055cbf4 100644 --- a/codex-rs/core/src/exec_command/exec_command_session.rs +++ b/codex-rs/core/src/exec_command/exec_command_session.rs @@ -27,9 +27,12 @@ pub(crate) struct ExecCommandSession { /// Tracks whether the underlying process has exited. exit_status: std::sync::Arc, + /// Captures the process exit code once it becomes available. + exit_code: std::sync::Arc>>, } impl ExecCommandSession { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( writer_tx: mpsc::Sender>, output_tx: broadcast::Sender>, @@ -38,6 +41,7 @@ impl ExecCommandSession { writer_handle: JoinHandle<()>, wait_handle: JoinHandle<()>, exit_status: std::sync::Arc, + exit_code: std::sync::Arc>>, ) -> (Self, broadcast::Receiver>) { let initial_output_rx = output_tx.subscribe(); ( @@ -49,6 +53,7 @@ impl ExecCommandSession { writer_handle: StdMutex::new(Some(writer_handle)), wait_handle: StdMutex::new(Some(wait_handle)), exit_status, + exit_code, }, initial_output_rx, ) @@ -65,6 +70,10 @@ impl ExecCommandSession { pub(crate) fn has_exited(&self) -> bool { self.exit_status.load(std::sync::atomic::Ordering::SeqCst) } + + pub(crate) fn exit_code(&self) -> Option { + self.exit_code.lock().ok().and_then(|guard| *guard) + } } impl Drop for ExecCommandSession { diff --git a/codex-rs/core/src/exec_command/session_manager.rs b/codex-rs/core/src/exec_command/session_manager.rs index cd1c5329a2..6e2ae35a69 100644 --- a/codex-rs/core/src/exec_command/session_manager.rs +++ b/codex-rs/core/src/exec_command/session_manager.rs @@ -1,16 +1,7 @@ use std::collections::HashMap; -use std::io::ErrorKind; -use std::io::Read; -use std::sync::Arc; -use std::sync::Mutex as StdMutex; -use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU32; -use portable_pty::CommandBuilder; -use portable_pty::PtySize; -use portable_pty::native_pty_system; use tokio::sync::Mutex; -use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::time::Duration; use tokio::time::Instant; @@ -20,6 +11,7 @@ use crate::exec_command::exec_command_params::ExecCommandParams; use crate::exec_command::exec_command_params::WriteStdinParams; use crate::exec_command::exec_command_session::ExecCommandSession; use crate::exec_command::session_id::SessionId; +use crate::pty::spawn_pty_process; use crate::truncate::truncate_middle; #[derive(Debug, Default)] @@ -242,102 +234,12 @@ async fn create_exec_command_session( login, } = params; - // Use the native pty implementation for the system - let pty_system = native_pty_system(); - - // Create a new pty - let pair = pty_system.openpty(PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, - })?; - - // Spawn a shell into the pty - let mut command_builder = CommandBuilder::new(shell); let shell_mode_opt = if login { "-lc" } else { "-c" }; - command_builder.arg(shell_mode_opt); - command_builder.arg(cmd); - - let mut child = pair.slave.spawn_command(command_builder)?; - // Obtain a killer that can signal the process independently of `.wait()`. - let killer = child.clone_killer(); - - // Channel to forward write requests to the PTY writer. - let (writer_tx, mut writer_rx) = mpsc::channel::>(128); - // Broadcast for streaming PTY output to readers: subscribers receive from subscription time. - let (output_tx, _) = tokio::sync::broadcast::channel::>(256); - // Reader task: drain PTY and forward chunks to output channel. - let mut reader = pair.master.try_clone_reader()?; - let output_tx_clone = output_tx.clone(); - let reader_handle = tokio::task::spawn_blocking(move || { - let mut buf = [0u8; 8192]; - loop { - match reader.read(&mut buf) { - Ok(0) => break, // EOF - Ok(n) => { - // Forward to broadcast; best-effort if there are subscribers. - let _ = output_tx_clone.send(buf[..n].to_vec()); - } - Err(ref e) if e.kind() == ErrorKind::Interrupted => { - // Retry on EINTR - continue; - } - Err(ref e) if e.kind() == ErrorKind::WouldBlock => { - // We're in a blocking thread; back off briefly and retry. - std::thread::sleep(Duration::from_millis(5)); - continue; - } - Err(_) => break, - } - } - }); - - // Writer task: apply stdin writes to the PTY writer. - let writer = pair.master.take_writer()?; - let writer = Arc::new(StdMutex::new(writer)); - let writer_handle = tokio::spawn({ - let writer = writer.clone(); - async move { - while let Some(bytes) = writer_rx.recv().await { - let writer = writer.clone(); - // Perform blocking write on a blocking thread. - let _ = tokio::task::spawn_blocking(move || { - if let Ok(mut guard) = writer.lock() { - use std::io::Write; - let _ = guard.write_all(&bytes); - let _ = guard.flush(); - } - }) - .await; - } - } - }); - - // Keep the child alive until it exits, then signal exit code. - let (exit_tx, exit_rx) = oneshot::channel::(); - let exit_status = Arc::new(AtomicBool::new(false)); - let wait_exit_status = exit_status.clone(); - let wait_handle = tokio::task::spawn_blocking(move || { - let code = match child.wait() { - Ok(status) => status.exit_code() as i32, - Err(_) => -1, - }; - wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst); - let _ = exit_tx.send(code); - }); - - // Create and store the session with channels. - let (session, initial_output_rx) = ExecCommandSession::new( - writer_tx, - output_tx, - killer, - reader_handle, - writer_handle, - wait_handle, - exit_status, - ); - Ok((session, initial_output_rx, exit_rx)) + let args = vec![shell_mode_opt.to_string(), cmd]; + + let env = HashMap::new(); + let spawned = spawn_pty_process(&shell, &args, &env).await?; + Ok((spawned.session, spawned.output_rx, spawned.exit_rx)) } #[cfg(test)] diff --git a/codex-rs/core/src/executor/backends.rs b/codex-rs/core/src/executor/backends.rs index 9c65745c4f..fc7396bcdd 100644 --- a/codex-rs/core/src/executor/backends.rs +++ b/codex-rs/core/src/executor/backends.rs @@ -11,6 +11,7 @@ use crate::function_tool::FunctionCallError; pub(crate) enum ExecutionMode { Shell, + InteractiveShell, ApplyPatch(ApplyPatchExec), } @@ -36,7 +37,7 @@ static APPLY_PATCH_BACKEND: ApplyPatchBackend = ApplyPatchBackend; pub(crate) fn backend_for_mode(mode: &ExecutionMode) -> &'static dyn ExecutionBackend { match mode { - ExecutionMode::Shell => &SHELL_BACKEND, + ExecutionMode::Shell | ExecutionMode::InteractiveShell => &SHELL_BACKEND, ExecutionMode::ApplyPatch(_) => &APPLY_PATCH_BACKEND, } } @@ -52,7 +53,7 @@ impl ExecutionBackend for ShellBackend { _config: &ExecutorConfig, ) -> Result { match mode { - ExecutionMode::Shell => Ok(params), + ExecutionMode::Shell | ExecutionMode::InteractiveShell => Ok(params), _ => Err(FunctionCallError::RespondToModel( "shell backend invoked with non-shell mode".to_string(), )), @@ -97,9 +98,11 @@ impl ExecutionBackend for ApplyPatchBackend { justification: params.justification, }) } - ExecutionMode::Shell => Err(FunctionCallError::RespondToModel( - "apply_patch backend invoked without patch context".to_string(), - )), + ExecutionMode::Shell | ExecutionMode::InteractiveShell => { + Err(FunctionCallError::RespondToModel( + "apply_patch backend invoked without patch context".to_string(), + )) + } } } diff --git a/codex-rs/core/src/executor/mod.rs b/codex-rs/core/src/executor/mod.rs index 97d7b29294..026d28ec84 100644 --- a/codex-rs/core/src/executor/mod.rs +++ b/codex-rs/core/src/executor/mod.rs @@ -1,13 +1,46 @@ +//! Executor: centralized sandbox policy, approvals, and execution planning. +//! +//! Purpose and responsibilities +//! - Normalizes per‑mode parameters via backends (`backends.rs`). +//! - Selects sandbox placement and handles approvals (`sandbox.rs`). +//! - Produces an `ExecutionPlan` (single source of truth for policy) that +//! callers can either execute directly via `Executor::run` (non‑PTY, piped), +//! or consume piecemeal (e.g., Unified Exec) to launch with a PTY while +//! retaining consistent policy decisions. +//! +//! Key types +//! - `ExecutionMode`: `Shell`, `InteractiveShell`, `ApplyPatch`. +//! - `ExecutionRequest`: inputs + mode + stdout streaming preference. +//! - `ExecutionPlan`: immutable snapshot of the policy decision and helpers to +//! build a `SandboxLaunch` and retry without a sandbox when approved. +//! - `SandboxLaunch`: concrete program/args/env to execute under the chosen +//! sandbox. +//! +//! Typical flows +//! - Non‑PTY (piped): `Executor::run(request, …)` handles plan → launch → +//! execution and post‑processing, including converting sandbox failures into +//! user‑facing messages. +//! - PTY (Unified Exec): build the plan with `prepare_execution_plan` and then +//! use `ExecutionPlan::attempt_with_retry_if` to drive the spawn with +//! `SandboxLaunch`; PTY I/O and buffering remain the caller’s responsibility. +//! +//! This separation keeps sandbox logic and user interaction consistent while +//! allowing different transports (piped vs PTY) to manage their own lifecycles. + mod backends; mod cache; mod runner; mod sandbox; pub(crate) use backends::ExecutionMode; +pub(crate) use runner::ExecutionPlan; pub(crate) use runner::ExecutionRequest; pub(crate) use runner::Executor; pub(crate) use runner::ExecutorConfig; pub(crate) use runner::normalize_exec_result; +pub(crate) use sandbox::SandboxLaunch; +pub(crate) use sandbox::SandboxLaunchError; +pub(crate) use sandbox::build_launch_for_sandbox; pub(crate) mod linkers { use crate::exec::ExecParams; @@ -45,6 +78,7 @@ pub(crate) mod linkers { pub mod errors { use crate::error::CodexErr; + use crate::executor::SandboxLaunchError; use crate::function_tool::FunctionCallError; use thiserror::Error; @@ -61,4 +95,10 @@ pub mod errors { FunctionCallError::RespondToModel(msg.into()).into() } } + + impl From for ExecError { + fn from(err: SandboxLaunchError) -> Self { + CodexErr::from(err).into() + } + } } diff --git a/codex-rs/core/src/executor/runner.rs b/codex-rs/core/src/executor/runner.rs index e13016e37f..de4a8a7fec 100644 --- a/codex-rs/core/src/executor/runner.rs +++ b/codex-rs/core/src/executor/runner.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; @@ -15,21 +16,27 @@ use crate::exec::ExecToolCallOutput; use crate::exec::SandboxType; use crate::exec::StdoutStream; use crate::exec::StreamOutput; -use crate::exec::process_exec_tool_call; +use crate::exec::execute_sandbox_launch; use crate::executor::errors::ExecError; +use crate::executor::sandbox::RetrySandboxContext; +use crate::executor::sandbox::SandboxDecision; +use crate::executor::sandbox::SandboxLaunch; +use crate::executor::sandbox::SandboxLaunchError; +use crate::executor::sandbox::build_launch_for_sandbox; use crate::executor::sandbox::select_sandbox; use crate::function_tool::FunctionCallError; use crate::protocol::AskForApproval; -use crate::protocol::ReviewDecision; use crate::protocol::SandboxPolicy; use crate::shell; use crate::tools::context::ExecCommandContext; -use codex_otel::otel_event_manager::ToolDecisionSource; #[derive(Clone, Debug)] pub(crate) struct ExecutorConfig { pub(crate) sandbox_policy: SandboxPolicy, pub(crate) sandbox_cwd: PathBuf, + // Path to codex-linux-sandbox executable (Linux-only). Used by initial_launch when selecting Linux sandbox. + pub(crate) codex_linux_sandbox_exe: Option, + // Path to the codex binary itself (used by apply_patch backend to self-invoke when needed). pub(crate) codex_exe: Option, } @@ -37,16 +44,134 @@ impl ExecutorConfig { pub(crate) fn new( sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf, + codex_linux_sandbox_exe: Option, codex_exe: Option, ) -> Self { + let codex_exe = codex_exe.or_else(|| derive_codex_exe(&codex_linux_sandbox_exe)); Self { sandbox_policy, sandbox_cwd, + codex_linux_sandbox_exe, codex_exe, } } } +fn derive_codex_exe(sandbox_exe: &Option) -> Option { + sandbox_exe.as_ref().and_then(|path| { + let stem_matches_sandbox = path + .file_stem() + .and_then(|stem| stem.to_str()) + .is_some_and(|stem| stem == "codex-linux-sandbox"); + if stem_matches_sandbox { + None + } else { + Some(path.clone()) + } + }) +} + +pub(crate) struct ExecutionPlan { + request: ExecutionRequest, + config: ExecutorConfig, + sandbox_decision: SandboxDecision, + stdout_stream: Option, + context: ExecCommandContext, +} + +impl ExecutionPlan { + pub(crate) fn request(&self) -> &ExecutionRequest { + &self.request + } + + pub(crate) fn config(&self) -> &ExecutorConfig { + &self.config + } + + pub(crate) fn stdout_stream(&self) -> Option { + self.stdout_stream.clone() + } + + pub(crate) fn initial_launch(&self) -> Result { + build_launch_for_sandbox( + self.sandbox_decision.initial_sandbox, + &self.request.params.command, + &self.config.sandbox_policy, + &self.config.sandbox_cwd, + self.config.codex_linux_sandbox_exe.as_ref(), + ) + } + + pub(crate) fn retry_launch(&self) -> Result { + build_launch_for_sandbox( + SandboxType::None, + &self.request.params.command, + &self.config.sandbox_policy, + &self.config.sandbox_cwd, + None, + ) + } + + pub(crate) fn approval_command(&self) -> &[String] { + &self.request.approval_command + } + + pub(crate) async fn prompt_retry_without_sandbox( + &self, + session: &Session, + failure_message: impl Into, + ) -> bool { + if !self.sandbox_decision.escalate_on_failure { + return false; + } + + let approval = crate::executor::sandbox::request_retry_without_sandbox( + session, + failure_message.into(), + self.approval_command(), + self.request.params.cwd.clone(), + RetrySandboxContext { + sub_id: &self.context.sub_id, + call_id: &self.context.call_id, + tool_name: &self.context.tool_name, + otel_event_manager: &self.context.otel_event_manager, + }, + ) + .await; + + approval.is_some() + } + + /// Like `attempt_with_retry`, but only retries if `should_retry(err)` returns true. + pub(crate) async fn attempt_with_retry_if( + &self, + session: &Session, + attempt: Attempt, + should_retry: Should, + ) -> Result + where + Attempt: Fn(SandboxLaunch) -> Fut, + Fut: Future>, + Should: Fn(&E) -> bool, + E: From + std::fmt::Display, + { + let initial_launch = self.initial_launch().map_err(E::from)?; + match attempt(initial_launch).await { + Ok(result) => Ok(result), + Err(err) if self.sandbox_decision.escalate_on_failure && should_retry(&err) => { + let failure = format!("Execution failed: {err}"); + if self.prompt_retry_without_sandbox(session, failure).await { + let retry_launch = self.retry_launch().map_err(E::from)?; + attempt(retry_launch).await + } else { + Err(err) + } + } + Err(err) => Err(err), + } + } +} + /// Coordinates sandbox selection, backend-specific preparation, and command /// execution for tool calls requested by the model. pub(crate) struct Executor { @@ -62,26 +187,21 @@ impl Executor { } } - /// Updates the sandbox policy and working directory used for future - /// executions without recreating the executor. - pub(crate) fn update_environment(&self, sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf) { - if let Ok(mut cfg) = self.config.write() { - cfg.sandbox_policy = sandbox_policy; - cfg.sandbox_cwd = sandbox_cwd; - } + pub(crate) fn record_session_approval(&self, command: Vec) { + self.approval_cache.insert(command); } - /// Runs a prepared execution request end-to-end: prepares parameters, decides on - /// sandbox placement (prompting the user when necessary), launches the command, - /// and lets the backend post-process the final output. - pub(crate) async fn run( + pub(crate) async fn prepare_execution_plan( &self, mut request: ExecutionRequest, session: &Session, approval_policy: AskForApproval, context: &ExecCommandContext, - ) -> Result { - if matches!(request.mode, ExecutionMode::Shell) { + ) -> Result { + if matches!( + request.mode, + ExecutionMode::Shell | ExecutionMode::InteractiveShell + ) { request.params = maybe_translate_shell_command(request.params, session, request.use_shell_profile); } @@ -104,6 +224,12 @@ impl Executor { .prepare(request.params, &request.mode, &config) .map_err(ExecError::from)?; + let config = self + .config + .read() + .map_err(|_| ExecError::rejection("executor config poisoned"))? + .clone(); + // Step 3: Decide sandbox placement, prompting for approval when needed. let sandbox_decision = select_sandbox( &request, @@ -120,118 +246,71 @@ impl Executor { self.approval_cache.insert(request.approval_command.clone()); } - // Step 4: Launch the command within the chosen sandbox. - let first_attempt = self - .spawn( - request.params.clone(), - sandbox_decision.initial_sandbox, - &config, - stdout_stream.clone(), - ) - .await; + Ok(ExecutionPlan { + request, + config, + sandbox_decision, + stdout_stream, + context: context.clone(), + }) + } - // Step 5: Handle sandbox outcomes, optionally escalating to an unsandboxed retry. - match first_attempt { - Ok(output) => Ok(output), - Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => { - Err(CodexErr::Sandbox(SandboxErr::Timeout { output }).into()) - } - Err(CodexErr::Sandbox(error)) => { - if sandbox_decision.escalate_on_failure { - self.retry_without_sandbox( - &request, - &config, - session, - context, - stdout_stream, - error, - ) - .await - } else { - let message = sandbox_failure_message(error); - Err(ExecError::rejection(message)) - } - } - Err(err) => Err(err.into()), + /// Updates the sandbox policy and working directory used for future + /// executions without recreating the executor. + pub(crate) fn update_environment(&self, sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf) { + if let Ok(mut cfg) = self.config.write() { + cfg.sandbox_policy = sandbox_policy; + cfg.sandbox_cwd = sandbox_cwd; } } - /// Fallback path invoked when a sandboxed run is denied so the user can - /// approve rerunning without isolation. - async fn retry_without_sandbox( + /// Runs a prepared execution request end-to-end: prepares parameters, decides on + /// sandbox placement (prompting the user when necessary), launches the command, + /// and lets the backend post-process the final output. + pub(crate) async fn run( &self, - request: &ExecutionRequest, - config: &ExecutorConfig, + request: ExecutionRequest, session: &Session, + approval_policy: AskForApproval, context: &ExecCommandContext, - stdout_stream: Option, - sandbox_error: SandboxErr, ) -> Result { - session - .notify_background_event( - &context.sub_id, - format!("Execution failed: {sandbox_error}"), - ) - .await; - let decision = session - .request_command_approval( - context.sub_id.to_string(), - context.call_id.to_string(), - request.approval_command.clone(), - request.params.cwd.clone(), - Some("command failed; retry without sandbox?".to_string()), + let plan = self + .prepare_execution_plan(request, session, approval_policy, context) + .await?; + + let stdout_stream = plan.stdout_stream(); + let sandbox_policy = plan.config().sandbox_policy.clone(); + + // Drive attempts via the shared helper, but do not retry on timeouts. + let result: Result = plan + .attempt_with_retry_if( + session, + |launch| { + let params = plan.request().params.clone(); + let sandbox = plan.sandbox_decision.initial_sandbox; + let policy = sandbox_policy.clone(); + let stream = stdout_stream.clone(); + async move { execute_sandbox_launch(params, launch, sandbox, &policy, stream).await } + }, + |err: &CodexErr| { !matches!(err, CodexErr::Sandbox(SandboxErr::Timeout { .. })) }, ) .await; - context.otel_event_manager.tool_decision( - &context.tool_name, - &context.call_id, - decision, - ToolDecisionSource::User, - ); - match decision { - ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { - if matches!(decision, ReviewDecision::ApprovedForSession) { - self.approval_cache.insert(request.approval_command.clone()); - } - session - .notify_background_event(&context.sub_id, "retrying command without sandbox") - .await; - - let retry_output = self - .spawn( - request.params.clone(), - SandboxType::None, - config, - stdout_stream, - ) - .await?; - - Ok(retry_output) + match result { + Ok(output) => Ok(output), + Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => { + Err(ExecError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { + output, + }))) } - ReviewDecision::Denied | ReviewDecision::Abort => { - Err(ExecError::rejection("exec command rejected by user")) + Err(CodexErr::Sandbox(error)) => { + // Convert non-timeout sandbox errors into user-facing rejection messages. + let message = sandbox_failure_message(error); + Err(ExecError::rejection(message)) } + Err(err) => Err(err.into()), } } - - async fn spawn( - &self, - params: ExecParams, - sandbox: SandboxType, - config: &ExecutorConfig, - stdout_stream: Option, - ) -> Result { - process_exec_tool_call( - params, - sandbox, - &config.sandbox_policy, - &config.sandbox_cwd, - &config.codex_exe, - stdout_stream, - ) - .await - } } fn maybe_translate_shell_command( diff --git a/codex-rs/core/src/executor/sandbox.rs b/codex-rs/core/src/executor/sandbox.rs index 5c01ff69b4..473dfe46b7 100644 --- a/codex-rs/core/src/executor/sandbox.rs +++ b/codex-rs/core/src/executor/sandbox.rs @@ -5,14 +5,103 @@ use crate::executor::ExecutionMode; use crate::executor::ExecutionRequest; use crate::executor::ExecutorConfig; use crate::executor::errors::ExecError; +use crate::landlock::create_linux_sandbox_command_args; +use crate::protocol::SandboxPolicy; use crate::safety::SafetyCheck; use crate::safety::assess_command_safety; use crate::safety::assess_patch_safety; +use crate::seatbelt::MACOS_PATH_TO_SEATBELT_EXECUTABLE; +use crate::seatbelt::create_seatbelt_command_args; +use crate::spawn::CODEX_SANDBOX_ENV_VAR; +use crate::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_otel::otel_event_manager::OtelEventManager; use codex_otel::otel_event_manager::ToolDecisionSource; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::ReviewDecision; +use std::collections::HashMap; use std::collections::HashSet; +use std::path::Path; +use std::path::PathBuf; +use thiserror::Error; + +#[derive(Debug)] +pub(crate) struct SandboxLaunch { + pub sandbox_type: SandboxType, + pub program: String, + pub args: Vec, + pub env: HashMap, +} + +#[derive(Debug, Error)] +pub(crate) enum SandboxLaunchError { + #[error("missing command line for sandbox launch")] + MissingCommandLine, + #[error("missing codex-linux-sandbox executable path")] + MissingLinuxSandboxExecutable, +} + +pub(crate) fn build_launch_for_sandbox( + sandbox: SandboxType, + command: &[String], + sandbox_policy: &SandboxPolicy, + sandbox_policy_cwd: &Path, + codex_linux_sandbox_exe: Option<&PathBuf>, +) -> Result { + let mut env = HashMap::new(); + if !sandbox_policy.has_full_network_access() { + env.insert( + CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR.to_string(), + "1".to_string(), + ); + } + + match sandbox { + SandboxType::None => { + let (program, args) = command + .split_first() + .ok_or(SandboxLaunchError::MissingCommandLine)?; + Ok(SandboxLaunch { + sandbox_type: SandboxType::None, + program: program.clone(), + args: args.to_vec(), + env, + }) + } + SandboxType::MacosSeatbelt => { + env.insert(CODEX_SANDBOX_ENV_VAR.to_string(), "seatbelt".to_string()); + let args = + create_seatbelt_command_args(command.to_vec(), sandbox_policy, sandbox_policy_cwd); + Ok(SandboxLaunch { + sandbox_type: SandboxType::MacosSeatbelt, + program: MACOS_PATH_TO_SEATBELT_EXECUTABLE.to_string(), + args, + env, + }) + } + SandboxType::LinuxSeccomp => { + let exe = + codex_linux_sandbox_exe.ok_or(SandboxLaunchError::MissingLinuxSandboxExecutable)?; + let args = create_linux_sandbox_command_args( + command.to_vec(), + sandbox_policy, + sandbox_policy_cwd, + ); + Ok(SandboxLaunch { + sandbox_type: SandboxType::LinuxSeccomp, + program: exe.to_string_lossy().to_string(), + args, + env, + }) + } + } +} + +pub(crate) struct RetrySandboxContext<'a> { + pub sub_id: &'a str, + pub call_id: &'a str, + pub tool_name: &'a str, + pub otel_event_manager: &'a OtelEventManager, +} /// Sandbox placement options selected for an execution run, including whether /// to escalate after failures and whether approvals should persist. @@ -50,6 +139,53 @@ fn should_escalate_on_failure(approval: AskForApproval, sandbox: SandboxType) -> ) } +pub(crate) async fn request_retry_without_sandbox( + session: &Session, + failure_message: impl Into, + command: &[String], + cwd: PathBuf, + ctx: RetrySandboxContext<'_>, +) -> Option { + session + .notify_background_event(ctx.sub_id, failure_message.into()) + .await; + + let approval_command = command.to_vec(); + let decision = session + .request_command_approval( + ctx.sub_id.to_string(), + ctx.call_id.to_string(), + approval_command.clone(), + cwd, + Some("command failed; retry without sandbox?".to_string()), + ) + .await; + + ctx.otel_event_manager.tool_decision( + ctx.tool_name, + ctx.call_id, + decision, + ToolDecisionSource::User, + ); + + match decision { + ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { + if matches!(decision, ReviewDecision::ApprovedForSession) { + session + .services + .executor + .record_session_approval(approval_command); + } + + session + .notify_background_event(ctx.sub_id, "retrying command without sandbox") + .await; + Some(decision) + } + ReviewDecision::Denied | ReviewDecision::Abort => None, + } +} + /// Determines how a command should be sandboxed, prompting the user when /// policy requires explicit approval. #[allow(clippy::too_many_arguments)] @@ -64,7 +200,7 @@ pub async fn select_sandbox( otel_event_manager: &OtelEventManager, ) -> Result { match &request.mode { - ExecutionMode::Shell => { + ExecutionMode::Shell | ExecutionMode::InteractiveShell => { select_shell_sandbox( request, approval_policy, @@ -207,7 +343,7 @@ mod tests { action, user_explicitly_approved_this_action: true, }; - let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); + let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None); let request = ExecutionRequest { params: ExecParams { command: vec!["apply_patch".into()], @@ -250,7 +386,12 @@ mod tests { action, user_explicitly_approved_this_action: false, }; - let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None); + let cfg = ExecutorConfig::new( + SandboxPolicy::DangerFullAccess, + std::env::temp_dir(), + None, + None, + ); let request = ExecutionRequest { params: ExecParams { command: vec!["apply_patch".into()], @@ -278,9 +419,8 @@ mod tests { ) .await .expect("ok"); - // On platforms with a sandbox, DangerFullAccess still prefers it - let expected = crate::safety::get_platform_sandbox().unwrap_or(SandboxType::None); - assert_eq!(decision.initial_sandbox, expected); + // DangerFullAccess bypasses sandboxing entirely. + assert_eq!(decision.initial_sandbox, SandboxType::None); assert_eq!(decision.escalate_on_failure, false); } @@ -294,7 +434,7 @@ mod tests { action, user_explicitly_approved_this_action: false, }; - let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); + let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None); let request = ExecutionRequest { params: ExecParams { command: vec!["apply_patch".into()], @@ -333,7 +473,12 @@ mod tests { #[tokio::test] async fn select_shell_autoapprove_in_danger_mode() { let (session, ctx) = make_session_and_context(); - let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None); + let cfg = ExecutorConfig::new( + SandboxPolicy::DangerFullAccess, + std::env::temp_dir(), + None, + None, + ); let request = ExecutionRequest { params: ExecParams { command: vec!["some-unknown".into()], @@ -369,7 +514,7 @@ mod tests { #[tokio::test] async fn select_shell_escalates_on_failure_with_platform_sandbox() { let (session, ctx) = make_session_and_context(); - let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); + let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None); let request = ExecutionRequest { params: ExecParams { // Unknown command => untrusted but not flagged dangerous diff --git a/codex-rs/core/src/landlock.rs b/codex-rs/core/src/landlock.rs index 264ea747ca..f5b765efd6 100644 --- a/codex-rs/core/src/landlock.rs +++ b/codex-rs/core/src/landlock.rs @@ -40,7 +40,7 @@ where } /// Converts the sandbox policy into the CLI invocation for `codex-linux-sandbox`. -fn create_linux_sandbox_command_args( +pub(crate) fn create_linux_sandbox_command_args( command: Vec, sandbox_policy: &SandboxPolicy, sandbox_policy_cwd: &Path, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 22e1d4cdf0..c3629325ce 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -39,6 +39,7 @@ mod mcp_tool_call; mod message_history; mod model_provider_info; pub mod parse_command; +mod pty; mod truncate; mod unified_exec; mod user_instructions; diff --git a/codex-rs/core/src/pty.rs b/codex-rs/core/src/pty.rs new file mode 100644 index 0000000000..7e4382f90f --- /dev/null +++ b/codex-rs/core/src/pty.rs @@ -0,0 +1,130 @@ +use std::collections::HashMap; +use std::io::ErrorKind; +use std::sync::Arc; +use std::sync::Mutex as StdMutex; +use std::sync::atomic::AtomicBool; + +use portable_pty::CommandBuilder; +use portable_pty::PtySize; +use portable_pty::native_pty_system; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; +use tokio::time::Duration; + +use crate::exec_command::ExecCommandSession; + +#[derive(Debug)] +pub(crate) struct SpawnedPty { + pub session: ExecCommandSession, + pub output_rx: broadcast::Receiver>, + pub exit_rx: oneshot::Receiver, +} + +/// Spawn a PTY-based process and return the interactive session along with +/// receivers for streaming output and exit status. +pub(crate) async fn spawn_pty_process( + program: &str, + args: &[String], + env: &HashMap, +) -> anyhow::Result { + if program.is_empty() { + anyhow::bail!("missing program for PTY spawn"); + } + + let pty_system = native_pty_system(); + let pair = pty_system.openpty(PtySize { + rows: 24, + cols: 80, + pixel_width: 0, + pixel_height: 0, + })?; + + let mut command_builder = CommandBuilder::new(program); + for arg in args { + command_builder.arg(arg.clone()); + } + for (key, value) in env { + command_builder.env(key.clone(), value.clone()); + } + + let mut child = pair.slave.spawn_command(command_builder)?; + let killer = child.clone_killer(); + + let (writer_tx, mut writer_rx) = mpsc::channel::>(128); + let (output_tx, _) = broadcast::channel::>(256); + + let mut reader = pair.master.try_clone_reader()?; + let output_tx_clone = output_tx.clone(); + let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || { + let mut buf = [0u8; 8192]; + loop { + match reader.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + let _ = output_tx_clone.send(buf[..n].to_vec()); + } + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(ref e) if e.kind() == ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(5)); + continue; + } + Err(_) => break, + } + } + }); + + let writer = pair.master.take_writer()?; + let writer = Arc::new(StdMutex::new(writer)); + let writer_handle: JoinHandle<()> = tokio::spawn({ + let writer = writer.clone(); + async move { + while let Some(bytes) = writer_rx.recv().await { + let writer = writer.clone(); + let _ = tokio::task::spawn_blocking(move || { + if let Ok(mut guard) = writer.lock() { + use std::io::Write; + let _ = guard.write_all(&bytes); + let _ = guard.flush(); + } + }) + .await; + } + } + }); + + let (exit_tx, exit_rx) = oneshot::channel::(); + let exit_status = Arc::new(AtomicBool::new(false)); + let wait_exit_status = exit_status.clone(); + let exit_code = Arc::new(StdMutex::new(None)); + let wait_exit_code = exit_code.clone(); + let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || { + let code = match child.wait() { + Ok(status) => status.exit_code() as i32, + Err(_) => -1, + }; + wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst); + if let Ok(mut guard) = wait_exit_code.lock() { + *guard = Some(code); + } + let _ = exit_tx.send(code); + }); + + let (session, output_rx) = ExecCommandSession::new( + writer_tx, + output_tx, + killer, + reader_handle, + writer_handle, + wait_handle, + exit_status, + exit_code, + ); + + Ok(SpawnedPty { + session, + output_rx, + exit_rx, + }) +} diff --git a/codex-rs/core/src/safety.rs b/codex-rs/core/src/safety.rs index 0ed0f929ff..bf5c2c1319 100644 --- a/codex-rs/core/src/safety.rs +++ b/codex-rs/core/src/safety.rs @@ -55,23 +55,23 @@ pub fn assess_patch_safety( if is_write_patch_constrained_to_writable_paths(action, sandbox_policy, cwd) || policy == AskForApproval::OnFailure { - // Only auto‑approve when we can actually enforce a sandbox. Otherwise - // fall back to asking the user because the patch may touch arbitrary - // paths outside the project. - match get_platform_sandbox() { - Some(sandbox_type) => SafetyCheck::AutoApprove { - sandbox_type, + if matches!(sandbox_policy, SandboxPolicy::DangerFullAccess) { + // DangerFullAccess is intended to bypass sandboxing entirely. + SafetyCheck::AutoApprove { + sandbox_type: SandboxType::None, user_explicitly_approved: false, - }, - None if sandbox_policy == &SandboxPolicy::DangerFullAccess => { - // If the user has explicitly requested DangerFullAccess, then - // we can auto-approve even without a sandbox. - SafetyCheck::AutoApprove { - sandbox_type: SandboxType::None, + } + } else { + // Only auto‑approve when we can actually enforce a sandbox. Otherwise + // fall back to asking the user because the patch may touch arbitrary + // paths outside the project. + match get_platform_sandbox() { + Some(sandbox_type) => SafetyCheck::AutoApprove { + sandbox_type, user_explicitly_approved: false, - } + }, + None => SafetyCheck::AskUser, } - None => SafetyCheck::AskUser, } } else if policy == AskForApproval::Never { SafetyCheck::Reject { diff --git a/codex-rs/core/src/seatbelt.rs b/codex-rs/core/src/seatbelt.rs index 09e93668bc..abd88d41bf 100644 --- a/codex-rs/core/src/seatbelt.rs +++ b/codex-rs/core/src/seatbelt.rs @@ -14,7 +14,7 @@ const MACOS_SEATBELT_BASE_POLICY: &str = include_str!("seatbelt_base_policy.sbpl /// to defend against an attacker trying to inject a malicious version on the /// PATH. If /usr/bin/sandbox-exec has been tampered with, then the attacker /// already has root access. -const MACOS_PATH_TO_SEATBELT_EXECUTABLE: &str = "/usr/bin/sandbox-exec"; +pub(crate) const MACOS_PATH_TO_SEATBELT_EXECUTABLE: &str = "/usr/bin/sandbox-exec"; pub async fn spawn_command_under_seatbelt( command: Vec, @@ -39,7 +39,7 @@ pub async fn spawn_command_under_seatbelt( .await } -fn create_seatbelt_command_args( +pub(crate) fn create_seatbelt_command_args( command: Vec, sandbox_policy: &SandboxPolicy, sandbox_policy_cwd: &Path, diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index ce47dded3c..074082771e 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -35,7 +35,13 @@ impl ToolHandler for UnifiedExecHandler { async fn handle(&self, invocation: ToolInvocation) -> Result { let ToolInvocation { - session, payload, .. + session, + turn, + sub_id, + call_id, + tool_name, + payload, + .. } = invocation; let args = match payload { @@ -73,13 +79,24 @@ impl ToolHandler for UnifiedExecHandler { }; let request = UnifiedExecRequest { - session_id: parsed_session_id, input_chunks: &input, timeout_ms, }; let value = session - .run_unified_exec_request(request) + .services + .unified_exec_manager + .handle_request( + request, + crate::unified_exec::UnifiedExecContext { + session: &session, + turn: turn.as_ref(), + sub_id: &sub_id, + call_id: &call_id, + tool_name: &tool_name, + session_id: parsed_session_id, + }, + ) .await .map_err(|err| { FunctionCallError::RespondToModel(format!("unified exec failed: {err:?}")) diff --git a/codex-rs/core/src/unified_exec/errors.rs b/codex-rs/core/src/unified_exec/errors.rs index 6bf5bf7ec5..eb28d95f61 100644 --- a/codex-rs/core/src/unified_exec/errors.rs +++ b/codex-rs/core/src/unified_exec/errors.rs @@ -1,22 +1,39 @@ +use crate::executor::SandboxLaunchError; use thiserror::Error; #[derive(Debug, Error)] pub(crate) enum UnifiedExecError { - #[error("Failed to create unified exec session: {pty_error}")] - CreateSession { - #[source] - pty_error: anyhow::Error, - }, + #[error("Failed to create unified exec session: {message}")] + CreateSession { message: String }, #[error("Unknown session id {session_id}")] UnknownSessionId { session_id: i32 }, #[error("failed to write to stdin")] WriteToStdin, #[error("missing command line for unified exec request")] MissingCommandLine, + #[error("missing codex-linux-sandbox executable path")] + MissingLinuxSandboxExecutable, + #[error("Command denied by sandbox: {message}")] + SandboxDenied { message: String }, } impl UnifiedExecError { - pub(crate) fn create_session(error: anyhow::Error) -> Self { - Self::CreateSession { pty_error: error } + pub(crate) fn create_session(message: String) -> Self { + Self::CreateSession { message } + } + + pub(crate) fn sandbox_denied(message: String) -> Self { + Self::SandboxDenied { message } + } +} + +impl From for UnifiedExecError { + fn from(err: SandboxLaunchError) -> Self { + match err { + SandboxLaunchError::MissingCommandLine => UnifiedExecError::MissingCommandLine, + SandboxLaunchError::MissingLinuxSandboxExecutable => { + UnifiedExecError::MissingLinuxSandboxExecutable + } + } } } diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index a8f754c92e..700cf90cf4 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -1,23 +1,61 @@ -use portable_pty::CommandBuilder; -use portable_pty::PtySize; -use portable_pty::native_pty_system; +//! Unified Exec: interactive PTY execution with session management. +//! +//! Purpose and responsibilities +//! - Manages interactive PTY sessions (create, reuse, buffer output with caps). +//! - Delegates sandbox selection, approvals, and policy decisions to the +//! executor (single source of truth) via `prepare_execution_plan`. +//! - Spawns the PTY using the `SandboxLaunch` produced by the plan and reuses +//! `ExecutionPlan::attempt_with_retry_if` to optionally retry without a +//! sandbox when policy allows and the user approves. +//! - After process exit, classifies sandbox denials using the shared +//! `is_likely_sandbox_denied` heuristic so denial messages stay consistent. +//! +//! Why not call `executor.run`? +//! `executor.run` drives a non‑PTY (piped) execution flow end‑to‑end. Unified +//! Exec needs an interactive PTY that persists across requests and supports +//! streaming I/O. To keep policy logic centralized while still owning the PTY +//! lifecycle, Unified Exec builds an `ExecutionRequest` with +//! `ExecutionMode::InteractiveShell`, asks the executor for an +//! `ExecutionPlan`, then performs the PTY spawn itself with the plan’s +//! sandboxed command and environment. +//! +//! Handoff at a glance +//! 1) Build `ExecutionRequest` (interactive shell). +//! 2) `executor.update_environment(turn.sandbox_policy, turn.cwd)`. +//! 3) `plan = executor.prepare_execution_plan(request, …)`. +//! 4) `plan.attempt_with_retry_if(|launch| spawn_pty_process(launch), retry_on_denied)`. +//! 5) Buffer+stream output, apply timeouts, and return `UnifiedExecResult`. +//! +//! This structure ensures Unified Exec benefits from the same approval, +//! sandbox selection, and escalation rules as regular exec, while keeping the +//! PTY/session concerns isolated here. + use std::collections::HashMap; use std::collections::VecDeque; -use std::io::ErrorKind; -use std::io::Read; use std::sync::Arc; -use std::sync::Mutex as StdMutex; -use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicI32; use std::sync::atomic::Ordering; use tokio::sync::Mutex; use tokio::sync::Notify; use tokio::sync::mpsc; +use tokio::sync::oneshot::error::TryRecvError; use tokio::task::JoinHandle; use tokio::time::Duration; use tokio::time::Instant; +use crate::codex::Session; +use crate::codex::TurnContext; +use crate::exec::ExecParams; +use crate::exec::ExecToolCallOutput; +use crate::exec::SandboxType; +use crate::exec::StreamOutput; +use crate::exec::is_likely_sandbox_denied; use crate::exec_command::ExecCommandSession; +use crate::executor::ExecutionMode; +use crate::executor::ExecutionPlan; +use crate::executor::ExecutionRequest; +use crate::pty::SpawnedPty; +use crate::tools::context::ExecCommandContext; use crate::truncate::truncate_middle; mod errors; @@ -28,9 +66,17 @@ const DEFAULT_TIMEOUT_MS: u64 = 1_000; const MAX_TIMEOUT_MS: u64 = 60_000; const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 128 * 1024; // 128 KiB +pub(crate) struct UnifiedExecContext<'a> { + pub session: &'a Session, + pub turn: &'a TurnContext, + pub sub_id: &'a str, + pub call_id: &'a str, + pub tool_name: &'a str, + pub session_id: Option, +} + #[derive(Debug)] pub(crate) struct UnifiedExecRequest<'a> { - pub session_id: Option, pub input_chunks: &'a [String], pub timeout_ms: Option, } @@ -44,17 +90,19 @@ pub(crate) struct UnifiedExecResult { #[derive(Debug, Default)] pub(crate) struct UnifiedExecSessionManager { next_session_id: AtomicI32, - sessions: Mutex>, + sessions: Mutex>, } #[derive(Debug)] -struct ManagedUnifiedExecSession { +/// Wraps a PTY session with buffered output and sandbox metadata for unified exec. +struct UnifiedExecSession { session: ExecCommandSession, output_buffer: OutputBuffer, /// Notifies waiters whenever new output has been appended to /// `output_buffer`, allowing clients to poll for fresh data. output_notify: Arc, output_task: JoinHandle<()>, + sandbox_type: SandboxType, } #[derive(Debug, Default)] @@ -94,15 +142,20 @@ impl OutputBufferState { self.total_bytes = 0; drained } + + fn snapshot(&self) -> Vec> { + self.chunks.iter().cloned().collect() + } } type OutputBuffer = Arc>; type OutputHandles = (OutputBuffer, Arc); -impl ManagedUnifiedExecSession { +impl UnifiedExecSession { fn new( session: ExecCommandSession, initial_output_rx: tokio::sync::broadcast::Receiver>, + sandbox_type: SandboxType, ) -> Self { let output_buffer = Arc::new(Mutex::new(OutputBufferState::default())); let output_notify = Arc::new(Notify::new()); @@ -134,6 +187,7 @@ impl ManagedUnifiedExecSession { output_buffer, output_notify, output_task, + sandbox_type, } } @@ -151,18 +205,158 @@ impl ManagedUnifiedExecSession { fn has_exited(&self) -> bool { self.session.has_exited() } + + fn exit_code(&self) -> Option { + self.session.exit_code() + } + + async fn snapshot_output(&self) -> Vec> { + let guard = self.output_buffer.lock().await; + guard.snapshot() + } + + fn sandbox_type(&self) -> SandboxType { + self.sandbox_type + } + + async fn check_for_sandbox_denial(&self) -> Result<(), UnifiedExecError> { + if self.sandbox_type() == SandboxType::None || !self.has_exited() { + return Ok(()); + } + + // Give the reader task a brief moment to flush any final PTY bytes after exit. + let _ = + tokio::time::timeout(Duration::from_millis(20), self.output_notify.notified()).await; + + let collected_chunks = self.snapshot_output().await; + let mut aggregated: Vec = Vec::new(); + for chunk in collected_chunks { + aggregated.extend_from_slice(&chunk); + } + let aggregated_text = String::from_utf8_lossy(&aggregated).to_string(); + let exit_code = self.exit_code().unwrap_or(-1); + + let exec_output = ExecToolCallOutput { + exit_code, + stdout: StreamOutput::new(aggregated_text.clone()), + stderr: StreamOutput::new(String::new()), + aggregated_output: StreamOutput::new(aggregated_text.clone()), + duration: Duration::ZERO, + timed_out: false, + }; + + if is_likely_sandbox_denied(self.sandbox_type(), &exec_output) { + let (snippet, _) = truncate_middle(&aggregated_text, UNIFIED_EXEC_OUTPUT_MAX_BYTES); + let message = if snippet.is_empty() { + format!("exit code {exit_code}") + } else { + snippet + }; + return Err(UnifiedExecError::sandbox_denied(message)); + } + + Ok(()) + } + + async fn from_spawned( + spawned: SpawnedPty, + sandbox_type: SandboxType, + ) -> Result { + let SpawnedPty { + session, + output_rx, + mut exit_rx, + } = spawned; + let managed = Self::new(session, output_rx, sandbox_type); + + let exit_ready = match exit_rx.try_recv() { + Ok(_) | Err(TryRecvError::Closed) => true, + Err(TryRecvError::Empty) => false, + }; + + if exit_ready { + managed.check_for_sandbox_denial().await?; + } + + Ok(managed) + } } -impl Drop for ManagedUnifiedExecSession { +impl Drop for UnifiedExecSession { fn drop(&mut self) { self.output_task.abort(); } } impl UnifiedExecSessionManager { + async fn open_session_with_sandbox( + &self, + command: Vec, + context: &UnifiedExecContext<'_>, + ) -> Result { + let executor = &context.session.services.executor; + let otel_event_manager = context.turn.client.get_otel_event_manager(); + let approval_command = command.clone(); + let exec_context = ExecCommandContext { + sub_id: context.sub_id.to_string(), + call_id: context.call_id.to_string(), + command_for_display: approval_command.clone(), + cwd: context.turn.cwd.clone(), + apply_patch: None, + tool_name: context.tool_name.to_string(), + otel_event_manager, + }; + + let execution_request = ExecutionRequest { + params: ExecParams { + command, + cwd: context.turn.cwd.clone(), + timeout_ms: None, + env: HashMap::new(), + with_escalated_permissions: None, + justification: None, + }, + approval_command, + mode: ExecutionMode::InteractiveShell, + stdout_stream: None, + use_shell_profile: false, + }; + + // Ensure the executor's environment reflects this turn before planning + executor.update_environment( + context.turn.sandbox_policy.clone(), + context.turn.cwd.clone(), + ); + + let plan: ExecutionPlan = executor + .prepare_execution_plan( + execution_request, + context.session, + context.turn.approval_policy, + &exec_context, + ) + .await + .map_err(|err| UnifiedExecError::create_session(err.to_string()))?; + + plan.attempt_with_retry_if( + context.session, + |launch| async move { + let sandbox_type = launch.sandbox_type; + let spawned = + crate::pty::spawn_pty_process(&launch.program, &launch.args, &launch.env) + .await + .map_err(|err| UnifiedExecError::create_session(err.to_string()))?; + UnifiedExecSession::from_spawned(spawned, sandbox_type).await + }, + |err: &UnifiedExecError| matches!(err, UnifiedExecError::SandboxDenied { .. }), + ) + .await + } + pub async fn handle_request( &self, request: UnifiedExecRequest<'_>, + context: UnifiedExecContext<'_>, ) -> Result { let (timeout_ms, timeout_warning) = match request.timeout_ms { Some(requested) if requested > MAX_TIMEOUT_MS => ( @@ -175,13 +369,13 @@ impl UnifiedExecSessionManager { None => (DEFAULT_TIMEOUT_MS, None), }; - let mut new_session: Option = None; + let mut new_session: Option = None; let session_id; let writer_tx; let output_buffer; let output_notify; - if let Some(existing_id) = request.session_id { + if let Some(existing_id) = context.session_id { let mut sessions = self.sessions.lock().await; match sessions.get(&existing_id) { Some(session) => { @@ -207,8 +401,7 @@ impl UnifiedExecSessionManager { } else { let command = request.input_chunks.to_vec(); let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst); - let (session, initial_output_rx) = create_unified_exec_session(&command).await?; - let managed_session = ManagedUnifiedExecSession::new(session, initial_output_rx); + let managed_session = self.open_session_with_sandbox(command, &context).await?; let (buffer, notify) = managed_session.output_handles(); writer_tx = managed_session.writer_sender(); output_buffer = buffer; @@ -217,11 +410,35 @@ impl UnifiedExecSessionManager { new_session = Some(managed_session); }; - if request.session_id.is_some() { - let joined_input = request.input_chunks.join(" "); - if !joined_input.is_empty() && writer_tx.send(joined_input.into_bytes()).await.is_err() - { - return Err(UnifiedExecError::WriteToStdin); + if context.session_id.is_some() { + let mut trailing_whitespace = true; + for chunk in request.input_chunks { + if chunk.is_empty() { + continue; + } + + let leading_whitespace = chunk + .chars() + .next() + .map(char::is_whitespace) + .unwrap_or(true); + + if !trailing_whitespace + && !leading_whitespace + && writer_tx.send(vec![b' ']).await.is_err() + { + return Err(UnifiedExecError::WriteToStdin); + } + + if writer_tx.send(chunk.as_bytes().to_vec()).await.is_err() { + return Err(UnifiedExecError::WriteToStdin); + } + + trailing_whitespace = chunk + .chars() + .next_back() + .map(char::is_whitespace) + .unwrap_or(trailing_whitespace); } } @@ -276,7 +493,7 @@ impl UnifiedExecSessionManager { let should_store_session = if let Some(session) = new_session.as_ref() { !session.has_exited() - } else if request.session_id.is_some() { + } else if context.session_id.is_some() { let mut sessions = self.sessions.lock().await; if let Some(existing) = sessions.get(&session_id) { if existing.has_exited() { @@ -309,114 +526,55 @@ impl UnifiedExecSessionManager { } } -async fn create_unified_exec_session( - command: &[String], -) -> Result< - ( - ExecCommandSession, - tokio::sync::broadcast::Receiver>, - ), - UnifiedExecError, -> { - if command.is_empty() { - return Err(UnifiedExecError::MissingCommandLine); - } - - let pty_system = native_pty_system(); - - let pair = pty_system - .openpty(PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, - }) - .map_err(UnifiedExecError::create_session)?; - - // Safe thanks to the check at the top of the function. - let mut command_builder = CommandBuilder::new(command[0].clone()); - for arg in &command[1..] { - command_builder.arg(arg); - } - - let mut child = pair - .slave - .spawn_command(command_builder) - .map_err(UnifiedExecError::create_session)?; - let killer = child.clone_killer(); - - let (writer_tx, mut writer_rx) = mpsc::channel::>(128); - let (output_tx, _) = tokio::sync::broadcast::channel::>(256); - - let mut reader = pair - .master - .try_clone_reader() - .map_err(UnifiedExecError::create_session)?; - let output_tx_clone = output_tx.clone(); - let reader_handle = tokio::task::spawn_blocking(move || { - let mut buf = [0u8; 8192]; - loop { - match reader.read(&mut buf) { - Ok(0) => break, - Ok(n) => { - let _ = output_tx_clone.send(buf[..n].to_vec()); - } - Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, - Err(ref e) if e.kind() == ErrorKind::WouldBlock => { - std::thread::sleep(Duration::from_millis(5)); - continue; - } - Err(_) => break, - } - } - }); - - let writer = pair - .master - .take_writer() - .map_err(UnifiedExecError::create_session)?; - let writer = Arc::new(StdMutex::new(writer)); - let writer_handle = tokio::spawn({ - let writer = writer.clone(); - async move { - while let Some(bytes) = writer_rx.recv().await { - let writer = writer.clone(); - let _ = tokio::task::spawn_blocking(move || { - if let Ok(mut guard) = writer.lock() { - use std::io::Write; - let _ = guard.write_all(&bytes); - let _ = guard.flush(); - } - }) - .await; - } - } - }); - - let exit_status = Arc::new(AtomicBool::new(false)); - let wait_exit_status = Arc::clone(&exit_status); - let wait_handle = tokio::task::spawn_blocking(move || { - let _ = child.wait(); - wait_exit_status.store(true, Ordering::SeqCst); - }); - - let (session, initial_output_rx) = ExecCommandSession::new( - writer_tx, - output_tx, - killer, - reader_handle, - writer_handle, - wait_handle, - exit_status, - ); - Ok((session, initial_output_rx)) -} - #[cfg(test)] +#[cfg(unix)] mod tests { use super::*; - #[cfg(unix)] + + use crate::codex::Session; + use crate::codex::TurnContext; + use crate::codex::make_session_and_context; + use crate::protocol::AskForApproval; + use crate::protocol::SandboxPolicy; use core_test_support::skip_if_sandbox; + use std::sync::Arc; + + fn test_session_and_turn() -> (Arc, Arc) { + let (session, mut turn) = make_session_and_context(); + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + (Arc::new(session), Arc::new(turn)) + } + + async fn run_unified_exec_request( + session: &Arc, + turn: &Arc, + session_id: Option, + input: Vec, + timeout_ms: Option, + ) -> Result { + let request_input = input; + let request = UnifiedExecRequest { + input_chunks: &request_input, + timeout_ms, + }; + + session + .services + .unified_exec_manager + .handle_request( + request, + UnifiedExecContext { + session, + turn: turn.as_ref(), + sub_id: "sub", + call_id: "call", + tool_name: "unified_exec", + session_id, + }, + ) + .await + } #[test] fn push_chunk_trims_only_excess_bytes() { @@ -435,158 +593,160 @@ mod tests { assert_eq!(buffer.chunks.pop_back().unwrap(), vec![b'b']); } - #[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_persists_across_requests_jif() -> Result<(), UnifiedExecError> { skip_if_sandbox!(Ok(())); - let manager = UnifiedExecSessionManager::default(); + let (session, turn) = test_session_and_turn(); - let open_shell = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["bash".to_string(), "-i".to_string()], - timeout_ms: Some(2_500), - }) - .await?; + let open_shell = run_unified_exec_request( + &session, + &turn, + None, + vec!["bash".to_string(), "-i".to_string()], + Some(2_500), + ) + .await?; let session_id = open_shell.session_id.expect("expected session_id"); - manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &[ - "export".to_string(), - "CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(), - ], - timeout_ms: Some(2_500), - }) - .await?; - - let out_2 = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], - timeout_ms: Some(2_500), - }) - .await?; + run_unified_exec_request( + &session, + &turn, + Some(session_id), + vec![ + "export".to_string(), + "CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(), + ], + Some(2_500), + ) + .await?; + + let out_2 = run_unified_exec_request( + &session, + &turn, + Some(session_id), + vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], + Some(2_500), + ) + .await?; assert!(out_2.output.contains("codex")); Ok(()) } - #[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn multi_unified_exec_sessions() -> Result<(), UnifiedExecError> { skip_if_sandbox!(Ok(())); - let manager = UnifiedExecSessionManager::default(); + let (session, turn) = test_session_and_turn(); - let shell_a = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["/bin/bash".to_string(), "-i".to_string()], - timeout_ms: Some(2_500), - }) - .await?; + let shell_a = run_unified_exec_request( + &session, + &turn, + None, + vec!["/bin/bash".to_string(), "-i".to_string()], + Some(2_500), + ) + .await?; let session_a = shell_a.session_id.expect("expected session id"); - manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_a), - input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()], - timeout_ms: Some(2_500), - }) - .await?; - - let out_2 = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &[ - "echo".to_string(), - "$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(), - ], - timeout_ms: Some(2_500), - }) - .await?; + run_unified_exec_request( + &session, + &turn, + Some(session_a), + vec!["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()], + Some(2_500), + ) + .await?; + + let out_2 = run_unified_exec_request( + &session, + &turn, + None, + vec![ + "echo".to_string(), + "$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(), + ], + Some(2_500), + ) + .await?; assert!(!out_2.output.contains("codex")); - let out_3 = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_a), - input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], - timeout_ms: Some(2_500), - }) - .await?; + let out_3 = run_unified_exec_request( + &session, + &turn, + Some(session_a), + vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], + Some(2_500), + ) + .await?; assert!(out_3.output.contains("codex")); Ok(()) } - #[cfg(unix)] #[tokio::test] async fn unified_exec_timeouts() -> Result<(), UnifiedExecError> { skip_if_sandbox!(Ok(())); - let manager = UnifiedExecSessionManager::default(); + let (session, turn) = test_session_and_turn(); - let open_shell = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["bash".to_string(), "-i".to_string()], - timeout_ms: Some(2_500), - }) - .await?; + let open_shell = run_unified_exec_request( + &session, + &turn, + None, + vec!["bash".to_string(), "-i".to_string()], + Some(2_500), + ) + .await?; let session_id = open_shell.session_id.expect("expected session id"); - manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &[ - "export".to_string(), - "CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(), - ], - timeout_ms: Some(2_500), - }) - .await?; - - let out_2 = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], - timeout_ms: Some(10), - }) - .await?; + run_unified_exec_request( + &session, + &turn, + Some(session_id), + vec![ + "export".to_string(), + "CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(), + ], + Some(2_500), + ) + .await?; + + let out_2 = run_unified_exec_request( + &session, + &turn, + Some(session_id), + vec!["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], + Some(10), + ) + .await?; assert!(!out_2.output.contains("codex")); tokio::time::sleep(Duration::from_secs(7)).await; - let empty = Vec::new(); - let out_3 = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &empty, - timeout_ms: Some(100), - }) - .await?; + let out_3 = + run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100)) + .await?; assert!(out_3.output.contains("codex")); Ok(()) } - #[cfg(unix)] #[tokio::test] #[ignore] // Ignored while we have a better way to test this. async fn requests_with_large_timeout_are_capped() -> Result<(), UnifiedExecError> { - let manager = UnifiedExecSessionManager::default(); - - let result = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["echo".to_string(), "codex".to_string()], - timeout_ms: Some(120_000), - }) - .await?; + let (session, turn) = test_session_and_turn(); + + let result = run_unified_exec_request( + &session, + &turn, + None, + vec!["echo".to_string(), "codex".to_string()], + Some(120_000), + ) + .await?; assert!(result.output.starts_with( "Warning: requested timeout 120000ms exceeds maximum of 60000ms; clamping to 60000ms.\n" @@ -596,61 +756,66 @@ mod tests { Ok(()) } - #[cfg(unix)] #[tokio::test] #[ignore] // Ignored while we have a better way to test this. async fn completed_commands_do_not_persist_sessions() -> Result<(), UnifiedExecError> { - let manager = UnifiedExecSessionManager::default(); - let result = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["/bin/echo".to_string(), "codex".to_string()], - timeout_ms: Some(2_500), - }) - .await?; + let (session, turn) = test_session_and_turn(); + let result = run_unified_exec_request( + &session, + &turn, + None, + vec!["/bin/echo".to_string(), "codex".to_string()], + Some(2_500), + ) + .await?; assert!(result.session_id.is_none()); assert!(result.output.contains("codex")); - assert!(manager.sessions.lock().await.is_empty()); + assert!( + session + .services + .unified_exec_manager + .sessions + .lock() + .await + .is_empty() + ); Ok(()) } - #[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn reusing_completed_session_returns_unknown_session() -> Result<(), UnifiedExecError> { skip_if_sandbox!(Ok(())); - let manager = UnifiedExecSessionManager::default(); + let (session, turn) = test_session_and_turn(); - let open_shell = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["/bin/bash".to_string(), "-i".to_string()], - timeout_ms: Some(2_500), - }) - .await?; + let open_shell = run_unified_exec_request( + &session, + &turn, + None, + vec!["/bin/bash".to_string(), "-i".to_string()], + Some(2_500), + ) + .await?; let session_id = open_shell.session_id.expect("expected session id"); - manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &["exit\n".to_string()], - timeout_ms: Some(2_500), - }) - .await?; + run_unified_exec_request( + &session, + &turn, + Some(session_id), + vec!["exit\n".to_string()], + Some(2_500), + ) + .await?; tokio::time::sleep(Duration::from_millis(200)).await; - let err = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &[], - timeout_ms: Some(100), - }) - .await - .expect_err("expected unknown session error"); + let err = + run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100)) + .await + .expect_err("expected unknown session error"); match err { UnifiedExecError::UnknownSessionId { session_id: err_id } => { @@ -659,7 +824,15 @@ mod tests { other => panic!("expected UnknownSessionId, got {other:?}"), } - assert!(!manager.sessions.lock().await.contains_key(&session_id)); + assert!( + !session + .services + .unified_exec_manager + .sessions + .lock() + .await + .contains_key(&session_id) + ); Ok(()) }