diff --git a/src/rt/atomic.rs b/src/rt/atomic.rs index 7f63417..392f707 100644 --- a/src/rt/atomic.rs +++ b/src/rt/atomic.rs @@ -118,6 +118,12 @@ pub(super) struct State { /// Last time the atomic was accessed for a store or rmw operation. last_non_load_access: Option, + /// Last time the atomic was accessed using an RMW access that did not fail. + /// + /// This is used for applying the RMW atomicity constraint to other stores + /// (so they don't squeeze in between the read and write part of the RMW). + last_successful_rmw: VersionVec, + /// Currently tracked stored values. This is the `MAX_ATOMIC_HISTORY` most /// recent stores to the atomic cell in loom execution order. stores: [Store; MAX_ATOMIC_HISTORY], @@ -413,6 +419,7 @@ impl State { is_mutating: false, last_access: None, last_non_load_access: None, + last_successful_rmw: VersionVec::new(), stores: Default::default(), cnt: 0, }; @@ -482,6 +489,9 @@ impl State { } } + // RMW Atomicity. + modification_order.join(&self.last_successful_rmw); + sync.sync_store(threads, ordering); let mut first_seen = FirstSeen::new(); @@ -535,6 +545,10 @@ impl State { let sync = self.stores[index].sync; self.store(threads, sync, next, success); + // RMW Atomicity. Future stores must happen after this, because we + // already observed a value. + self.last_successful_rmw.join(&threads.active().causality); + Ok(prev) } Err(e) => { @@ -546,7 +560,7 @@ impl State { fn apply_load_coherence(&mut self, threads: &mut thread::Set, index: usize) { for i in 0..self.stores.len() { - // Skip if the is current. + // Skip if it is the current load. if index == i { continue; } @@ -730,18 +744,9 @@ impl State { // // Add all stores **unless** a newer store has already been seen by the // current thread's causality. - 'outer: for i in 0..self.stores.len() { - let store_i = &self.stores[i]; - - if i >= cnt { - // Not a real store - continue; - } - - for j in 0..self.stores.len() { - let store_j = &self.stores[j]; - - if i == j || j >= cnt { + 'outer: for (i, store_i) in self.stores.iter().take(cnt).enumerate() { + for (j, store_j) in self.stores.iter().take(cnt).enumerate() { + if i == j { continue; } @@ -784,18 +789,9 @@ impl State { // Unlike `match_load_to_stores`, rmw operations only load "newest" // stores, in terms of modification order. - 'outer: for i in 0..self.stores.len() { - let store_i = &self.stores[i]; - - if i >= cnt { - // Not a real store - continue; - } - - for j in 0..self.stores.len() { - let store_j = &self.stores[j]; - - if i == j || j >= cnt { + 'outer: for (i, store_i) in self.stores.iter().take(cnt).enumerate() { + for (j, store_j) in self.stores.iter().take(cnt).enumerate() { + if i == j { continue; } diff --git a/tests/atomic.rs b/tests/atomic.rs index 73a8636..3792f03 100644 --- a/tests/atomic.rs +++ b/tests/atomic.rs @@ -1,8 +1,9 @@ #![deny(warnings, rust_2018_idioms)] -use loom::sync::atomic::AtomicUsize; +use loom::sync::atomic::{AtomicBool, AtomicUsize}; use loom::thread; +use std::collections::HashSet; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use std::sync::Arc; @@ -111,7 +112,10 @@ fn compare_and_swap_reads_old_values() { #[test] fn fetch_add_atomic() { - loom::model(|| { + let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())); + let solutions1 = std::sync::Arc::clone(&solutions); + + loom::model(move || { let a1 = Arc::new(AtomicUsize::new(0)); let a2 = a1.clone(); @@ -119,7 +123,142 @@ fn fetch_add_atomic() { let v1 = a1.fetch_add(1, Relaxed); let v2 = th.join().unwrap(); + let v3 = a1.load(Relaxed); + + solutions1.lock().unwrap().insert((v1, v2, v3)); + }); + + let solutions = solutions.lock().unwrap(); + + assert!(solutions.contains(&(0, 1, 2))); + assert!(solutions.contains(&(1, 0, 2))); + assert_eq!(solutions.len(), 2); +} + +#[test] +fn store_does_not_squeeze_in_rmw() { + let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())); + let solutions1 = std::sync::Arc::clone(&solutions); + + loom::model(move || { + let a1 = Arc::new(AtomicUsize::new(0)); + let a2 = a1.clone(); + + let th = thread::spawn(move || { + a1.store(1, Relaxed); + }); + + let b1 = a2.swap(2, Relaxed); + + a2.store(3, Relaxed); + + let b2 = a2.swap(4, Relaxed); + + th.join().unwrap(); + + let b3 = a2.load(Relaxed); + + solutions1.lock().unwrap().insert((b1, b2, b3)); + }); + + let solutions = solutions.lock().unwrap(); + + // store(1) before swap(2). + assert!(solutions.contains(&(1, 3, 4))); + + // store(1) after swap(2), before store(3). + assert!(solutions.contains(&(0, 3, 4))); + + // store(1) after store(3), before swap(4). + assert!(solutions.contains(&(0, 1, 4))); + + // store(1) after swap(4) (but before join). + assert!(solutions.contains(&(0, 3, 1))); + + assert_eq!(solutions.len(), 4); +} + +#[test] +fn store_oncurrent_failed_rmw() { + let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())); + let solutions1 = std::sync::Arc::clone(&solutions); + + loom::model(move || { + let a1 = Arc::new(AtomicUsize::new(0)); + let a2 = a1.clone(); + + let th = thread::spawn(move || { + a1.store(1, Relaxed); + }); + + let b1 = a2.compare_exchange(0, 2, Relaxed, Relaxed); + + th.join().unwrap(); + + let b2 = a2.load(Relaxed); + + solutions1.lock().unwrap().insert((b1, b2)); + }); + + let solutions = solutions.lock().unwrap(); + + // store(1) before compare_exchange(0, 2). + assert!(solutions.contains(&(Err(1), 1))); + + // store(1) after compare_exchange(0, 2). + assert!(solutions.contains(&(Ok(0), 1))); + + assert_eq!(solutions.len(), 2, "{:?}", solutions); +} + +#[test] +fn unordered_stores() { + let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())); + let solutions1 = std::sync::Arc::clone(&solutions); + + loom::model(move || { + let a1 = Arc::new(AtomicUsize::new(0)); + let a2 = a1.clone(); + + let th = thread::spawn(move || { + a1.store(1, Relaxed); + }); + + a2.store(2, Relaxed); + + th.join().unwrap(); + + let b = a2.load(Relaxed); + + solutions1.lock().unwrap().insert(b); + }); + + let solutions = solutions.lock().unwrap(); + + assert!(solutions.contains(&1)); + assert!(solutions.contains(&2)); + + assert_eq!(solutions.len(), 2); +} + +// See issue https://github.com/tokio-rs/loom/issues/254 +#[test] +fn concurrent_rmw_store() { + loom::model(move || { + let flag = Arc::new(AtomicBool::new(false)); + + let th = thread::spawn({ + let flag = flag.clone(); + move || flag.store(true, Relaxed) // a.k.a. unlock() + }); + + if flag.swap(false, Relaxed) { + // a.k.a. if !try_lock { return } + return; + } + + th.join().unwrap(); - assert_ne!(v1, v2); + assert!(flag.load(Relaxed)); // a.k.a. is_locked(). }); }