Skip to content

Commit 2039dfe

Browse files
test(repo): fix flaky concurrent runtime test with multi-thread and timing adjustments (#3053)
1 parent 67d9f33 commit 2039dfe

File tree

1 file changed

+35
-11
lines changed

1 file changed

+35
-11
lines changed

crates/forge_repo/src/conversation/conversation_repo.rs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,24 +1012,37 @@ mod tests {
10121012
);
10131013
}
10141014

1015-
#[tokio::test]
1015+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
10161016
async fn test_concurrent_operations_dont_block_runtime() -> anyhow::Result<()> {
10171017
use std::sync::atomic::{AtomicUsize, Ordering};
10181018
use std::time::{Duration, Instant};
10191019

1020+
// Heartbeat fires every `TICK`; we require a measurement window of at
1021+
// least `MIN_WINDOW` so the assertion is meaningful even when the DB
1022+
// workload finishes very quickly (e.g. on fast machines with the
1023+
// in-memory SQLite pool).
1024+
const TICK: Duration = Duration::from_millis(10);
1025+
const MIN_WINDOW: Duration = Duration::from_millis(200);
1026+
10201027
let repo = Arc::new(repository()?);
10211028
let heartbeat = Arc::new(AtomicUsize::new(0));
10221029

1023-
// Heartbeat task - if runtime is blocked, this won't increment
1030+
// Heartbeat task - if runtime is blocked, this won't increment.
10241031
let heartbeat_clone = heartbeat.clone();
10251032
let heartbeat_handle = tokio::spawn(async move {
10261033
loop {
1027-
tokio::time::sleep(Duration::from_millis(10)).await;
1034+
tokio::time::sleep(TICK).await;
10281035
heartbeat_clone.fetch_add(1, Ordering::Relaxed);
10291036
}
10301037
});
10311038

1032-
// Spawn many concurrent DB operations
1039+
// Warm up: let the heartbeat task get scheduled and complete its first
1040+
// tick before we start measuring, then reset the counter so timing
1041+
// begins from a clean state.
1042+
tokio::time::sleep(TICK * 3).await;
1043+
heartbeat.store(0, Ordering::Relaxed);
1044+
1045+
// Spawn many concurrent DB operations.
10331046
let mut handles = vec![];
10341047
let start = Instant::now();
10351048

@@ -1046,24 +1059,35 @@ mod tests {
10461059
handles.push(handle);
10471060
}
10481061

1049-
// Wait for all operations
1062+
// Wait for all operations.
10501063
for handle in handles {
10511064
handle.await??;
10521065
}
1066+
1067+
// Ensure the measurement window is long enough for heartbeat math to
1068+
// be meaningful regardless of how fast the DB workload completed.
1069+
let work_elapsed = start.elapsed();
1070+
if work_elapsed < MIN_WINDOW {
1071+
tokio::time::sleep(MIN_WINDOW - work_elapsed).await;
1072+
}
10531073
let elapsed = start.elapsed();
10541074

1055-
// Stop heartbeat
1075+
// Stop heartbeat.
10561076
heartbeat_handle.abort();
10571077

1058-
// Verify runtime wasn't blocked
1078+
// Verify runtime wasn't blocked: heartbeat should have fired at least
1079+
// 80% of the theoretical max for the elapsed window. The threshold is
1080+
// clamped to at least 1 to keep the assertion well-defined.
10591081
let heartbeat_count = heartbeat.load(Ordering::Relaxed);
1060-
let expected_heartbeats = elapsed.as_millis() as usize / 10;
1082+
let expected_heartbeats = (elapsed.as_millis() as usize) / (TICK.as_millis() as usize);
1083+
let threshold = (expected_heartbeats * 8 / 10).max(1);
10611084

1062-
// Heartbeat should have fired at least 80% of expected times
10631085
assert!(
1064-
heartbeat_count > expected_heartbeats * 8 / 10,
1065-
"Runtime was blocked! Expected ~{} heartbeats, got {}",
1086+
heartbeat_count >= threshold,
1087+
"Runtime was blocked! Expected at least {} heartbeats (~{} theoretical) in {:?}, got {}",
1088+
threshold,
10661089
expected_heartbeats,
1090+
elapsed,
10671091
heartbeat_count
10681092
);
10691093

0 commit comments

Comments
 (0)