Skip to content

Min-quorum reconciliation #1733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 50 additions & 18 deletions crutest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2516,6 +2516,15 @@ async fn replace_before_active(
// that the initial downstairs are all synced up on the same flush and
// generation numbers.
fill_workload(volume, di, true).await?;

// Track which SocketAddr corresponds to which region. This shifts over
// time as the test runs, so we have to track it to correctly disable 2
// downstairs for a given region.
let mut regions = vec![];
for i in 0..targets.len() - 1 {
regions.push(Some(i as u32 / 3));
}
regions.push(None);
let ds_total = targets.len() - 1;
let mut old_ds = 0;
let mut new_ds = targets.len() - 1;
Expand All @@ -2533,26 +2542,42 @@ async fn replace_before_active(
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

// Stop a downstairs, wait for dsc to confirm it is stopped.
dsc_client.dsc_stop(old_ds).await.unwrap();
loop {
let res = dsc_client.dsc_get_ds_state(old_ds).await.unwrap();
let state = res.into_inner();
if state == DownstairsState::Exit {
break;
// Pick a second downstairs that's in the same region, so we can stop
// two downstairs and prevent activation. This is linear-time with the
// number of targets, but that's fine (so is writing to every block).
assert!(regions[new_ds].is_none());
let region = regions[old_ds].unwrap();
let (other_ds, _) = regions
.iter()
.enumerate()
.find(|(i, d)| *i != old_ds && **d == Some(region))
.unwrap();

// Stop two downstairs in the same region, then wait for dsc to confirm
// they are stopped. Having two downstairs stopped blocks activation.
for old_ds in [old_ds, other_ds] {
dsc_client.dsc_stop(old_ds as u32).await.unwrap();
loop {
let res =
dsc_client.dsc_get_ds_state(old_ds as u32).await.unwrap();
let state = res.into_inner();
if state == DownstairsState::Exit {
break;
}
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

info!(log, "[{c}] Request the upstairs activate");
// Spawn a task to re-activate, this will not finish till all three
// downstairs respond.
// Spawn a task to re-activate, this will not finish until 2-3
// downstairs respond (and we have disabled all but 1)
gen += 1;
let gc = volume.clone();
let handle =
tokio::spawn(async move { gc.activate_with_gen(gen).await });

// Give the activation request time to percolate in the upstairs.
// Give the activation request time to percolate in the upstairs; it
// shouldn't get anywhere because we don't have enough downstairs
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
let is_active = volume.query_is_active().await.unwrap();
info!(log, "[{c}] activate should now be waiting {:?}", is_active);
Expand All @@ -2561,13 +2586,13 @@ async fn replace_before_active(
info!(
log,
"[{c}] Replacing DS {old_ds}:{} with {new_ds}:{}",
targets[old_ds as usize],
targets[old_ds],
targets[new_ds],
);
match volume
.replace_downstairs(
Uuid::new_v4(),
targets[old_ds as usize],
targets[old_ds],
targets[new_ds],
)
.await
Expand All @@ -2578,6 +2603,9 @@ async fn replace_before_active(
}
}

// At this point, we've got two Downstairs (one of which was provided
// initially, and one of which has just been replaced), so activation
// should happen!
info!(log, "[{c}] Wait for activation after replacement");
loop {
let is_active = volume.query_is_active().await.unwrap();
Expand All @@ -2601,9 +2629,12 @@ async fn replace_before_active(
bail!("Requested volume verify failed: {:?}", e)
}

// Start up the old downstairs so it is ready for the next loop.
let res = dsc_client.dsc_start(old_ds).await;
info!(log, "[{c}] Replay: started {old_ds}, returned:{:?}", res);
// Start up all the stopped downstairs so they are ready for the next
// loop.
for old_ds in [old_ds, other_ds] {
let res = dsc_client.dsc_start(old_ds as u32).await;
info!(log, "[{c}] Replay: started {old_ds}, returned:{:?}", res);
}

// Wait for all IO to finish before we continue
loop {
Expand All @@ -2622,12 +2653,13 @@ async fn replace_before_active(
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

old_ds = (old_ds + 1) % (ds_total as u32 + 1);
regions.swap(old_ds, new_ds);
old_ds = (old_ds + 1) % (ds_total + 1);
new_ds = (new_ds + 1) % (ds_total + 1);

match wtq {
WhenToQuit::Count { count } => {
if c > count {
if c >= count {
break;
}
}
Expand Down
171 changes: 171 additions & 0 deletions integration_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,24 @@ mod test {
Ok(())
}

/// Shuts down the running downstairs task.
///
/// Returns the `(port, rport)` pair it was listening on, so a caller can
/// later restart it at the same addresses via
/// `reboot_read_write_with_ports`.
pub async fn stop(&mut self) -> Result<(u16, u16)> {
    // Take ownership of the task; panics if it was already stopped.
    let downstairs = self.downstairs.take().unwrap();
    // Remember both listen ports before tearing the task down.
    let saved_ports =
        (downstairs.address().port(), downstairs.repair_address().port());
    downstairs.stop().await?;
    Ok(saved_ports)
}

/// Restarts the downstairs in read-write mode.
///
/// Delegates to [`reboot_read_write_with_ports`] with both ports set to
/// 0 — presumably letting the system pick fresh listen ports rather than
/// reusing the previous ones (TODO confirm how port 0 is interpreted by
/// the downstairs builder).
pub async fn reboot_read_write(&mut self) -> Result<()> {
    self.reboot_read_write_with_ports(0, 0).await
}

pub async fn reboot_read_write_with_ports(
&mut self,
port: u16,
rport: u16,
) -> Result<()> {
let downstairs =
Downstairs::new_builder(self.tempdir.path(), false)
.set_logger(csl())
Expand All @@ -134,6 +151,8 @@ mod test {
downstairs,
DownstairsClientSettings {
address: self.address,
port,
rport,
..DownstairsClientSettings::default()
},
)
Expand Down Expand Up @@ -5839,4 +5858,156 @@ mod test {
// Make sure everything worked
volume.activate().await.unwrap();
}

#[tokio::test]
async fn connect_two_ds_then_deactivate() {
    const BLOCK_SIZE: usize = 512;

    // Bring up a full set of three downstairs and capture the connection
    // options, then take downstairs 1 offline before the upstairs ever
    // connects to it.
    let mut dset = TestDownstairsSet::small(false).await.unwrap();
    let opts = dset.opts();
    dset.downstairs1.stop().await.unwrap();
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    // Activation must succeed with only two downstairs present.
    let (guest, io) = Guest::new(None);
    let _join_handle = up_main(opts, 1, None, io, None).unwrap();
    guest.activate().await.unwrap();

    // A write against the degraded (two-downstairs) volume must still
    // complete successfully.
    let payload = BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice());
    let write_result = guest.write(BlockIndex(0), payload).await;
    assert!(write_result.is_ok());

    // Deactivation must also work in this state.
    guest.deactivate().await.unwrap();
}

// Exercise activation with two downstairs, then let the third rejoin:
// start three downstairs, stop DS1 before activation, write, restart DS1
// at its old ports, deactivate, and finally verify (read-only, DS1 only)
// that DS1 ended up holding the written data.
#[tokio::test]
async fn connect_two_ds_then_another() {
    const BLOCK_SIZE: usize = 512;

    // Spin off three downstairs, build our Crucible struct.
    let mut tds = TestDownstairsSet::small(false).await.unwrap();
    let opts = tds.opts();
    // Stop DS1 before the upstairs connects, remembering its ports so it
    // can be restarted at the same addresses below.
    let (ds1_port, ds1_rport) = tds.downstairs1.stop().await.unwrap();
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    // Activate with only DS2 and DS3 available.
    let (guest, io) = Guest::new(None);
    let _join_handle = up_main(opts, 1, None, io, None).unwrap();
    guest.activate().await.unwrap();

    // This write can only land on DS2 and DS3; DS1 is down.
    let res = guest
        .write(
            BlockIndex(0),
            BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()),
        )
        .await;
    assert!(res.is_ok());

    // Restart downstairs1, which should use live-repair to join the quorum
    //
    // We have to wait a while here, because there's a 10-second reconnect
    // delay.
    tds.downstairs1
        .reboot_read_write_with_ports(ds1_port, ds1_rport)
        .await
        .unwrap();
    tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
    guest.deactivate().await.unwrap();

    // Reconnect with only ds1 running, then confirm that it received the
    // writes. We'll come up in read-only mode so that we can connect with
    // just a single Downstairs, to make sure the reads go to DS1.
    tds.downstairs1.reboot_read_only().await.unwrap();
    tds.downstairs2.stop().await.unwrap();
    tds.downstairs3.stop().await.unwrap();
    // Point all reads at DS1 by making it the sole (read-only) target.
    tds.crucible_opts.read_only = true;
    tds.crucible_opts.target[0] = tds.downstairs1.address();
    let opts = tds.opts();
    let (guest, io) = Guest::new(None);
    let _join_handle = up_main(opts, 1, None, io, None).unwrap();
    guest.activate().await.unwrap();
    let mut buf = Buffer::new(2, BLOCK_SIZE);
    guest.read(BlockIndex(0), &mut buf).await.unwrap();

    // DS1 must have received the data (via live-repair) while it was back
    // in the quorum.
    assert_eq!(buf.to_vec(), vec![0x55; BLOCK_SIZE * 2]);
}

// Verify that data written while one downstairs is absent survives a
// restart in which a *different* downstairs is absent, i.e. min-quorum
// reconciliation brings the stale downstairs up to date.
#[tokio::test]
async fn min_quorum_live_repair() {
    const BLOCK_SIZE: usize = 512;

    // Bring up three downstairs for the test harness.
    let mut harness = TestDownstairsSet::small(false).await.unwrap();

    // Take downstairs 1 offline before the guest exists, remembering its
    // ports so it can be restarted at the same addresses later. With it
    // gone, activation proceeds via min-quorum reconciliation.
    let (saved_port, saved_rport) = harness.downstairs1.stop().await.unwrap();

    // Activate with only DS2 and DS3, then write some data to them.
    let (guest, io) = Guest::new(None);
    let opts = harness.opts();
    let _join_handle = up_main(opts, 1, None, io, None).unwrap();
    guest.activate().await.unwrap();
    let write_result = guest
        .write(
            BlockIndex(0),
            BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()),
        )
        .await;
    assert!(write_result.is_ok());

    // Shut down cleanly; DS1 never saw the write.
    guest.deactivate().await.unwrap();

    // Restart the stale DS1 and stop DS2, so the next activation sees
    // DS1 + DS3 and min-quorum reconciliation must repair DS1 from DS3.
    harness
        .downstairs1
        .reboot_read_write_with_ports(saved_port, saved_rport)
        .await
        .unwrap();
    harness.downstairs2.stop().await.unwrap();
    guest.activate_with_gen(2).await.unwrap();

    // Read back through the reconciled volume and confirm the write.
    let mut readback = Buffer::new(2, BLOCK_SIZE);
    guest.read(BlockIndex(0), &mut readback).await.unwrap();

    assert_eq!(readback.to_vec(), vec![0x55; BLOCK_SIZE * 2]);
}

// Verify that losing a second downstairs while the min-quorum timer is
// pending cancels it safely (no panic), and that activation can still
// complete once enough downstairs return.
#[tokio::test]
async fn min_quorum_cancel() {
    // Spin off three downstairs, build our Crucible struct.
    let mut tds = TestDownstairsSet::small(false).await.unwrap();

    // Stop downstairs 1 before constructing the guest, so it won't be
    // included and we'll do min-quorum reconciliation.
    let (port, rport) = tds.downstairs1.stop().await.unwrap();

    // Start the guest and kick off activation (against DS2 and DS3) in a
    // background task; it can't finish until a quorum is available.
    let (guest, io) = Guest::new(None);
    let opts = tds.opts();
    let _join_handle = up_main(opts, 1, None, io, None).unwrap();
    let s = tokio::spawn(async move { guest.activate().await });

    // Get into our min-quorum wait, which is 500 ms
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    // Stop DS2
    tds.downstairs2.stop().await.unwrap();

    // Wait for the min-quorum timer to go off; it shouldn't panic!
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    // Restart DS1, we're now eligible for min-quorum negotiation again
    tds.downstairs1
        .reboot_read_write_with_ports(port, rport)
        .await
        .unwrap();

    // Join the activation task: the outer unwrap surfaces a task panic,
    // the inner unwrap asserts activation itself succeeded.
    s.await.unwrap().unwrap()
}
}
5 changes: 4 additions & 1 deletion tools/test_repair.sh
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,17 @@ while [[ $count -lt $loops ]]; do
ds2_pid=$!
fi

# Wait for it to start up
sleep 10

cp "$verify_file" ${verify_file}.last
echo "Verifying data now"
echo ${ct} verify ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" --range -q -g "$generation" > "$test_log"
if ! ${ct} verify ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" --range -q -g "$generation" >> "$test_log" 2>&1
then
echo "Exit on verify fail, loop: $count, choice: $choice"
echo "Check $test_log for details"
cleanup
cleanup
exit 1
fi
set +o errexit
Expand Down
7 changes: 7 additions & 0 deletions tools/test_up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,13 @@ if ! "$dsc" cmd start -c 2; then
echo "Failed repair test part 1, starting downstairs 2" >> "$fail_log"
echo
fi
state=$("$dsc" cmd state -c 2)
while [[ "$state" != "Running" ]]; do
echo "downstairs 2 not restarted yet, waiting"
sleep 5
state=$("$dsc" cmd state -c 2)
done
echo "Downstairs 2 restarted"

# Put a dump test in the middle of the repair test, so we
# can see both a mismatch and that dump works.
Expand Down
Loading