Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 21 additions & 25 deletions src/rt/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ pub(super) struct State {
/// Last time the atomic was accessed for a store or rmw operation.
last_non_load_access: Option<Access>,

/// Last time the atomic was accessed using an RMW access that did not fail.
///
/// This is used for applying the RMW atomicity constraint to other stores
/// (so they don't squeeze in between the read and write part of the RMW).
last_successful_rmw: VersionVec,

/// Currently tracked stored values. This is the `MAX_ATOMIC_HISTORY` most
/// recent stores to the atomic cell in loom execution order.
stores: [Store; MAX_ATOMIC_HISTORY],
Expand Down Expand Up @@ -413,6 +419,7 @@ impl State {
is_mutating: false,
last_access: None,
last_non_load_access: None,
last_successful_rmw: VersionVec::new(),
stores: Default::default(),
cnt: 0,
};
Expand Down Expand Up @@ -482,6 +489,9 @@ impl State {
}
}

// RMW Atomicity.
modification_order.join(&self.last_successful_rmw);

sync.sync_store(threads, ordering);

let mut first_seen = FirstSeen::new();
Expand Down Expand Up @@ -535,6 +545,10 @@ impl State {
let sync = self.stores[index].sync;
self.store(threads, sync, next, success);

// RMW Atomicity. Future stores must happen after this, because we
// already observed a value.
self.last_successful_rmw.join(&threads.active().causality);

Ok(prev)
}
Err(e) => {
Expand All @@ -546,7 +560,7 @@ impl State {

fn apply_load_coherence(&mut self, threads: &mut thread::Set, index: usize) {
for i in 0..self.stores.len() {
// Skip if the is current.
// Skip if it is the current load.
if index == i {
continue;
}
Expand Down Expand Up @@ -730,18 +744,9 @@ impl State {
//
// Add all stores **unless** a newer store has already been seen by the
// current thread's causality.
'outer: for i in 0..self.stores.len() {
let store_i = &self.stores[i];

if i >= cnt {
// Not a real store
continue;
}

for j in 0..self.stores.len() {
let store_j = &self.stores[j];

if i == j || j >= cnt {
'outer: for (i, store_i) in self.stores.iter().take(cnt).enumerate() {
for (j, store_j) in self.stores.iter().take(cnt).enumerate() {
if i == j {
continue;
}

Expand Down Expand Up @@ -784,18 +789,9 @@ impl State {

// Unlike `match_load_to_stores`, rmw operations only load "newest"
// stores, in terms of modification order.
'outer: for i in 0..self.stores.len() {
let store_i = &self.stores[i];

if i >= cnt {
// Not a real store
continue;
}

for j in 0..self.stores.len() {
let store_j = &self.stores[j];

if i == j || j >= cnt {
'outer: for (i, store_i) in self.stores.iter().take(cnt).enumerate() {
for (j, store_j) in self.stores.iter().take(cnt).enumerate() {
if i == j {
continue;
}

Expand Down
145 changes: 142 additions & 3 deletions tests/atomic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#![deny(warnings, rust_2018_idioms)]

use loom::sync::atomic::AtomicUsize;
use loom::sync::atomic::{AtomicBool, AtomicUsize};
use loom::thread;

use std::collections::HashSet;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::Arc;

Expand Down Expand Up @@ -111,15 +112,153 @@ fn compare_and_swap_reads_old_values() {

#[test]
fn fetch_add_atomic() {
    // Checks that two concurrent relaxed `fetch_add(1)` calls are atomic:
    // across all interleavings loom explores, exactly one thread observes 0
    // and the other observes 1, and the final value is always 2.
    //
    // Collect every (v1, v2, v3) outcome over all explored executions so the
    // exact solution set can be asserted after the model finishes.
    let solutions = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new()));
    let solutions1 = std::sync::Arc::clone(&solutions);

    loom::model(move || {
        let a1 = Arc::new(AtomicUsize::new(0));
        let a2 = a1.clone();

        let th = thread::spawn(move || a2.fetch_add(1, Relaxed));

        let v1 = a1.fetch_add(1, Relaxed);
        let v2 = th.join().unwrap();
        // `join` establishes happens-before, so this load sees the final sum.
        let v3 = a1.load(Relaxed);

        solutions1.lock().unwrap().insert((v1, v2, v3));
    });

    let solutions = solutions.lock().unwrap();

    // The only two legal outcomes: either thread's increment goes first.
    assert!(solutions.contains(&(0, 1, 2)));
    assert!(solutions.contains(&(1, 0, 2)));
    assert_eq!(solutions.len(), 2);
}

#[test]
fn store_does_not_squeeze_in_rmw() {
    // Verifies the RMW atomicity constraint: a concurrent plain store may be
    // ordered before or after each swap, but can never land *between* the
    // read half and the write half of a single swap.
    //
    // Gather the distinct (b1, b2, b3) triples over every explored execution.
    let observed = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new()));
    let observed_inner = std::sync::Arc::clone(&observed);

    loom::model(move || {
        let atom = Arc::new(AtomicUsize::new(0));
        let atom_main = atom.clone();

        let storer = thread::spawn(move || {
            atom.store(1, Relaxed);
        });

        let b1 = atom_main.swap(2, Relaxed);
        atom_main.store(3, Relaxed);
        let b2 = atom_main.swap(4, Relaxed);

        storer.join().unwrap();

        let b3 = atom_main.load(Relaxed);

        observed_inner.lock().unwrap().insert((b1, b2, b3));
    });

    let observed = observed.lock().unwrap();

    // The concurrent store(1) lands before swap(2)...
    assert!(observed.contains(&(1, 3, 4)));
    // ...or between swap(2) and store(3)...
    assert!(observed.contains(&(0, 3, 4)));
    // ...or between store(3) and swap(4)...
    assert!(observed.contains(&(0, 1, 4)));
    // ...or after swap(4) but before the join.
    assert!(observed.contains(&(0, 3, 1)));

    // No outcome where the store splits an RMW is permitted.
    assert_eq!(observed.len(), 4);
}

// NOTE(review): the name likely meant `store_concurrent_failed_rmw` — kept
// as-is to avoid changing the test's identifier.
#[test]
fn store_oncurrent_failed_rmw() {
    // A failed compare_exchange must not constrain a concurrent store: the
    // store is free to be ordered either before (CAS fails) or after (CAS
    // succeeds) the RMW.
    let observed = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new()));
    let observed_inner = std::sync::Arc::clone(&observed);

    loom::model(move || {
        let atom = Arc::new(AtomicUsize::new(0));
        let atom_main = atom.clone();

        let storer = thread::spawn(move || {
            atom.store(1, Relaxed);
        });

        let b1 = atom_main.compare_exchange(0, 2, Relaxed, Relaxed);

        storer.join().unwrap();

        let b2 = atom_main.load(Relaxed);

        observed_inner.lock().unwrap().insert((b1, b2));
    });

    let observed = observed.lock().unwrap();

    // store(1) ordered before the CAS: the CAS sees 1 and fails.
    assert!(observed.contains(&(Err(1), 1)));
    // store(1) ordered after the CAS: the CAS sees 0 and succeeds.
    assert!(observed.contains(&(Ok(0), 1)));

    assert_eq!(observed.len(), 2, "{:?}", observed);
}

#[test]
fn unordered_stores() {
    // Two relaxed stores from different threads are unordered: after joining,
    // the load may observe either value, and loom must explore both.
    let observed = std::sync::Arc::new(std::sync::Mutex::new(HashSet::new()));
    let observed_inner = std::sync::Arc::clone(&observed);

    loom::model(move || {
        let atom = Arc::new(AtomicUsize::new(0));
        let atom_main = atom.clone();

        let storer = thread::spawn(move || {
            atom.store(1, Relaxed);
        });

        atom_main.store(2, Relaxed);

        storer.join().unwrap();

        let b = atom_main.load(Relaxed);

        observed_inner.lock().unwrap().insert(b);
    });

    let observed = observed.lock().unwrap();

    // Either store may be last in modification order.
    assert!(observed.contains(&1));
    assert!(observed.contains(&2));

    assert_eq!(observed.len(), 2);
}

// See issue https://github.com/tokio-rs/loom/issues/254
#[test]
fn concurrent_rmw_store() {
    // Regression test modeling a try-lock race: a concurrent `store(true)`
    // (unlock) racing with `swap(false)` (try-lock) must not let loom
    // interleave the store between the read and write halves of the swap.
    loom::model(move || {
        let flag = Arc::new(AtomicBool::new(false));

        let th = thread::spawn({
            let flag = flag.clone();
            move || flag.store(true, Relaxed) // a.k.a. unlock()
        });

        if flag.swap(false, Relaxed) {
            // a.k.a. if !try_lock { return }
            return;
        }

        th.join().unwrap();

        // After joining, the unlocking store has happened; the lock must
        // appear held again.
        assert!(flag.load(Relaxed)); // a.k.a. is_locked().
    });
}
Loading