From c673a8d800f3a8163586e8a6177ddafd01f2e665 Mon Sep 17 00:00:00 2001
From: Tobias Loose
Date: Fri, 26 Sep 2025 12:04:47 +0200
Subject: [PATCH] Fix #254: apply the RMW atomicity constraint to stores

The issue was that we did not apply the RMW atomicity constraint to
stores, so a store could squeeze in between the read and the write part
of an RMW. Now, as soon as an RMW has happened (and observed the current
value), all future stores must come after it in the modification order
of that variable.
---
 src/rt/atomic.rs |  46 +++++++--------
 tests/atomic.rs  | 145 ++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 163 insertions(+), 28 deletions(-)

diff --git a/src/rt/atomic.rs b/src/rt/atomic.rs
index 7f634170..392f7072 100644
--- a/src/rt/atomic.rs
+++ b/src/rt/atomic.rs
@@ -118,6 +118,12 @@ pub(super) struct State {
     /// Last time the atomic was accessed for a store or rmw operation.
     last_non_load_access: Option<Access>,

+    /// Last time the atomic was accessed using an RMW access that did not fail.
+    ///
+    /// This is used for applying the RMW atomicity constraint to other stores
+    /// (so they don't squeeze in between the read and write part of the RMW).
+    last_successful_rmw: VersionVec,
+
     /// Currently tracked stored values. This is the `MAX_ATOMIC_HISTORY` most
     /// recent stores to the atomic cell in loom execution order.
     stores: [Store; MAX_ATOMIC_HISTORY],
@@ -413,6 +419,7 @@ impl State {
             is_mutating: false,
             last_access: None,
             last_non_load_access: None,
+            last_successful_rmw: VersionVec::new(),
             stores: Default::default(),
             cnt: 0,
         };
@@ -482,6 +489,9 @@
             }
         }

+        // RMW Atomicity.
+        modification_order.join(&self.last_successful_rmw);
+
         sync.sync_store(threads, ordering);

         let mut first_seen = FirstSeen::new();
@@ -535,6 +545,10 @@
                 let sync = self.stores[index].sync;
                 self.store(threads, sync, next, success);

+                // RMW Atomicity. Future stores must happen after this, because we
+                // already observed a value.
+                self.last_successful_rmw.join(&threads.active().causality);
+
                 Ok(prev)
             }
             Err(e) => {
@@ -546,7 +560,7 @@

     fn apply_load_coherence(&mut self, threads: &mut thread::Set, index: usize) {
         for i in 0..self.stores.len() {
-            // Skip if the is current.
+            // Skip if it is the current load.
             if index == i {
                 continue;
             }
@@ -730,18 +744,9 @@
         //
         // Add all stores **unless** a newer store has already been seen by the
        // current thread's causality.
-        'outer: for i in 0..self.stores.len() {
-            let store_i = &self.stores[i];
-
-            if i >= cnt {
-                // Not a real store
-                continue;
-            }
-
-            for j in 0..self.stores.len() {
-                let store_j = &self.stores[j];
-
-                if i == j || j >= cnt {
+        'outer: for (i, store_i) in self.stores.iter().take(cnt).enumerate() {
+            for (j, store_j) in self.stores.iter().take(cnt).enumerate() {
+                if i == j {
                     continue;
                 }

@@ -784,18 +789,9 @@

         // Unlike `match_load_to_stores`, rmw operations only load "newest"
         // stores, in terms of modification order.
-        'outer: for i in 0..self.stores.len() {
-            let store_i = &self.stores[i];
-
-            if i >= cnt {
-                // Not a real store
-                continue;
-            }
-
-            for j in 0..self.stores.len() {
-                let store_j = &self.stores[j];
-
-                if i == j || j >= cnt {
+        'outer: for (i, store_i) in self.stores.iter().take(cnt).enumerate() {
+            for (j, store_j) in self.stores.iter().take(cnt).enumerate() {
+                if i == j {
                     continue;
                 }

diff --git a/tests/atomic.rs b/tests/atomic.rs
index 73a8636a..3792f03f 100644
--- a/tests/atomic.rs
+++ b/tests/atomic.rs
@@ -1,8 +1,9 @@
 #![deny(warnings, rust_2018_idioms)]

-use loom::sync::atomic::AtomicUsize;
+use loom::sync::atomic::{AtomicBool, AtomicUsize};
 use loom::thread;

+use std::collections::HashSet;
 use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
 use std::sync::Arc;

@@ -111,7 +112,10 @@ fn compare_and_swap_reads_old_values() {

 #[test]
 fn fetch_add_atomic() {
-    loom::model(|| {
+    let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new()));
+    let solutions1 = std::sync::Arc::clone(&solutions);
+
+    loom::model(move || {
         let a1 = Arc::new(AtomicUsize::new(0));
         let a2 = a1.clone();

@@ -119,7 +123,142 @@ fn fetch_add_atomic() {

         let v1 = a1.fetch_add(1, Relaxed);
         let v2 = th.join().unwrap();
+        let v3 = a1.load(Relaxed);
+
+        solutions1.lock().unwrap().insert((v1, v2, v3));
+    });
+
+    let solutions = solutions.lock().unwrap();
+
+    assert!(solutions.contains(&(0, 1, 2)));
+    assert!(solutions.contains(&(1, 0, 2)));
+    assert_eq!(solutions.len(), 2);
+}
+
+#[test]
+fn store_does_not_squeeze_in_rmw() {
+    let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new()));
+    let solutions1 = std::sync::Arc::clone(&solutions);
+
+    loom::model(move || {
+        let a1 = Arc::new(AtomicUsize::new(0));
+        let a2 = a1.clone();
+
+        let th = thread::spawn(move || {
+            a1.store(1, Relaxed);
+        });
+
+        let b1 = a2.swap(2, Relaxed);
+
+        a2.store(3, Relaxed);
+
+        let b2 = a2.swap(4, Relaxed);
+
+        th.join().unwrap();
+
+        let b3 = a2.load(Relaxed);
+
+        solutions1.lock().unwrap().insert((b1, b2, b3));
+    });
+
+    let solutions = solutions.lock().unwrap();
+
+    // store(1) before swap(2).
+    assert!(solutions.contains(&(1, 3, 4)));
+
+    // store(1) after swap(2), before store(3).
+    assert!(solutions.contains(&(0, 3, 4)));
+
+    // store(1) after store(3), before swap(4).
+    assert!(solutions.contains(&(0, 1, 4)));
+
+    // store(1) after swap(4) (but before join).
+    assert!(solutions.contains(&(0, 3, 1)));
+
+    assert_eq!(solutions.len(), 4);
+}
+
+#[test]
+fn store_concurrent_failed_rmw() {
+    let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new()));
+    let solutions1 = std::sync::Arc::clone(&solutions);
+
+    loom::model(move || {
+        let a1 = Arc::new(AtomicUsize::new(0));
+        let a2 = a1.clone();
+
+        let th = thread::spawn(move || {
+            a1.store(1, Relaxed);
+        });
+
+        let b1 = a2.compare_exchange(0, 2, Relaxed, Relaxed);
+
+        th.join().unwrap();
+
+        let b2 = a2.load(Relaxed);
+
+        solutions1.lock().unwrap().insert((b1, b2));
+    });
+
+    let solutions = solutions.lock().unwrap();
+
+    // store(1) before compare_exchange(0, 2).
+    assert!(solutions.contains(&(Err(1), 1)));
+
+    // store(1) after compare_exchange(0, 2).
+    assert!(solutions.contains(&(Ok(0), 1)));
+
+    assert_eq!(solutions.len(), 2, "{:?}", solutions);
+}
+
+#[test]
+fn unordered_stores() {
+    let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new()));
+    let solutions1 = std::sync::Arc::clone(&solutions);
+
+    loom::model(move || {
+        let a1 = Arc::new(AtomicUsize::new(0));
+        let a2 = a1.clone();
+
+        let th = thread::spawn(move || {
+            a1.store(1, Relaxed);
+        });
+
+        a2.store(2, Relaxed);
+
+        th.join().unwrap();
+
+        let b = a2.load(Relaxed);
+
+        solutions1.lock().unwrap().insert(b);
+    });
+
+    let solutions = solutions.lock().unwrap();
+
+    assert!(solutions.contains(&1));
+    assert!(solutions.contains(&2));
+
+    assert_eq!(solutions.len(), 2);
+}
+
+// See issue https://github.com/tokio-rs/loom/issues/254
+#[test]
+fn concurrent_rmw_store() {
+    loom::model(move || {
+        let flag = Arc::new(AtomicBool::new(false));
+
+        let th = thread::spawn({
+            let flag = flag.clone();
+            move || flag.store(true, Relaxed) // a.k.a. unlock()
+        });
+
+        if flag.swap(false, Relaxed) {
+            // a.k.a. if !try_lock { return }
+            return;
+        }
+
+        th.join().unwrap();

-        assert_ne!(v1, v2);
+        assert!(flag.load(Relaxed)); // a.k.a. is_locked().
     });
 }
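
For reference (not part of the patch): below is a minimal standalone sketch of the guarantee the new constraint enforces, written against the public loom API that the tests above already use. The names (`a`, `read`, `last`) and the `main` wrapper are illustrative; it restates the `concurrent_rmw_store` scenario from issue #254 with an integer so the three positions in the modification order can be named explicitly.

    // Illustrative only; mirrors https://github.com/tokio-rs/loom/issues/254.
    use loom::sync::atomic::AtomicUsize;
    use loom::thread;
    use std::sync::atomic::Ordering::Relaxed;
    use std::sync::Arc;

    fn main() {
        loom::model(|| {
            let a = Arc::new(AtomicUsize::new(0));
            let a2 = a.clone();

            // A plain store racing with an RMW on the same atomic.
            let th = thread::spawn(move || a2.store(1, Relaxed));

            // The swap atomically reads the latest value in the modification
            // order and writes 2 immediately after it.
            let read = a.swap(2, Relaxed);

            th.join().unwrap();
            let last = a.load(Relaxed);

            // If the swap observed 0, the store(1) cannot be placed between
            // the swap's read and its write, so the modification order must
            // be 0, 2, 1, and this load (which happens after the join) has
            // to see 1. Without the fix, loom could also report 2 here.
            if read == 0 {
                assert_eq!(last, 1);
            }
        });
    }

The swap/store pair is the same shape as the try_lock/unlock pattern in the issue; using AtomicUsize instead of AtomicBool only makes it possible to distinguish all three writes in the comments.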