diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index 781e3fe372..0089f3f78b 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -9,6 +9,7 @@ use std::sync::atomic::AtomicBool; use time::OffsetDateTime; use time::PrimitiveDateTime; use time::format_description::FormatItem; +use time::format_description::well_known::Rfc3339; use time::macros::format_description; use uuid::Uuid; @@ -39,18 +40,15 @@ pub struct ConversationItem { pub path: PathBuf, /// First up to `HEAD_RECORD_LIMIT` JSONL records parsed as JSON (includes meta line). pub head: Vec, - /// Last up to `TAIL_RECORD_LIMIT` JSONL response records parsed as JSON. - pub tail: Vec, /// RFC3339 timestamp string for when the session was created, if available. pub created_at: Option, - /// RFC3339 timestamp string for the most recent response in the tail, if available. + /// RFC3339 timestamp string for the most recent update (from file mtime). pub updated_at: Option, } #[derive(Default)] struct HeadTailSummary { head: Vec, - tail: Vec, saw_session_meta: bool, saw_user_event: bool, source: Option, @@ -62,7 +60,6 @@ struct HeadTailSummary { /// Hard cap to bound worst‑case work per request. const MAX_SCAN_FILES: usize = 10000; const HEAD_RECORD_LIMIT: usize = 10; -const TAIL_RECORD_LIMIT: usize = 10; /// Pagination cursor identifying a file by timestamp and UUID. #[derive(Debug, Clone, PartialEq, Eq)] @@ -212,9 +209,8 @@ async fn traverse_directories_for_paths( more_matches_available = true; break 'outer; } - // Read head and simultaneously detect message events within the same - // first N JSONL records to avoid a second file read. - let summary = read_head_and_tail(&path, HEAD_RECORD_LIMIT, TAIL_RECORD_LIMIT) + // Read head and detect message events; stop once meta + user are found. + let summary = read_head_summary(&path, HEAD_RECORD_LIMIT) .await .unwrap_or_default(); if !allowed_sources.is_empty() @@ -233,16 +229,19 @@ async fn traverse_directories_for_paths( if summary.saw_session_meta && summary.saw_user_event { let HeadTailSummary { head, - tail, created_at, mut updated_at, .. } = summary; - updated_at = updated_at.or_else(|| created_at.clone()); + if updated_at.is_none() { + updated_at = file_modified_rfc3339(&path) + .await + .unwrap_or(None) + .or_else(|| created_at.clone()); + } items.push(ConversationItem { path, head, - tail, created_at, updated_at, }); @@ -384,11 +383,7 @@ impl<'a> ProviderMatcher<'a> { } } -async fn read_head_and_tail( - path: &Path, - head_limit: usize, - tail_limit: usize, -) -> io::Result { +async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result { use tokio::io::AsyncBufReadExt; let file = tokio::fs::File::open(path).await?; @@ -441,107 +436,30 @@ async fn read_head_and_tail( } } } - } - if tail_limit != 0 { - let (tail, updated_at) = read_tail_records(path, tail_limit).await?; - summary.tail = tail; - summary.updated_at = updated_at; + if summary.saw_session_meta && summary.saw_user_event { + break; + } } + Ok(summary) } /// Read up to `HEAD_RECORD_LIMIT` records from the start of the rollout file at `path`. /// This should be enough to produce a summary including the session meta line. pub async fn read_head_for_summary(path: &Path) -> io::Result> { - let summary = read_head_and_tail(path, HEAD_RECORD_LIMIT, 0).await?; + let summary = read_head_summary(path, HEAD_RECORD_LIMIT).await?; Ok(summary.head) } -async fn read_tail_records( - path: &Path, - max_records: usize, -) -> io::Result<(Vec, Option)> { - use std::io::SeekFrom; - use tokio::io::AsyncReadExt; - use tokio::io::AsyncSeekExt; - - if max_records == 0 { - return Ok((Vec::new(), None)); - } - - const CHUNK_SIZE: usize = 8192; - - let mut file = tokio::fs::File::open(path).await?; - let mut pos = file.seek(SeekFrom::End(0)).await?; - if pos == 0 { - return Ok((Vec::new(), None)); - } - - let mut buffer: Vec = Vec::new(); - let mut latest_timestamp: Option = None; - - loop { - let slice_start = match (pos > 0, buffer.iter().position(|&b| b == b'\n')) { - (true, Some(idx)) => idx + 1, - _ => 0, - }; - let (tail, newest_ts) = collect_last_response_values(&buffer[slice_start..], max_records); - if latest_timestamp.is_none() { - latest_timestamp = newest_ts.clone(); - } - if tail.len() >= max_records || pos == 0 { - return Ok((tail, latest_timestamp.or(newest_ts))); - } - - let read_size = CHUNK_SIZE.min(pos as usize); - if read_size == 0 { - return Ok((tail, latest_timestamp.or(newest_ts))); - } - pos -= read_size as u64; - file.seek(SeekFrom::Start(pos)).await?; - let mut chunk = vec![0; read_size]; - file.read_exact(&mut chunk).await?; - chunk.extend_from_slice(&buffer); - buffer = chunk; - } -} - -fn collect_last_response_values( - buffer: &[u8], - max_records: usize, -) -> (Vec, Option) { - use std::borrow::Cow; - - if buffer.is_empty() || max_records == 0 { - return (Vec::new(), None); - } - - let text: Cow<'_, str> = String::from_utf8_lossy(buffer); - let mut collected_rev: Vec = Vec::new(); - let mut latest_timestamp: Option = None; - for line in text.lines().rev() { - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - let parsed: serde_json::Result = serde_json::from_str(trimmed); - let Ok(rollout_line) = parsed else { continue }; - let RolloutLine { timestamp, item } = rollout_line; - if let RolloutItem::ResponseItem(item) = item - && let Ok(val) = serde_json::to_value(&item) - { - if latest_timestamp.is_none() { - latest_timestamp = Some(timestamp.clone()); - } - collected_rev.push(val); - if collected_rev.len() == max_records { - break; - } - } - } - collected_rev.reverse(); - (collected_rev, latest_timestamp) +async fn file_modified_rfc3339(path: &Path) -> io::Result> { + let meta = tokio::fs::metadata(path).await?; + let modified = meta.modified().ok(); + let Some(modified) = modified else { + return Ok(None); + }; + let dt = OffsetDateTime::from(modified); + Ok(dt.format(&Rfc3339).ok()) } /// Locate a recorded conversation rollout file by its UUID string using the existing diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index f367782b12..f52db79690 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -22,7 +22,6 @@ use anyhow::Result; use codex_protocol::ConversationId; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; @@ -226,28 +225,28 @@ async fn test_list_conversations_latest_first() { "model_provider": "test-provider", })]; + let updated_times: Vec> = + page.items.iter().map(|i| i.updated_at.clone()).collect(); + let expected = ConversationsPage { items: vec![ ConversationItem { path: p1, head: head_3, - tail: Vec::new(), created_at: Some("2025-01-03T12-00-00".into()), - updated_at: Some("2025-01-03T12-00-00".into()), + updated_at: updated_times.first().cloned().flatten(), }, ConversationItem { path: p2, head: head_2, - tail: Vec::new(), created_at: Some("2025-01-02T12-00-00".into()), - updated_at: Some("2025-01-02T12-00-00".into()), + updated_at: updated_times.get(1).cloned().flatten(), }, ConversationItem { path: p3, head: head_1, - tail: Vec::new(), created_at: Some("2025-01-01T12-00-00".into()), - updated_at: Some("2025-01-01T12-00-00".into()), + updated_at: updated_times.get(2).cloned().flatten(), }, ], next_cursor: None, @@ -355,6 +354,8 @@ async fn test_pagination_cursor() { "source": "vscode", "model_provider": "test-provider", })]; + let updated_page1: Vec> = + page1.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_cursor1: Cursor = serde_json::from_str(&format!("\"2025-03-04T09-00-00|{u4}\"")).unwrap(); let expected_page1 = ConversationsPage { @@ -362,16 +363,14 @@ async fn test_pagination_cursor() { ConversationItem { path: p5, head: head_5, - tail: Vec::new(), created_at: Some("2025-03-05T09-00-00".into()), - updated_at: Some("2025-03-05T09-00-00".into()), + updated_at: updated_page1.first().cloned().flatten(), }, ConversationItem { path: p4, head: head_4, - tail: Vec::new(), created_at: Some("2025-03-04T09-00-00".into()), - updated_at: Some("2025-03-04T09-00-00".into()), + updated_at: updated_page1.get(1).cloned().flatten(), }, ], next_cursor: Some(expected_cursor1.clone()), @@ -422,6 +421,8 @@ async fn test_pagination_cursor() { "source": "vscode", "model_provider": "test-provider", })]; + let updated_page2: Vec> = + page2.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_cursor2: Cursor = serde_json::from_str(&format!("\"2025-03-02T09-00-00|{u2}\"")).unwrap(); let expected_page2 = ConversationsPage { @@ -429,16 +430,14 @@ async fn test_pagination_cursor() { ConversationItem { path: p3, head: head_3, - tail: Vec::new(), created_at: Some("2025-03-03T09-00-00".into()), - updated_at: Some("2025-03-03T09-00-00".into()), + updated_at: updated_page2.first().cloned().flatten(), }, ConversationItem { path: p2, head: head_2, - tail: Vec::new(), created_at: Some("2025-03-02T09-00-00".into()), - updated_at: Some("2025-03-02T09-00-00".into()), + updated_at: updated_page2.get(1).cloned().flatten(), }, ], next_cursor: Some(expected_cursor2.clone()), @@ -473,13 +472,14 @@ async fn test_pagination_cursor() { "source": "vscode", "model_provider": "test-provider", })]; + let updated_page3: Vec> = + page3.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_page3 = ConversationsPage { items: vec![ConversationItem { path: p1, head: head_1, - tail: Vec::new(), created_at: Some("2025-03-01T09-00-00".into()), - updated_at: Some("2025-03-01T09-00-00".into()), + updated_at: updated_page3.first().cloned().flatten(), }], next_cursor: None, num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02 (anchor), 01 @@ -533,9 +533,8 @@ async fn test_get_conversation_contents() { items: vec![ConversationItem { path: expected_path, head: expected_head, - tail: Vec::new(), created_at: Some(ts.into()), - updated_at: Some(ts.into()), + updated_at: page.items[0].updated_at.clone(), }], next_cursor: None, num_scanned_files: 1, @@ -570,7 +569,7 @@ async fn test_get_conversation_contents() { } #[tokio::test] -async fn test_tail_includes_last_response_items() -> Result<()> { +async fn test_updated_at_uses_file_mtime() -> Result<()> { let temp = TempDir::new().unwrap(); let home = temp.path(); @@ -636,229 +635,16 @@ async fn test_tail_includes_last_response_items() -> Result<()> { ) .await?; let item = page.items.first().expect("conversation item"); - let tail_len = item.tail.len(); - assert_eq!(tail_len, 10usize.min(total_messages)); - - let expected: Vec = (total_messages - tail_len..total_messages) - .map(|idx| { - serde_json::json!({ - "type": "message", - "role": "assistant", - "content": [ - { - "type": "output_text", - "text": format!("reply-{idx}"), - } - ], - }) - }) - .collect(); - - assert_eq!(item.tail, expected); assert_eq!(item.created_at.as_deref(), Some(ts)); - let expected_updated = format!("{ts}-{last:02}", last = total_messages - 1); - assert_eq!(item.updated_at.as_deref(), Some(expected_updated.as_str())); - - Ok(()) -} - -#[tokio::test] -async fn test_tail_handles_short_sessions() -> Result<()> { - let temp = TempDir::new().unwrap(); - let home = temp.path(); - - let ts = "2025-06-02T08-30-00"; - let uuid = Uuid::from_u128(7); - let day_dir = home.join("sessions").join("2025").join("06").join("02"); - fs::create_dir_all(&day_dir)?; - let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl")); - let mut file = File::create(&file_path)?; - - let conversation_id = ConversationId::from_string(&uuid.to_string())?; - let meta_line = RolloutLine { - timestamp: ts.to_string(), - item: RolloutItem::SessionMeta(SessionMetaLine { - meta: SessionMeta { - id: conversation_id, - timestamp: ts.to_string(), - instructions: None, - cwd: ".".into(), - originator: "test_originator".into(), - cli_version: "test_version".into(), - source: SessionSource::VSCode, - model_provider: Some("test-provider".into()), - }, - git: None, - }), - }; - writeln!(file, "{}", serde_json::to_string(&meta_line)?)?; - - let user_event_line = RolloutLine { - timestamp: ts.to_string(), - item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { - message: "hi".into(), - images: None, - })), - }; - writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?; - - for idx in 0..3 { - let response_line = RolloutLine { - timestamp: format!("{ts}-{idx:02}"), - item: RolloutItem::ResponseItem(ResponseItem::Message { - id: None, - role: "assistant".into(), - content: vec![ContentItem::OutputText { - text: format!("short-{idx}"), - }], - }), - }; - writeln!(file, "{}", serde_json::to_string(&response_line)?)?; - } - drop(file); - - let provider_filter = provider_vec(&[TEST_PROVIDER]); - let page = get_conversations( - home, - 1, - None, - INTERACTIVE_SESSION_SOURCES, - Some(provider_filter.as_slice()), - TEST_PROVIDER, - ) - .await?; - let tail = &page.items.first().expect("conversation item").tail; - - assert_eq!(tail.len(), 3); - - let expected: Vec = (0..3) - .map(|idx| { - serde_json::json!({ - "type": "message", - "role": "assistant", - "content": [ - { - "type": "output_text", - "text": format!("short-{idx}"), - } - ], - }) - }) - .collect(); - - assert_eq!(tail, &expected); - let expected_updated = format!("{ts}-{last:02}", last = 2); - assert_eq!( - page.items[0].updated_at.as_deref(), - Some(expected_updated.as_str()) - ); - - Ok(()) -} - -#[tokio::test] -async fn test_tail_skips_trailing_non_responses() -> Result<()> { - let temp = TempDir::new().unwrap(); - let home = temp.path(); - - let ts = "2025-06-03T10-00-00"; - let uuid = Uuid::from_u128(11); - let day_dir = home.join("sessions").join("2025").join("06").join("03"); - fs::create_dir_all(&day_dir)?; - let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl")); - let mut file = File::create(&file_path)?; - - let conversation_id = ConversationId::from_string(&uuid.to_string())?; - let meta_line = RolloutLine { - timestamp: ts.to_string(), - item: RolloutItem::SessionMeta(SessionMetaLine { - meta: SessionMeta { - id: conversation_id, - timestamp: ts.to_string(), - instructions: None, - cwd: ".".into(), - originator: "test_originator".into(), - cli_version: "test_version".into(), - source: SessionSource::VSCode, - model_provider: Some("test-provider".into()), - }, - git: None, - }), - }; - writeln!(file, "{}", serde_json::to_string(&meta_line)?)?; - - let user_event_line = RolloutLine { - timestamp: ts.to_string(), - item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { - message: "hello".into(), - images: None, - })), - }; - writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?; - - for idx in 0..4 { - let response_line = RolloutLine { - timestamp: format!("{ts}-{idx:02}"), - item: RolloutItem::ResponseItem(ResponseItem::Message { - id: None, - role: "assistant".into(), - content: vec![ContentItem::OutputText { - text: format!("response-{idx}"), - }], - }), - }; - writeln!(file, "{}", serde_json::to_string(&response_line)?)?; - } - - let compacted_line = RolloutLine { - timestamp: format!("{ts}-compacted"), - item: RolloutItem::Compacted(CompactedItem { - message: "compacted".into(), - replacement_history: None, - }), - }; - writeln!(file, "{}", serde_json::to_string(&compacted_line)?)?; - - let shutdown_event = RolloutLine { - timestamp: format!("{ts}-shutdown"), - item: RolloutItem::EventMsg(EventMsg::ShutdownComplete), - }; - writeln!(file, "{}", serde_json::to_string(&shutdown_event)?)?; - drop(file); - - let provider_filter = provider_vec(&[TEST_PROVIDER]); - let page = get_conversations( - home, - 1, - None, - INTERACTIVE_SESSION_SOURCES, - Some(provider_filter.as_slice()), - TEST_PROVIDER, - ) - .await?; - let tail = &page.items.first().expect("conversation item").tail; - - let expected: Vec = (0..4) - .map(|idx| { - serde_json::json!({ - "type": "message", - "role": "assistant", - "content": [ - { - "type": "output_text", - "text": format!("response-{idx}"), - } - ], - }) - }) - .collect(); - - assert_eq!(tail, &expected); - let expected_updated = format!("{ts}-{last:02}", last = 3); - assert_eq!( - page.items[0].updated_at.as_deref(), - Some(expected_updated.as_str()) - ); + let updated = item + .updated_at + .as_deref() + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&chrono::Utc)) + .expect("updated_at set from file mtime"); + let now = chrono::Utc::now(); + let age = now - updated; + assert!(age.num_seconds().abs() < 30); Ok(()) } @@ -913,22 +699,22 @@ async fn test_stable_ordering_same_second_pagination() { "model_provider": "test-provider", })] }; + let updated_page1: Vec> = + page1.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap(); let expected_page1 = ConversationsPage { items: vec![ ConversationItem { path: p3, head: head(u3), - tail: Vec::new(), created_at: Some(ts.to_string()), - updated_at: Some(ts.to_string()), + updated_at: updated_page1.first().cloned().flatten(), }, ConversationItem { path: p2, head: head(u2), - tail: Vec::new(), created_at: Some(ts.to_string()), - updated_at: Some(ts.to_string()), + updated_at: updated_page1.get(1).cloned().flatten(), }, ], next_cursor: Some(expected_cursor1.clone()), @@ -953,13 +739,14 @@ async fn test_stable_ordering_same_second_pagination() { .join("07") .join("01") .join(format!("rollout-2025-07-01T00-00-00-{u1}.jsonl")); + let updated_page2: Vec> = + page2.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_page2 = ConversationsPage { items: vec![ConversationItem { path: p1, head: head(u1), - tail: Vec::new(), created_at: Some(ts.to_string()), - updated_at: Some(ts.to_string()), + updated_at: updated_page2.first().cloned().flatten(), }], next_cursor: None, num_scanned_files: 3, // scanned u3, u2 (anchor), u1 diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index 6a21496d34..1cc9624ec3 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -1075,7 +1075,6 @@ mod tests { ConversationItem { path: PathBuf::from(path), head: head_with_ts_and_user_text(ts, &[preview]), - tail: Vec::new(), created_at: Some(ts.to_string()), updated_at: Some(ts.to_string()), } @@ -1150,14 +1149,12 @@ mod tests { let a = ConversationItem { path: PathBuf::from("/tmp/a.jsonl"), head: head_with_ts_and_user_text("2025-01-01T00:00:00Z", &["A"]), - tail: Vec::new(), created_at: Some("2025-01-01T00:00:00Z".into()), updated_at: Some("2025-01-01T00:00:00Z".into()), }; let b = ConversationItem { path: PathBuf::from("/tmp/b.jsonl"), head: head_with_ts_and_user_text("2025-01-02T00:00:00Z", &["B"]), - tail: Vec::new(), created_at: Some("2025-01-02T00:00:00Z".into()), updated_at: Some("2025-01-02T00:00:00Z".into()), }; @@ -1171,21 +1168,9 @@ mod tests { #[test] fn row_uses_tail_timestamp_for_updated_at() { let head = head_with_ts_and_user_text("2025-01-01T00:00:00Z", &["Hello"]); - let tail = vec![json!({ - "timestamp": "2025-01-01T01:00:00Z", - "type": "message", - "role": "assistant", - "content": [ - { - "type": "output_text", - "text": "hi", - } - ], - })]; let item = ConversationItem { path: PathBuf::from("/tmp/a.jsonl"), head, - tail, created_at: Some("2025-01-01T00:00:00Z".into()), updated_at: Some("2025-01-01T01:00:00Z".into()), };