diff --git a/crutest/src/main.rs b/crutest/src/main.rs index 9dc747022..623cbd1ab 100644 --- a/crutest/src/main.rs +++ b/crutest/src/main.rs @@ -2516,6 +2516,15 @@ async fn replace_before_active( // that the initial downstairs are all synced up on the same flush and // generation numbers. fill_workload(volume, di, true).await?; + + // Track which SocketAddr corresponds to which region. This shifts over + // time as the test runs, so we have to track it correctly to disable 2 + // downstairs for a given region. + let mut regions = vec![]; + for i in 0..targets.len() - 1 { + regions.push(Some(i as u32 / 3)); + } + regions.push(None); let ds_total = targets.len() - 1; let mut old_ds = 0; let mut new_ds = targets.len() - 1; @@ -2533,26 +2542,42 @@ async fn replace_before_active( tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; } - // Stop a downstairs, wait for dsc to confirm it is stopped. - dsc_client.dsc_stop(old_ds).await.unwrap(); - loop { - let res = dsc_client.dsc_get_ds_state(old_ds).await.unwrap(); - let state = res.into_inner(); - if state == DownstairsState::Exit { - break; + // Pick a second downstairs that's in the same region, so we can stop + // two downstairs and prevent activation. This is linear-time with the + // number of targets, but that's fine (so is writing to every block). + assert!(regions[new_ds].is_none()); + let region = regions[old_ds].unwrap(); + let (other_ds, _) = regions + .iter() + .enumerate() + .find(|(i, d)| *i != old_ds && **d == Some(region)) + .unwrap(); + + // Stop two downstairs in the same region, then wait for dsc to confirm + // they are stopped. Having two downstairs stopped blocks activation. + for old_ds in [old_ds, other_ds] { + dsc_client.dsc_stop(old_ds as u32).await.unwrap(); + loop { + let res = + dsc_client.dsc_get_ds_state(old_ds as u32).await.unwrap(); + let state = res.into_inner(); + if state == DownstairsState::Exit { + break; + } + tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; } - tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; } info!(log, "[{c}] Request the upstairs activate"); - // Spawn a task to re-activate, this will not finish till all three - // downstairs respond. + // Spawn a task to re-activate, this will not finish until 2-3 + // downstairs respond (and we have disabled all but 1) gen += 1; let gc = volume.clone(); let handle = tokio::spawn(async move { gc.activate_with_gen(gen).await }); - // Give the activation request time to percolate in the upstairs. + // Give the activation request time to percolate in the upstairs; it + // shouldn't get anywhere because we don't have enough downstairs tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; let is_active = volume.query_is_active().await.unwrap(); info!(log, "[{c}] activate should now be waiting {:?}", is_active); @@ -2561,13 +2586,13 @@ async fn replace_before_active( info!( log, "[{c}] Replacing DS {old_ds}:{} with {new_ds}:{}", - targets[old_ds as usize], + targets[old_ds], targets[new_ds], ); match volume .replace_downstairs( Uuid::new_v4(), - targets[old_ds as usize], + targets[old_ds], targets[new_ds], ) .await { @@ -2578,6 +2603,9 @@ async fn replace_before_active( } } + // At this point, we've got two Downstairs (one of which was provided + // initially, and one of which has just been replaced), so activation + // should happen!
info!(log, "[{c}] Wait for activation after replacement"); loop { let is_active = volume.query_is_active().await.unwrap(); @@ -2601,9 +2629,12 @@ async fn replace_before_active( bail!("Requested volume verify failed: {:?}", e) } - // Start up the old downstairs so it is ready for the next loop. - let res = dsc_client.dsc_start(old_ds).await; - info!(log, "[{c}] Replay: started {old_ds}, returned:{:?}", res); + // Start up all the stopped downstairs so they are ready for the next + // loop. + for old_ds in [old_ds, other_ds] { + let res = dsc_client.dsc_start(old_ds as u32).await; + info!(log, "[{c}] Replay: started {old_ds}, returned:{:?}", res); + } // Wait for all IO to finish before we continue loop { @@ -2622,12 +2653,13 @@ async fn replace_before_active( tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; } - old_ds = (old_ds + 1) % (ds_total as u32 + 1); + regions.swap(old_ds, new_ds); + old_ds = (old_ds + 1) % (ds_total + 1); new_ds = (new_ds + 1) % (ds_total + 1); match wtq { WhenToQuit::Count { count } => { - if c > count { + if c >= count { break; } } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 164345d57..ac60b8d49 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -123,7 +123,24 @@ mod test { Ok(()) } + /// Stops the downstairs task, return a `(port, rport)` tuple + pub async fn stop(&mut self) -> Result<(u16, u16)> { + let ds = self.downstairs.take().unwrap(); + let port = ds.address().port(); + let rport = ds.repair_address().port(); + ds.stop().await?; + Ok((port, rport)) + } + pub async fn reboot_read_write(&mut self) -> Result<()> { + self.reboot_read_write_with_ports(0, 0).await + } + + pub async fn reboot_read_write_with_ports( + &mut self, + port: u16, + rport: u16, + ) -> Result<()> { let downstairs = Downstairs::new_builder(self.tempdir.path(), false) .set_logger(csl()) @@ -134,6 +151,8 @@ mod test { downstairs, DownstairsClientSettings { address: self.address, + port, + rport, ..DownstairsClientSettings::default() }, ) @@ -5839,4 +5858,156 @@ mod test { // Make sure everything worked volume.activate().await.unwrap(); } + + #[tokio::test] + async fn connect_two_ds_then_deactivate() { + const BLOCK_SIZE: usize = 512; + + // Spin off three downstairs, build our Crucible struct. + let mut tds = TestDownstairsSet::small(false).await.unwrap(); + let opts = tds.opts(); + tds.downstairs1.stop().await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let (guest, io) = Guest::new(None); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + guest.activate().await.unwrap(); + + let res = guest + .write( + BlockIndex(0), + BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()), + ) + .await; + assert!(res.is_ok()); + + guest.deactivate().await.unwrap(); + } + + #[tokio::test] + async fn connect_two_ds_then_another() { + const BLOCK_SIZE: usize = 512; + + // Spin off three downstairs, build our Crucible struct. 
+ let mut tds = TestDownstairsSet::small(false).await.unwrap(); + let opts = tds.opts(); + let (ds1_port, ds1_rport) = tds.downstairs1.stop().await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let (guest, io) = Guest::new(None); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + guest.activate().await.unwrap(); + + let res = guest + .write( + BlockIndex(0), + BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()), + ) + .await; + assert!(res.is_ok()); + + // Restart downstairs1, which should use live-repair to join the quorum + // + // We have to wait a while here, because there's a 10-second reconnect + // delay. + tds.downstairs1 + .reboot_read_write_with_ports(ds1_port, ds1_rport) + .await + .unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(15)).await; + guest.deactivate().await.unwrap(); + + // Reconnect with only ds1 running, then confirm that it received the + // writes. We'll come up in read-only mode so that we can connect with + // just a single Downstairs, to make sure the reads go to DS1. + tds.downstairs1.reboot_read_only().await.unwrap(); + tds.downstairs2.stop().await.unwrap(); + tds.downstairs3.stop().await.unwrap(); + tds.crucible_opts.read_only = true; + tds.crucible_opts.target[0] = tds.downstairs1.address(); + let opts = tds.opts(); + let (guest, io) = Guest::new(None); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + guest.activate().await.unwrap(); + let mut buf = Buffer::new(2, BLOCK_SIZE); + guest.read(BlockIndex(0), &mut buf).await.unwrap(); + + assert_eq!(buf.to_vec(), vec![0x55; BLOCK_SIZE * 2]); + } + + #[tokio::test] + async fn min_quorum_live_repair() { + const BLOCK_SIZE: usize = 512; + + // Spin off three downstairs, build our Crucible struct. + let mut tds = TestDownstairsSet::small(false).await.unwrap(); + + // Stop downstairs 1 before constructing the guest, so it won't be + // included and we'll do min-quorum reconciliation. + let (port, rport) = tds.downstairs1.stop().await.unwrap(); + + // Start the guest and do a write to ds 2 and 3. + let (guest, io) = Guest::new(None); + let opts = tds.opts(); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + guest.activate().await.unwrap(); + let res = guest + .write( + BlockIndex(0), + BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()), + ) + .await; + assert!(res.is_ok()); + + // Deactivate the guest, all without downstairs 1 participating + guest.deactivate().await.unwrap(); + + // At this point, the data has been written to DS 2 and 3. We'll start + // up again with DS 1 and 2, so min-quorum should do reconciliation. + + tds.downstairs1 + .reboot_read_write_with_ports(port, rport) + .await + .unwrap(); + tds.downstairs2.stop().await.unwrap(); + guest.activate_with_gen(2).await.unwrap(); + + let mut buf = Buffer::new(2, BLOCK_SIZE); + guest.read(BlockIndex(0), &mut buf).await.unwrap(); + + assert_eq!(buf.to_vec(), vec![0x55; BLOCK_SIZE * 2]); + } + + #[tokio::test] + async fn min_quorum_cancel() { + // Spin off three downstairs, build our Crucible struct. + let mut tds = TestDownstairsSet::small(false).await.unwrap(); + + // Stop downstairs 1 before constructing the guest, so it won't be + // included and we'll do min-quorum reconciliation. + let (port, rport) = tds.downstairs1.stop().await.unwrap(); + + // Start the guest and do a write to ds 2 and 3. 
+ let (guest, io) = Guest::new(None); + let opts = tds.opts(); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + let s = tokio::spawn(async move { guest.activate().await }); + + // Get into our min-quorum wait, which is 500 ms + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Stop DS2 + tds.downstairs2.stop().await.unwrap(); + + // Wait for the min-quorum timer to go off; it shouldn't panic! + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Restart DS1, we're now eligible for min-quorum negotiation again + tds.downstairs1 + .reboot_read_write_with_ports(port, rport) + .await + .unwrap(); + + s.await.unwrap().unwrap() + } } diff --git a/tools/test_repair.sh b/tools/test_repair.sh index 5f50b5451..d3564c379 100755 --- a/tools/test_repair.sh +++ b/tools/test_repair.sh @@ -213,6 +213,9 @@ while [[ $count -lt $loops ]]; do ds2_pid=$! fi + # Wait for it to start up + sleep 10 + cp "$verify_file" ${verify_file}.last echo "Verifying data now" echo ${ct} verify ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" --range -q -g "$generation" > "$test_log" @@ -220,7 +223,7 @@ while [[ $count -lt $loops ]]; do then echo "Exit on verify fail, loop: $count, choice: $choice" echo "Check $test_log for details" - cleanup + cleanup exit 1 fi set +o errexit diff --git a/tools/test_up.sh b/tools/test_up.sh index 807f5c292..6a55bccdf 100755 --- a/tools/test_up.sh +++ b/tools/test_up.sh @@ -289,6 +289,13 @@ if ! "$dsc" cmd start -c 2; then echo "Failed repair test part 1, starting downstairs 2" >> "$fail_log" echo fi +state=$("$dsc" cmd state -c 2) +while [[ "$state" != "Running" ]]; do + echo "downstairs 2 not restarted yet, waiting" + sleep 5 + state=$("$dsc" cmd state -c 2) +done +echo "Downstairs 2 restarted" # Put a dump test in the middle of the repair test, so we # can see both a mismatch and that dump works. diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index 485efb5b5..867228824 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -435,19 +435,30 @@ impl DownstairsClient { /// Sets our state to `DsStateData::Reconcile` /// /// # Panics - /// If the current state is invalid + /// If we are not currently in `WaitQuorum` pub(crate) fn begin_reconcile(&mut self) { - info!(self.log, "Transition from {:?} to Reconcile", self.state()); - let DsStateData::Connecting { state, mode } = &mut self.state else { - panic!( - "invalid state {:?} for client {}", - self.state(), - self.client_id - ); - }; - assert_eq!(state.discriminant(), NegotiationState::WaitQuorum); - assert_eq!(mode, &ConnectionMode::New); - *state = NegotiationStateData::Reconcile; + info!( + self.log, + "setting state to reconcile from {:?}", + self.state() + ); + if let DsStateData::Connecting { + state, + mode: ConnectionMode::New, + .. + } = &mut self.state + { + if matches!(state, NegotiationStateData::WaitQuorum(..)) { + *state = NegotiationStateData::Reconcile; + } else { + panic!( + "invalid negotiation state {:?}", + NegotiationState::from(&*state) + ); + } + } else { + panic!("invalid state {:?}", self.state()); + } } /// Checks whether this Downstairs is ready for the upstairs to deactivate @@ -498,7 +509,7 @@ impl DownstairsClient { // If the upstairs is already active (or trying to go active), then we // should automatically connect to the Downstairs. let auto_connect = match up_state { - UpstairsState::Active | UpstairsState::GoActive(..) => true, + UpstairsState::Active | UpstairsState::GoActive { .. 
} => true, UpstairsState::Disabled(..) | UpstairsState::Initializing | UpstairsState::Deactivating { .. } => false, }; @@ -539,7 +550,7 @@ impl DownstairsClient { match up_state { // If we haven't activated yet (or we're deactivating) then // start from New - UpstairsState::GoActive(..) + UpstairsState::GoActive { .. } | UpstairsState::Initializing | UpstairsState::Disabled(..) | UpstairsState::Deactivating { .. } => ConnectionMode::New, @@ -553,7 +564,7 @@ impl DownstairsClient { match up_state { // If we haven't activated yet (or we're deactivating), then // start from New - UpstairsState::GoActive(..) + UpstairsState::GoActive { .. } | UpstairsState::Initializing | UpstairsState::Disabled(..) | UpstairsState::Deactivating { .. } => ConnectionMode::New, @@ -749,17 +760,18 @@ impl DownstairsClient { /// /// This changes the subsequent path through negotiation, without restarting /// the client IO task. Doing so is safe because the faulted path is - /// a superset of the offline path. + /// a superset of all other paths. /// /// # Panics - /// If we are not in `DsStateData::Connecting { mode: ConnectionMode::Offline, - /// .. }` + /// If we are not in `DsStateData::Connecting { .. }` pub(crate) fn set_connection_mode_faulted(&mut self) { - let DsStateData::Connecting { mode, .. } = &mut self.state else { + let DsStateData::Connecting { mode, state } = &mut self.state else { panic!("not connecting"); }; - assert_eq!(*mode, ConnectionMode::Offline); - *mode = ConnectionMode::Faulted + *mode = ConnectionMode::Faulted; + if matches!(*state, NegotiationStateData::WaitQuorum(..)) { + *state = NegotiationStateData::LiveRepairReady; + } } /// Applies an [`EnqueueResult`] for the given job @@ -1445,7 +1457,7 @@ impl DownstairsClient { // downstairs here. match up_state { UpstairsState::Initializing - | UpstairsState::GoActive(_) => { + | UpstairsState::GoActive { .. } => { warn!( self.log, "Replace {} with {} before active", @@ -1578,7 +1590,7 @@ impl DownstairsClient { if matches!( up_state, UpstairsState::Initializing - | UpstairsState::GoActive(..) + | UpstairsState::GoActive { .. } ) => { *state = NegotiationStateData::WaitQuorum(dsr); diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index dae685e8a..786348d01 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -293,16 +293,23 @@ pub(crate) struct ReconcileData { /// Number of extents needing repair during initial activation reconcile_repair_needed: usize, + + /// Flags indicating whether a client is participating + participating: ClientData<bool>, } impl ReconcileData { - fn new<V: Into<VecDeque<ReconcileIO>>>(task_list: V) -> Self { + fn new<V: Into<VecDeque<ReconcileIO>>>( + task_list: V, + participating: ClientData<bool>, + ) -> Self { let task_list = task_list.into(); Self { id: Uuid::new_v4(), current_work: None, reconcile_repair_needed: task_list.len(), task_list, + participating, } } } @@ -426,7 +433,10 @@ impl Downstairs { /// Helper function to set all 3x clients as active, legally #[cfg(test)] pub fn force_active(&mut self) { - let up_state = UpstairsState::GoActive(BlockRes::dummy()); + let up_state = UpstairsState::GoActive { + res: BlockRes::dummy(), + min_quorum_deadline: None, + }; for cid in ClientId::iter() { for state in [ NegotiationStateData::Start, @@ -886,8 +896,8 @@ impl Downstairs { * that code yet, we are making use of this loop to find our * max.
*/ - let mut max_flush = 0; - let mut max_gen = 0; + let mut max_flush = None; + let mut max_gen = None; for (cid, rec) in ClientId::iter().filter_map(|i| { if let DsStateData::Connecting { state: NegotiationStateData::WaitQuorum(r), @@ -906,8 +916,8 @@ impl Downstairs { let mut dirty_log = Vec::with_capacity(MAX_LOG); for (i, m) in rec.iter().enumerate() { - max_flush = max_flush.max(m.flush + 1); - max_gen = max_gen.max(m.gen + 1); + max_flush = Some(max_flush.unwrap_or(0).max(m.flush + 1)); + max_gen = Some(max_gen.unwrap_or(0).max(m.gen + 1)); if i < MAX_LOG { flush_log.push(m.flush); gen_log.push(m.gen); @@ -924,6 +934,8 @@ impl Downstairs { info!(self.log, "[{cid}]R dirty{slice}: {dirty_log:?}",); } + let max_gen = max_gen.expect("no clients in WaitQuorum?"); + let max_flush = max_flush.expect("no clients in WaitQuorum?"); info!(self.log, "Max found gen is {}", max_gen); /* * Verify that the generation number that the guest has requested @@ -973,9 +985,11 @@ impl Downstairs { * Determine what extents don't match and what to do * about that */ - if let Some(reconcile_list) = self.mismatch_list() { - for c in self.clients.iter_mut() { - c.begin_reconcile(); + if let Some((reconcile_list, participating)) = self.mismatch_list() { + for i in ClientId::iter() { + if participating[i] { + self.clients[i].begin_reconcile(); + } } let task_list = self.convert_rc_to_messages( @@ -984,7 +998,7 @@ impl Downstairs { max_gen, ); - let reconcile = ReconcileData::new(task_list); + let reconcile = ReconcileData::new(task_list, participating); info!( self.log, @@ -1776,8 +1790,12 @@ impl Downstairs { reconcile.reconcile_repair_needed, ); - for c in self.clients.iter_mut() { - c.send_next_reconciliation_req(&mut next); + for i in ClientId::iter() { + if reconcile.participating[i] { + self.clients[i].send_next_reconciliation_req(&mut next); + } else { + next.skip(i); + } } reconcile.current_work = Some(next); @@ -1824,19 +1842,24 @@ impl Downstairs { return false; }; + let Some(reconcile) = self.reconcile.as_mut() else { + unreachable!(); // checked above + }; + // Check to make sure that we're still in a repair-ready state // // If any client have dropped out of repair-readiness (e.g. due to // failed reconciliation, timeouts, etc), then we have to kick // everything else back to the beginning. - if self.clients.iter().any(|c| { - !matches!( - c.state(), - DsState::Connecting { - state: NegotiationState::Reconcile, - .. - } - ) + if ClientId::iter().any(|i| { + reconcile.participating[i] + && !matches!( + self.clients[i].state(), + DsState::Connecting { + state: NegotiationState::Reconcile, + .. + } + ) }) { // Something has changed, so abort this repair. // Mark any downstairs that have not changed as failed and disable @@ -1845,10 +1868,6 @@ impl Downstairs { return false; } - let Some(reconcile) = self.reconcile.as_mut() else { - unreachable!(); // checked above - }; - let next = reconcile.current_work.as_mut().unwrap(); if self.clients[client_id].on_reconciliation_job_done(repair_id, next) { reconcile.current_work = None; @@ -1943,13 +1962,37 @@ impl Downstairs { /// Asserts that initial reconciliation is done, and sets clients as Active /// + /// Specifically, clients that were in `NegotiationState::Reconcile` are set + /// as active; clients that are in other negotiation states are marked as + /// faulted. + /// /// # Panics /// If that isn't the case! 
pub(crate) fn on_reconciliation_done(&mut self, did_work: bool) { assert!(self.ds_active.is_empty()); - for c in self.clients.iter_mut() { - c.set_active(); + for (i, c) in self.clients.iter_mut().enumerate() { + match c.state() { + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } => { + assert!(!did_work); + c.set_active(); + } + DsState::Connecting { + state: NegotiationState::Reconcile, + .. + } => { + assert!(did_work); + c.set_active(); + } + DsState::Connecting { .. } => c.set_connection_mode_faulted(), + s => panic!( + "invalid state in on_reconciliation_done \ + for client {i}: {s:?}" + ), + } } if did_work { @@ -1964,20 +2007,21 @@ } /// Compares region metadata from all three clients and builds a mend list - fn mismatch_list(&self) -> Option<DownstairsMend> { + fn mismatch_list(&self) -> Option<(DownstairsMend, ClientData<bool>)> { let log = self.log.new(o!("" => "mend".to_string())); let mut meta = ClientMap::new(); + let mut participating = ClientData::new(false); for i in ClientId::iter() { - let DsStateData::Connecting { + if let DsStateData::Connecting { state: NegotiationStateData::WaitQuorum(r), .. } = self.clients[i].state_data() - else { - panic!("client {i} is not in WaitQuorum"); - }; - meta.insert(i, r); + { + meta.insert(i, r); + participating[i] = true; + } } - DownstairsMend::new(&meta, log) + DownstairsMend::new(&meta, log).map(|m| (m, participating)) } pub(crate) fn submit_flush( @@ -3964,8 +4008,9 @@ struct DownstairsBackpressureConfig { #[cfg(test)] pub(crate) mod test { use super::{ - ClientFaultReason, ClientNegotiationFailed, ClientStopReason, - ConnectionMode, Downstairs, DsState, NegotiationStateData, PendingJob, + ClientData, ClientFaultReason, ClientNegotiationFailed, + ClientStopReason, ConnectionMode, Downstairs, DsState, + NegotiationStateData, PendingJob, }; use crate::{ downstairs::{LiveRepairData, LiveRepairState, ReconcileData}, @@ -4063,7 +4108,10 @@ pub(crate) mod test { fn set_all_reconcile(ds: &mut Downstairs) { let mode = ConnectionMode::New; - let up_state = UpstairsState::GoActive(BlockRes::dummy()); + let up_state = UpstairsState::GoActive { + res: BlockRes::dummy(), + min_quorum_deadline: None, + }; for cid in ClientId::iter() { for state in [ NegotiationStateData::Start, @@ -5701,7 +5749,7 @@ pub(crate) mod test { // No repairs on the queue, should return None let mut ds = Downstairs::test_default(); - ds.reconcile = Some(ReconcileData::new([])); + ds.reconcile = Some(ReconcileData::new([], ClientData::new(true))); let w = ds.send_next_reconciliation_req(); assert!(w); // reconciliation is "done", because there's nothing there @@ -5719,22 +5767,25 @@ pub(crate) mod test { let rep_id = ReconciliationId(1); // Put two jobs on the todo list - ds.reconcile = Some(ReconcileData::new([ - ReconcileIO::new( - close_id, - Message::ExtentClose { - repair_id: close_id, - extent_id: ExtentId(1), - }, - ), - ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - ), - ])); + ds.reconcile = Some(ReconcileData::new( + [ + ReconcileIO::new( + close_id, + Message::ExtentClose { + repair_id: close_id, + extent_id: ExtentId(1), + }, + ), + ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + ), + ], + ClientData::new(true), // all clients are participating + )); // Send the first reconciliation req assert!(!ds.send_next_reconciliation_req()); @@ -5775,13 +5826,16 @@ pub(crate) mod test { let up_state = UpstairsState::Active; let rep_id
= ReconciliationId(0); - ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - )])); + ds.reconcile = Some(ReconcileData::new( + [ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + )], + ClientData::new(true), // all clients are participating + )); // Send that job ds.send_next_reconciliation_req(); @@ -5814,13 +5868,16 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); let rep_id = ReconciliationId(0); - ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - )])); + ds.reconcile = Some(ReconcileData::new( + [ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + )], + ClientData::new(true), // all clients are participating + )); // Send that req assert!(!ds.send_next_reconciliation_req()); @@ -5836,22 +5893,25 @@ pub(crate) mod test { let close_id = ReconciliationId(0); let rep_id = ReconciliationId(1); - ds.reconcile = Some(ReconcileData::new([ - ReconcileIO::new( - close_id, - Message::ExtentClose { - repair_id: close_id, - extent_id: ExtentId(1), - }, - ), - ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - ), - ])); + ds.reconcile = Some(ReconcileData::new( + [ + ReconcileIO::new( + close_id, + Message::ExtentClose { + repair_id: close_id, + extent_id: ExtentId(1), + }, + ), + ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + ), + ], + ClientData::new(true), // all clients are participating + )); // Send the close job. Reconciliation isn't done at this point! assert!(!ds.send_next_reconciliation_req()); @@ -5884,19 +5944,22 @@ pub(crate) mod test { let rep_id = ReconciliationId(1); // Queue up a repair message, which will be skiped for client 0 - ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( - rep_id, - Message::ExtentRepair { - repair_id: rep_id, - extent_id: ExtentId(1), - source_client_id: ClientId::new(0), - source_repair_address: SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 803, - ), - dest_clients: vec![ClientId::new(1), ClientId::new(2)], - }, - )])); + ds.reconcile = Some(ReconcileData::new( + [ReconcileIO::new( + rep_id, + Message::ExtentRepair { + repair_id: rep_id, + extent_id: ExtentId(1), + source_client_id: ClientId::new(0), + source_repair_address: SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 803, + ), + dest_clients: vec![ClientId::new(1), ClientId::new(2)], + }, + )], + ClientData::new(true), // all clients are participating + )); // Send the job. Reconciliation isn't done at this point! 
assert!(!ds.send_next_reconciliation_req()); @@ -5927,19 +5990,22 @@ pub(crate) mod test { let rep_id = ReconciliationId(1); // Queue up a repair message, which will be skiped for client 0 - ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( - rep_id, - Message::ExtentRepair { - repair_id: rep_id, - extent_id: ExtentId(1), - source_client_id: ClientId::new(0), - source_repair_address: SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 803, - ), - dest_clients: vec![ClientId::new(1), ClientId::new(2)], - }, - )])); + ds.reconcile = Some(ReconcileData::new( + [ReconcileIO::new( + rep_id, + Message::ExtentRepair { + repair_id: rep_id, + extent_id: ExtentId(1), + source_client_id: ClientId::new(0), + source_repair_address: SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 803, + ), + dest_clients: vec![ClientId::new(1), ClientId::new(2)], + }, + )], + ClientData::new(true), // all clients are participating + )); // Send the job. Reconciliation isn't done at this point! assert!(!ds.send_next_reconciliation_req()); @@ -5962,22 +6028,25 @@ pub(crate) mod test { let rep_id = ReconciliationId(1); // Queue up a repair message, which will be skiped for client 0 - ds.reconcile = Some(ReconcileData::new([ - ReconcileIO::new( - close_id, - Message::ExtentClose { - repair_id: close_id, - extent_id: ExtentId(1), - }, - ), - ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - ), - ])); + ds.reconcile = Some(ReconcileData::new( + [ + ReconcileIO::new( + close_id, + Message::ExtentClose { + repair_id: close_id, + extent_id: ExtentId(1), + }, + ), + ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + ), + ], + ClientData::new(true), // all clients are participating + )); // Send the first req; reconciliation is not yet done assert!(!ds.send_next_reconciliation_req()); diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index a6cc28269..f3ef7b965 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -1049,6 +1049,11 @@ impl ReconcileIO { state: ClientData::new(ReconcileIOState::New), } } + + /// Marks the job as skipped for the given client + fn skip(&mut self, i: ClientId) { + self.state[i] = ReconcileIOState::Skipped; + } } /* diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index a7bae7c69..917f77acd 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -42,6 +42,9 @@ use uuid::Uuid; /// How often to log stats for DTrace const STAT_INTERVAL: Duration = Duration::from_secs(1); +/// How long to delay before negotiating with 2/3 Downstairs +const NEGOTIATION_DELAY: Duration = Duration::from_millis(500); + /// Minimum IO size (in bytes) before encryption / decryption is done off-thread const MIN_DEFER_SIZE_BYTES: u64 = 8192; @@ -66,7 +69,12 @@ pub(crate) enum UpstairsState { /// The guest has requested that the upstairs go active /// /// We should reply on the provided channel - GoActive(BlockRes), + GoActive { + res: BlockRes, + + /// Time at which to perform min-quorum negotiation + min_quorum_deadline: Option<Instant>, + }, /// The upstairs is fully online and accepting guest IO Active, @@ -271,6 +279,9 @@ pub(crate) enum UpstairsAction { StatUpdate, Control(ControlRequest), + /// The timer has fired for reconciliation, with 2/3 downstairs + MinQuorumReconciliation, + /// The guest connection has been dropped GuestDropped, @@ -513,6 +524,16 @@ impl Upstairs { } } } + _ = if let UpstairsState::GoActive { + min_quorum_deadline:
Some(time), .. + } = &self.state { + futures::future::Either::Left(sleep_until(*time)) + } else { + futures::future::Either::Right(std::future::pending()) + } + => { + UpstairsAction::MinQuorumReconciliation + } m = self.deferred_msgs.next(), if !self.deferred_msgs.is_empty() => { // The outer Option is None if the queue is empty. If this is @@ -559,6 +580,9 @@ impl Upstairs { UpstairsAction::GuestDropped => { self.guest_dropped = true; } + UpstairsAction::MinQuorumReconciliation => { + self.on_min_quorum(); + } UpstairsAction::DeferredBlockOp(req) => { self.counters.action_deferred_block += 1; cdt::up__action_deferred_block!(|| (self @@ -862,7 +886,7 @@ impl Upstairs { let state = match &self.state { UpstairsState::Initializing | UpstairsState::Disabled(..) - | UpstairsState::GoActive(..) => { + | UpstairsState::GoActive { .. } => { crate::UpState::Initializing } UpstairsState::Active => crate::UpState::Active, @@ -1003,7 +1027,7 @@ impl Upstairs { // We allow this if we are not active yet, or we are active // with the requested generation number. match &self.state { - UpstairsState::Active | UpstairsState::GoActive(..) => { + UpstairsState::Active | UpstairsState::GoActive { .. } => { if self.cfg.generation() == gen { // Okay, we want to activate with what we already // have, that's valid; let the set_active_request @@ -1229,7 +1253,10 @@ impl Upstairs { fn set_active_request(&mut self, res: BlockRes) { match &self.state { UpstairsState::Initializing | UpstairsState::Disabled(..) => { - self.state = UpstairsState::GoActive(res); + self.state = UpstairsState::GoActive { + res, + min_quorum_deadline: None, + }; info!(self.log, "{} active request set", self.cfg.upstairs_id); // Notify all clients that they should go active when they hit @@ -1238,7 +1265,7 @@ impl Upstairs { c.set_active_request(); } } - UpstairsState::GoActive(..) => { + UpstairsState::GoActive { .. } => { // We have already been sent a request to go active, but we // are not active yet and will respond (on the original // BlockRes) when we do become active. @@ -1280,7 +1307,7 @@ impl Upstairs { match &self.state { UpstairsState::Initializing | UpstairsState::Disabled(..) - | UpstairsState::GoActive(..) => { + | UpstairsState::GoActive { .. } => { res.send_err(CrucibleError::UpstairsInactive); return; } @@ -1755,7 +1782,7 @@ impl Upstairs { self.connect_ro_region_set(); } else { // See if we have a quorum - self.connect_region_set(); + self.on_wait_quorum(); } } Ok(NegotiationResult::Replay) => { @@ -1858,7 +1885,7 @@ impl Upstairs { self.on_reconciliation_skipped() } - /// Checks whether we can connect all three regions + /// Response to a client entering `NegotiationState::WaitQuorum` /// /// Returns `false` if we aren't ready, or if things failed. If there's a /// failure, then we also update the client state. @@ -1868,100 +1895,117 @@ impl Upstairs { /// **can't** activate, then we should notify the requestor of failure. /// /// If we have a problem here, we can't activate the upstairs. - fn connect_region_set(&mut self) -> bool { - /* - * If reconciliation is required, it happens in three phases. - * Typically an interruption of reconciliation will result in things - * starting over, but if actual repair work to an extent is - * completed, that extent won't need to be repaired again. - * - * The three phases are: - * - * Collect: - * When a Downstairs connects, the Upstairs collects the gen/flush/dirty - * (GFD) info from all extents. 
This GFD information is stored and the - * Upstairs waits for all three Downstairs to attach. - * - * Compare: - * In the compare phase, the upstairs will walk the list of all extents - * and compare the G/F/D from each of the downstairs. When there is a - * mismatch between downstairs (The dirty bit counts as a mismatch and - * will force a repair even if generation and flush numbers agree). For - * each mismatch, the upstairs determines which downstairs has the - * extent that should be the source, and which of the other downstairs - * extents needs repair. This list of mismatches (source, - * destination(s)) is collected. Once an upstairs has compiled its - * repair list, it will then generates a sequence of Upstairs -> - * Downstairs repair commands to repair each extent that needs to be - * fixed. For a given piece of repair work, the commands are: - * - Send a flush to source extent. - * - Close extent on all downstairs. - * - Send repair command to destination extents (with source extent - * IP/Port). - * (See DS-DS Repair) - * - Reopen all extents. - * - * Repair: - * During repair Each command issued from the upstairs must be completed - * before the next will be sent. The Upstairs is responsible for walking - * the repair commands and sending them to the required downstairs, and - * waiting for them to finish. The actual repair work for an extent - * takes place on the downstairs being repaired. - * - * Repair (ds to ds) - * Each downstairs runs a repair server (Dropshot) that listens for - * repair requests from other downstairs. A downstairs with an extent - * that needs repair will contact the source downstairs and request the - * list of files for an extent, then request each file. Once all files - * are local to the downstairs needing repair, it will replace the - * existing extent files with the new ones. - */ - let collate_status = { - /* - * Reconciliation only happens during initialization. - * Look at all three downstairs region information collected. - * Determine the highest flush number and make sure our generation - * is high enough. - */ - if !matches!(&self.state, UpstairsState::GoActive(..)) { - info!( - self.log, - "could not connect region set due to bad state: {:?}", - self.state - ); - return false; + /// + /// # Notes on reconciliation + /// If reconciliation is required, it happens in three phases. Failure + /// during reconciliation will result in the upstairs being disabled (and + /// replying to the activation request with an error); if actual repair work + /// to an extent has been completed before the failure, that extent won't + /// need to be repaired again. + /// + /// The three phases are as follows: + /// + /// ## Region metadata collection: + /// When a Downstairs connects, the Upstairs collects the gen/flush/dirty + /// (GFD) info from all extents. This GFD information is stored in + /// `NegotiationState::WaitQuorum`, and the Upstairs waits for more + /// Downstairs to attach. + /// + /// ## Comparing metadata + /// In the compare phase, the upstairs will walk the list of all extents and + /// compare the G/F/D from each of the downstairs. Mismatches are detected + /// if the gen or flush values differ, or if the dirty bit is set (which + /// forces a repair even if generation and flush numbers agree). + /// + /// For each mismatch, the upstairs determines which downstairs has the + /// extent that should be the source, and which of the other downstairs + /// extents needs repair. 
This list of mismatches (source, destination(s)) + /// is collected. + /// + /// Once an upstairs has compiled its repair list, it will then generate a + /// sequence of Upstairs -> Downstairs repair commands to repair each extent + /// that needs to be fixed. For a given piece of repair work, the commands + /// are: + /// + /// - Send a flush to source extent. + /// - Close extent on all downstairs. + /// - Send repair command to destination extents (with source extent + /// IP / port). + /// - Reopen all extents. + /// + /// ## Repair (upstairs) + /// During repair, each command issued from the upstairs must be completed + /// before the next will be sent. The Upstairs is responsible for walking + /// the repair commands and sending them to the required downstairs, and + /// waiting for them to finish. The actual repair work for an extent takes + /// place on the downstairs being repaired. + /// + /// ## Repair (downstairs-to-downstairs) + /// Each downstairs runs a repair server (Dropshot) that listens for + /// repair requests from other downstairs. A downstairs with an extent + /// that needs repair will contact the source downstairs and request the + /// list of files for an extent, then request each file. Once all files + /// are local to the downstairs needing repair, it will replace the + /// existing extent files with the new ones. + fn on_wait_quorum(&mut self) { + // Reconciliation only happens during initialization. + let UpstairsState::GoActive { + min_quorum_deadline, + .. + } = &mut self.state + else { + // XXX should this panic instead? + info!( + self.log, + "could not connect region set due to bad state: {:?}", + self.state + ); + return; + }; + + let is_ready = self.downstairs.clients.map_ref(|c| { + matches!( + c.state(), + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } + ) + }); + let ready_count = is_ready.iter().filter(|c| **c).count(); + + match ready_count { + 0 => panic!("called on_wait_quorum with no WaitQuorum downstairs"), + 1 => return, // nothing to do yet + 2 => { + // Print a warning if `min_quorum` is already present. This + // would be a little weird, because we're just now entering the + // min-quorum state, but could be possible if a Downstairs has + // disconnected (i.e. we've gone from 2 -> 1 -> 2 downstairs in + // WaitQuorum). + if min_quorum_deadline.is_some() { + warn!( + self.log, + "entered min-quorum state with \ + `min_quorum` already present" + ) + } + *min_quorum_deadline = Some(Instant::now() + NEGOTIATION_DELAY); + return; // nothing to do yet } - /* - * Make sure all downstairs are in the correct state before we - * proceed. - */ - let ready = self.downstairs.clients.iter().all(|c| { - matches!( - c.state(), - DsState::Connecting { - state: NegotiationState::WaitQuorum, - .. - } - ) - }); - if !ready { - info!(self.log, "Waiting for more clients to be ready"); - return false; + 3 => { + if min_quorum_deadline.is_some() { + info!(self.log, "cancelling min-quorum reconciliation"); + *min_quorum_deadline = None; + } } - - /* - * We figure out if there is any reconciliation to do, and if so, we - * build the list of operations that will repair the extents that - * are not in sync. - * - * If we fail to collate, then we need to kick out all the - * downstairs out, forget any activation requests, and the - * upstairs goes back to waiting for another activation request.
- */ - self.downstairs.collate() + _ => unreachable!(), }; - match collate_status { + // At this point, we know that all 3x Downstairs are ready: + assert_eq!(ready_count, 3); + + match self.downstairs.collate() { Err(e) => { error!(self.log, "Failed downstairs collate with: {}", e); // We failed to collate the three downstairs, so we need @@ -1970,20 +2014,90 @@ impl Upstairs { // clients. self.set_disabled(e.into()); self.downstairs.abort_reconciliation(&self.state); - false } Ok(true) => { // We have populated all of the reconciliation requests in // `Downstairs::reconcile_task_list`. Start reconciliation by // sending the first request. self.downstairs.send_next_reconciliation_req(); - true } Ok(false) => { info!(self.log, "No downstairs reconciliation required"); self.on_reconciliation_done(false); info!(self.log, "Set Active after no reconciliation"); - true + } + } + } + + /// Start min-quorum reconciliation + fn on_min_quorum(&mut self) { + // Reconciliation only happens during initialization. + let UpstairsState::GoActive { + min_quorum_deadline, + .. + } = &mut self.state + else { + warn!( + self.log, + "min-quorum negotiation found upstairs state {:?}; cancelling", + self.state + ); + return; + }; + assert!(min_quorum_deadline.is_some()); + *min_quorum_deadline = None; + + let is_ready = self.downstairs.clients.map_ref(|c| { + matches!( + c.state(), + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } + ) + }); + let ready_count = is_ready.iter().filter(|c| **c).count(); + if ready_count != 2 { + warn!( + self.log, + "min-quorum negotiation found {ready_count} \ + ready downstairs; cancelling" + ); + return; + } + info!(self.log, "Starting min-quorum negotiation"); + + match self.downstairs.collate() { + Err(e) => { + error!(self.log, "Failed downstairs collate with: {e}"); + // We failed to collate the two downstairs, so we need to reset + // that activation request. Call `abort_reconciliation` to + // abort reconciliation for all clients. + self.set_disabled(e.into()); + self.downstairs.abort_reconciliation(&self.state); + } + Ok(true) => { + // We have populated all of the reconciliation requests in + // `Downstairs::reconcile_task_list`. Start reconciliation by + // sending the first request. + self.downstairs.send_next_reconciliation_req(); + + // Move the unready Downstairs into the "connect through + // live-repair path" for negotiation. + let unready_ds = + ClientId::iter().find(|i| !is_ready[*i]).unwrap(); + + info!( + self.log, + "Requiring {unready_ds} to connect through live-repair" + ); + self.downstairs.clients[unready_ds] + .set_connection_mode_faulted(); + } + Ok(false) => { + info!(self.log, "No downstairs reconciliation required"); + self.on_reconciliation_done(false); + info!(self.log, "Set Active after no reconciliation"); } } } @@ -2001,14 +2115,14 @@ impl Upstairs { info!(self.log, "Reconciliation skipped"); match &self.state { - UpstairsState::GoActive(..) => { + UpstairsState::GoActive { .. } => { // If we are not active yet (this is the first downstairs) then // go ahead and set ourselves active. info!(self.log, "Set Downstairs and Upstairs active"); self.downstairs.on_reconciliation_skipped(true); // Swap out the state for UpstairsState::Active - let UpstairsState::GoActive(res) = + let UpstairsState::GoActive { res, .. } = std::mem::replace(&mut self.state, UpstairsState::Active) else { unreachable!(); // We just matched! 
@@ -2051,7 +2165,7 @@ impl Upstairs { "Set Downstairs and Upstairs active after reconciliation" ); - if !matches!(self.state, UpstairsState::GoActive(..)) { + if !matches!(self.state, UpstairsState::GoActive { .. }) { error!( self.log, "reconciliation done, but upstairs is no longer GoActive: {:?}", @@ -2061,7 +2175,7 @@ impl Upstairs { } // Swap out the state for UpstairsState::Active - let UpstairsState::GoActive(res) = + let UpstairsState::GoActive { res, .. } = std::mem::replace(&mut self.state, UpstairsState::Active) else { unreachable!(); // checked above @@ -2155,7 +2269,7 @@ impl Upstairs { Ok(()) } UpstairsState::Active => Ok(()), - UpstairsState::GoActive(..) => { + UpstairsState::GoActive { .. } => { Err(CrucibleError::UpstairsActivateInProgress) } UpstairsState::Deactivating(..) => { @@ -2175,7 +2289,7 @@ impl Upstairs { &mut self.state, UpstairsState::Disabled(err.clone()), ); - if let UpstairsState::GoActive(res) = prev { + if let UpstairsState::GoActive { res, .. } = prev { res.send_err(err); } for c in ClientId::iter() { @@ -2310,7 +2424,7 @@ pub(crate) mod test { #[test] fn reconcile_not_ready() { - // Verify reconcile returns false when a downstairs is not ready + // Verify reconcile doesn't start when a downstairs is not ready let mut up = Upstairs::test_default(None, false); for cid in [ClientId::new(0), ClientId::new(1)] { for state in [ @@ -2330,10 +2444,32 @@ pub(crate) mod test { } } let (_rx, done) = BlockOpWaiter::pair(); - up.state = UpstairsState::GoActive(done); + up.state = UpstairsState::GoActive { + res: done, + min_quorum_deadline: None, + }; + + up.on_wait_quorum(); - let res = up.connect_region_set(); - assert!(!res); + // DS0 and DS1 are still in WaitQuorum + for cid in [ClientId::new(0), ClientId::new(1)] { + assert!(matches!( + up.ds_state(cid), + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } + )); + } + // DS2 is still in WaitConnect + assert!(matches!( + up.ds_state(ClientId::new(2)), + DsState::Connecting { + state: NegotiationState::WaitConnect, + .. + } + )); + // The activation hasn't happened assert!(!matches!(&up.state, &UpstairsState::Active)) } @@ -2359,7 +2495,10 @@ pub(crate) mod test { ); } let (_rx, done) = BlockOpWaiter::pair(); - up.state = UpstairsState::GoActive(done); + up.state = UpstairsState::GoActive { + res: done, + min_quorum_deadline: None, + }; up.connect_ro_region_set(); assert!(matches!(&up.state, &UpstairsState::Active)); @@ -2389,7 +2528,10 @@ pub(crate) mod test { } } let (_rx, done) = BlockOpWaiter::pair(); - up.state = UpstairsState::GoActive(done); + up.state = UpstairsState::GoActive { + res: done, + min_quorum_deadline: None, + }; up.connect_ro_region_set(); assert!(matches!(&up.state, &UpstairsState::Active));
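For reference, here is a minimal standalone sketch (not part of the patch itself) of the conditional-timer pattern that the min-quorum changes above rely on: the select arm is only armed while `UpstairsState::GoActive` carries a `min_quorum_deadline`, and otherwise resolves to a future that never completes, so the other arms always win the race. It assumes the tokio and futures crates; the constant mirrors the 500 ms `NEGOTIATION_DELAY` added in upstairs.rs, and the second select arm is just a placeholder for the other upstairs event sources.

    use futures::future::Either;
    use std::time::Duration;
    use tokio::time::{sleep_until, Instant};

    // Mirrors the 500 ms NEGOTIATION_DELAY constant added in upstairs.rs.
    const NEGOTIATION_DELAY: Duration = Duration::from_millis(500);

    #[tokio::main]
    async fn main() {
        // `Some(t)` stands in for `GoActive { min_quorum_deadline: Some(t), .. }`;
        // `None` stands in for every other state, where this arm must never fire.
        let min_quorum_deadline: Option<Instant> =
            Some(Instant::now() + NEGOTIATION_DELAY);

        // Arm the timer only when a deadline is pending; otherwise substitute
        // a future that never resolves.
        let min_quorum_timer = match min_quorum_deadline {
            Some(t) => Either::Left(sleep_until(t)),
            None => Either::Right(std::future::pending::<()>()),
        };

        tokio::select! {
            _ = min_quorum_timer => {
                // The real upstairs maps this to
                // UpstairsAction::MinQuorumReconciliation -> on_min_quorum().
                println!("min-quorum deadline fired");
            }
            // Placeholder for the other upstairs event sources (guest
            // requests, downstairs messages, stats, control, ...).
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("some other event arrived first");
            }
        }
    }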