Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/core/src/unified_exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
/// Token limit derived from the byte limit (one quarter of it).
// NOTE(review): presumably assumes roughly 4 bytes per token — confirm.
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_TOKENS: usize = UNIFIED_EXEC_OUTPUT_MAX_BYTES / 4;
/// Session-count cap: pruning keeps evicting while the number of tracked
/// sessions is at or above this value.
pub(crate) const MAX_UNIFIED_EXEC_SESSIONS: usize = 64;
/// Idle time since last use (10 minutes) at or beyond which a session is
/// treated as stale by the pruning logic, even if its process has not exited.
pub(crate) const STALE_SESSION_AGE: Duration = Duration::from_secs(10 * 60);

pub(crate) struct UnifiedExecContext {
pub session: Arc<Session>,
Expand Down
146 changes: 125 additions & 21 deletions codex-rs/core/src/unified_exec/session_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::truncate::approx_token_count;
use crate::truncate::formatted_truncate_text;

use super::ExecCommandRequest;
use super::STALE_SESSION_AGE;
use super::MAX_UNIFIED_EXEC_SESSIONS;
use super::SessionEntry;
use super::UnifiedExecContext;
Expand Down Expand Up @@ -575,22 +576,33 @@ impl UnifiedExecSessionManager {
}

fn prune_sessions_if_needed(sessions: &mut HashMap<i32, SessionEntry>) {
if sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
return;
}

let meta: Vec<(i32, Instant, bool)> = sessions
.iter()
.map(|(id, entry)| (*id, entry.last_used, entry.session.has_exited()))
.collect();
let now = Instant::now();

if let Some(session_id) = Self::session_id_to_prune_from_meta(&meta) {
sessions.remove(&session_id);
// Drop exited or stale sessions up front.
sessions.retain(|_, entry| {
if entry.session.has_exited() {
return false;
}
now.duration_since(entry.last_used) < STALE_SESSION_AGE
});

// Enforce cap by pruning until we are at or under the limit.
while sessions.len() >= MAX_UNIFIED_EXEC_SESSIONS {
let meta: Vec<(i32, Instant, bool)> = sessions
.iter()
.map(|(id, entry)| (*id, entry.last_used, entry.session.has_exited()))
.collect();

if let Some(session_id) = Self::session_id_to_prune_from_meta(&meta, now) {
sessions.remove(&session_id);
} else {
break;
}
}
}

// Centralized pruning policy so we can easily swap strategies later.
fn session_id_to_prune_from_meta(meta: &[(i32, Instant, bool)]) -> Option<i32> {
fn session_id_to_prune_from_meta(meta: &[(i32, Instant, bool)], now: Instant) -> Option<i32> {
if meta.is_empty() {
return None;
}
Expand All @@ -603,16 +615,17 @@ impl UnifiedExecSessionManager {
.map(|(session_id, _, _)| *session_id)
.collect();

let mut lru = meta.to_vec();
lru.sort_by_key(|(_, last_used, _)| *last_used);

if let Some((session_id, _, _)) = lru
if let Some((session_id, _, _)) = meta
.iter()
.find(|(session_id, _, exited)| !protected.contains(session_id) && *exited)
.filter(|(_, last_used, exited)| *exited || now.duration_since(*last_used) >= STALE_SESSION_AGE)
.min_by_key(|(_, last_used, _)| *last_used)
{
return Some(*session_id);
}

let mut lru = meta.to_vec();
lru.sort_by_key(|(_, last_used, _)| *last_used);

lru.into_iter()
.find(|(session_id, _, _)| !protected.contains(session_id))
.map(|(session_id, _, _)| session_id)
Expand Down Expand Up @@ -643,6 +656,27 @@ mod tests {
use tokio::time::Duration;
use tokio::time::Instant;

/// Test helper: returns the entries of `meta` that are still running and were
/// last used less than `STALE_SESSION_AGE` before `now`, in their original order.
fn retain_active(meta: &[(i32, Instant, bool)], now: Instant) -> Vec<(i32, Instant, bool)> {
    let mut survivors = Vec::new();
    for &(id, last_used, exited) in meta {
        // Exited sessions are dropped regardless of how recently they were used.
        if exited {
            continue;
        }
        let idle = now.duration_since(last_used);
        if idle < STALE_SESSION_AGE {
            survivors.push((id, last_used, exited));
        }
    }
    survivors
}

/// Test helper: repeatedly asks the pruning policy for a victim while the
/// number of remaining entries is at or above `MAX_UNIFIED_EXEC_SESSIONS`,
/// and returns the evicted session ids in eviction order.
fn prune_until_cap(meta: &[(i32, Instant, bool)], now: Instant) -> Vec<i32> {
    let mut remaining = meta.to_vec();
    let mut evicted = Vec::new();
    loop {
        if remaining.len() < MAX_UNIFIED_EXEC_SESSIONS {
            break;
        }
        match UnifiedExecSessionManager::session_id_to_prune_from_meta(&remaining, now) {
            Some(victim) => {
                evicted.push(victim);
                remaining.retain(|&(session_id, _, _)| session_id != victim);
            }
            // The policy found nothing it may evict; stop rather than spin.
            None => break,
        }
    }
    evicted
}

#[test]
fn unified_exec_env_injects_defaults() {
let env = apply_unified_exec_env(HashMap::new());
Expand Down Expand Up @@ -688,7 +722,7 @@ mod tests {
(10, now - Duration::from_secs(13), false),
];

let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta, now);

assert_eq!(candidate, Some(2));
}
Expand All @@ -709,13 +743,13 @@ mod tests {
(10, now - Duration::from_secs(13), false),
];

let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta, now);

assert_eq!(candidate, Some(1));
}

#[test]
fn pruning_protects_recent_sessions_even_if_exited() {
fn pruning_does_not_protect_recent_exited_sessions() {
let now = Instant::now();
let meta = vec![
(1, now - Duration::from_secs(40), false),
Expand All @@ -730,9 +764,79 @@ mod tests {
(10, now - Duration::from_secs(13), true),
];

let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta, now);

// Exited sessions are always eligible, even if they were recently used.
assert_eq!(candidate, Some(3));
}

/// A session idle for longer than `STALE_SESSION_AGE` is chosen for pruning
/// even though its process is still running.
#[test]
fn pruning_drops_stale_sessions_even_if_not_exited() {
    let now = Instant::now();
    let stale = now - STALE_SESSION_AGE - Duration::from_secs(1);
    let fresh = now - Duration::from_secs(30);
    let meta = vec![(1, stale, false), (2, fresh, false)];

    let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta, now);

    // Session 1 is one second past the stale threshold, so it is the candidate
    // despite not having exited; session 2 was used 30 seconds ago and stays.
    assert_eq!(candidate, Some(1));
}

/// `retain_active` keeps only sessions that are both running and recently used.
#[test]
fn retain_filters_exited_and_stale_before_cap() {
    let now = Instant::now();
    let secs_ago = |secs: u64| now - Duration::from_secs(secs);
    let past_threshold = now - STALE_SESSION_AGE - Duration::from_secs(1);
    let meta = [
        (1, secs_ago(5), false),
        // Still running, but idle beyond the stale threshold.
        (2, past_threshold, false),
        // Recently used, but already exited.
        (3, secs_ago(1), true),
        (4, secs_ago(2), false),
    ];

    let kept_ids: Vec<i32> = retain_active(&meta, now)
        .iter()
        .map(|&(id, _, _)| id)
        .collect();

    assert_eq!(kept_ids, vec![1, 4]);
}

/// With only fresh, running sessions, pruning evicts the least recently used ones.
#[test]
fn prune_until_cap_removes_oldest_non_protected() {
    let now = Instant::now();
    // 70 sessions; a higher id means more recently used (id 0 is the oldest).
    let meta: Vec<(i32, Instant, bool)> = (0..70)
        .map(|id: i32| (id, now - Duration::from_secs((70 - id) as u64), false))
        .collect();

    let mut evicted = prune_until_cap(&meta, now);
    evicted.sort();

    // Pruning continues until the count drops below the cap (70 -> 63),
    // which means the 7 oldest sessions, ids 0 through 6, are removed.
    let expected: Vec<i32> = (0..7).collect();
    assert_eq!(evicted, expected);
}

/// When the cap is exceeded, a stale session is evicted ahead of fresh ones.
#[test]
fn prune_prefers_stale_over_active_when_over_cap() {
    let now = Instant::now();
    // Fresh, running sessions: enough on their own to exceed the cap.
    let mut meta: Vec<(i32, Instant, bool)> = (0..65)
        .map(|id: i32| (id, now - Duration::from_secs(1), false))
        .collect();
    // One still-running session that is the least recently used of all and
    // has been idle for longer than the stale threshold.
    meta.push((999, now - STALE_SESSION_AGE - Duration::from_secs(1), false));

    // Order of eviction is irrelevant here: only membership and count are checked.
    let removed = prune_until_cap(&meta, now);

    assert!(
        removed.contains(&999),
        "stale session should be pruned before active sessions when over cap"
    );
    // 66 sessions are pruned while the count is >= the cap, ending at 63.
    assert_eq!(
        removed.len(),
        3,
        "should prune exactly three to reach < cap after >= check"
    );
}
}
Loading