Skip to content

Commit 1c75dc7

Browse files
committed
Min-quorum negotiation
1 parent 1b91f6c commit 1c75dc7

File tree

5 files changed

+629
-256
lines changed

5 files changed

+629
-256
lines changed

integration_tests/src/lib.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,24 @@ mod test {
123123
Ok(())
124124
}
125125

126+
/// Stops the downstairs task, return a `(port, rport)` tuple
127+
pub async fn stop(&mut self) -> Result<(u16, u16)> {
128+
let ds = self.downstairs.take().unwrap();
129+
let port = ds.address().port();
130+
let rport = ds.repair_address().port();
131+
ds.stop().await?;
132+
Ok((port, rport))
133+
}
134+
126135
pub async fn reboot_read_write(&mut self) -> Result<()> {
136+
self.reboot_read_write_with_ports(0, 0).await
137+
}
138+
139+
pub async fn reboot_read_write_with_ports(
140+
&mut self,
141+
port: u16,
142+
rport: u16,
143+
) -> Result<()> {
127144
let downstairs =
128145
Downstairs::new_builder(self.tempdir.path(), false)
129146
.set_logger(csl())
@@ -134,6 +151,8 @@ mod test {
134151
downstairs,
135152
DownstairsClientSettings {
136153
address: self.address,
154+
port,
155+
rport,
137156
..DownstairsClientSettings::default()
138157
},
139158
)
@@ -5839,4 +5858,156 @@ mod test {
58395858
// Make sure everything worked
58405859
volume.activate().await.unwrap();
58415860
}
5861+
5862+
#[tokio::test]
5863+
async fn connect_two_ds_then_deactivate() {
5864+
const BLOCK_SIZE: usize = 512;
5865+
5866+
// Spin off three downstairs, build our Crucible struct.
5867+
let mut tds = TestDownstairsSet::small(false).await.unwrap();
5868+
let opts = tds.opts();
5869+
tds.downstairs1.stop().await.unwrap();
5870+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
5871+
5872+
let (guest, io) = Guest::new(None);
5873+
let _join_handle = up_main(opts, 1, None, io, None).unwrap();
5874+
guest.activate().await.unwrap();
5875+
5876+
let res = guest
5877+
.write(
5878+
BlockIndex(0),
5879+
BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()),
5880+
)
5881+
.await;
5882+
assert!(res.is_ok());
5883+
5884+
guest.deactivate().await.unwrap();
5885+
}
5886+
5887+
#[tokio::test]
5888+
async fn connect_two_ds_then_another() {
5889+
const BLOCK_SIZE: usize = 512;
5890+
5891+
// Spin off three downstairs, build our Crucible struct.
5892+
let mut tds = TestDownstairsSet::small(false).await.unwrap();
5893+
let opts = tds.opts();
5894+
let (ds1_port, ds1_rport) = tds.downstairs1.stop().await.unwrap();
5895+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
5896+
5897+
let (guest, io) = Guest::new(None);
5898+
let _join_handle = up_main(opts, 1, None, io, None).unwrap();
5899+
guest.activate().await.unwrap();
5900+
5901+
let res = guest
5902+
.write(
5903+
BlockIndex(0),
5904+
BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()),
5905+
)
5906+
.await;
5907+
assert!(res.is_ok());
5908+
5909+
// Restart downstairs1, which should use live-repair to join the quorum
5910+
//
5911+
// We have to wait a while here, because there's a 10-second reconnect
5912+
// delay.
5913+
tds.downstairs1
5914+
.reboot_read_write_with_ports(ds1_port, ds1_rport)
5915+
.await
5916+
.unwrap();
5917+
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
5918+
guest.deactivate().await.unwrap();
5919+
5920+
// Reconnect with only ds1 running, then confirm that it received the
5921+
// writes. We'll come up in read-only mode so that we can connect with
5922+
// just a single Downstairs, to make sure the reads go to DS1.
5923+
tds.downstairs1.reboot_read_only().await.unwrap();
5924+
tds.downstairs2.stop().await.unwrap();
5925+
tds.downstairs3.stop().await.unwrap();
5926+
tds.crucible_opts.read_only = true;
5927+
tds.crucible_opts.target[0] = tds.downstairs1.address();
5928+
let opts = tds.opts();
5929+
let (guest, io) = Guest::new(None);
5930+
let _join_handle = up_main(opts, 1, None, io, None).unwrap();
5931+
guest.activate().await.unwrap();
5932+
let mut buf = Buffer::new(2, BLOCK_SIZE);
5933+
guest.read(BlockIndex(0), &mut buf).await.unwrap();
5934+
5935+
assert_eq!(buf.to_vec(), vec![0x55; BLOCK_SIZE * 2]);
5936+
}
5937+
5938+
#[tokio::test]
5939+
async fn min_quorum_live_repair() {
5940+
const BLOCK_SIZE: usize = 512;
5941+
5942+
// Spin off three downstairs, build our Crucible struct.
5943+
let mut tds = TestDownstairsSet::small(false).await.unwrap();
5944+
5945+
// Stop downstairs 1 before constructing the guest, so it won't be
5946+
// included and we'll do min-quorum reconciliation.
5947+
let (port, rport) = tds.downstairs1.stop().await.unwrap();
5948+
5949+
// Start the guest and do a write to ds 2 and 3.
5950+
let (guest, io) = Guest::new(None);
5951+
let opts = tds.opts();
5952+
let _join_handle = up_main(opts, 1, None, io, None).unwrap();
5953+
guest.activate().await.unwrap();
5954+
let res = guest
5955+
.write(
5956+
BlockIndex(0),
5957+
BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()),
5958+
)
5959+
.await;
5960+
assert!(res.is_ok());
5961+
5962+
// Deactivate the guest, all without downstairs 1 participating
5963+
guest.deactivate().await.unwrap();
5964+
5965+
// At this point, the data has been written to DS 2 and 3. We'll start
5966+
// up again with DS 1 and 2, so min-quorum should do reconciliation.
5967+
5968+
tds.downstairs1
5969+
.reboot_read_write_with_ports(port, rport)
5970+
.await
5971+
.unwrap();
5972+
tds.downstairs2.stop().await.unwrap();
5973+
guest.activate_with_gen(2).await.unwrap();
5974+
5975+
let mut buf = Buffer::new(2, BLOCK_SIZE);
5976+
guest.read(BlockIndex(0), &mut buf).await.unwrap();
5977+
5978+
assert_eq!(buf.to_vec(), vec![0x55; BLOCK_SIZE * 2]);
5979+
}
5980+
5981+
#[tokio::test]
5982+
async fn min_quorum_cancel() {
5983+
// Spin off three downstairs, build our Crucible struct.
5984+
let mut tds = TestDownstairsSet::small(false).await.unwrap();
5985+
5986+
// Stop downstairs 1 before constructing the guest, so it won't be
5987+
// included and we'll do min-quorum reconciliation.
5988+
let (port, rport) = tds.downstairs1.stop().await.unwrap();
5989+
5990+
// Start the guest and do a write to ds 2 and 3.
5991+
let (guest, io) = Guest::new(None);
5992+
let opts = tds.opts();
5993+
let _join_handle = up_main(opts, 1, None, io, None).unwrap();
5994+
let s = tokio::spawn(async move { guest.activate().await });
5995+
5996+
// Get into our min-quorum wait, which is 500 ms
5997+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
5998+
5999+
// Stop DS2
6000+
tds.downstairs2.stop().await.unwrap();
6001+
6002+
// Wait for the min-quorum timer to go off; it shouldn't panic!
6003+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
6004+
6005+
// Restart DS1, we're now eligible for min-quorum negotiation again
6006+
tds.downstairs1
6007+
.reboot_read_write_with_ports(port, rport)
6008+
.await
6009+
.unwrap();
6010+
6011+
s.await.unwrap().unwrap()
6012+
}
58426013
}

upstairs/src/client.rs

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -432,22 +432,29 @@ impl DownstairsClient {
432432
self.skipped_jobs.insert(ds_id);
433433
}
434434

435-
/// Sets our state to `DsStateData::Reconcile`
436-
///
437-
/// # Panics
438-
/// If the current state is invalid
439-
pub(crate) fn begin_reconcile(&mut self) {
440-
info!(self.log, "Transition from {:?} to Reconcile", self.state());
441-
let DsStateData::Connecting { state, mode } = &mut self.state else {
442-
panic!(
443-
"invalid state {:?} for client {}",
444-
self.state(),
445-
self.client_id
446-
);
447-
};
448-
assert_eq!(state.discriminant(), NegotiationState::WaitQuorum);
449-
assert_eq!(mode, &ConnectionMode::New);
450-
*state = NegotiationStateData::Reconcile;
435+
/// Sets our state to `DsStateData::Reconcile` if we are in `WaitQuorum`
436+
pub(crate) fn begin_reconcile(&mut self) -> bool {
437+
info!(
438+
self.log,
439+
"setting state to reconcile from {:?}",
440+
self.state()
441+
);
442+
let mut changed = false;
443+
if let DsStateData::Connecting {
444+
state,
445+
mode: ConnectionMode::New,
446+
..
447+
} = &mut self.state
448+
{
449+
if matches!(state, NegotiationStateData::WaitQuorum(..)) {
450+
*state = NegotiationStateData::Reconcile;
451+
changed = true;
452+
}
453+
}
454+
if !changed {
455+
warn!(self.log, "invalid state for reconciliation, skipping");
456+
}
457+
changed
451458
}
452459

453460
/// Checks whether this Downstairs is ready for the upstairs to deactivate
@@ -498,7 +505,7 @@ impl DownstairsClient {
498505
// If the upstairs is already active (or trying to go active), then we
499506
// should automatically connect to the Downstairs.
500507
let auto_connect = match up_state {
501-
UpstairsState::Active | UpstairsState::GoActive(..) => true,
508+
UpstairsState::Active | UpstairsState::GoActive { .. } => true,
502509
UpstairsState::Disabled(..)
503510
| UpstairsState::Initializing
504511
| UpstairsState::Deactivating { .. } => false,
@@ -539,7 +546,7 @@ impl DownstairsClient {
539546
match up_state {
540547
// If we haven't activated yet (or we're deactivating) then
541548
// start from New
542-
UpstairsState::GoActive(..)
549+
UpstairsState::GoActive { .. }
543550
| UpstairsState::Initializing
544551
| UpstairsState::Disabled(..)
545552
| UpstairsState::Deactivating { .. } => ConnectionMode::New,
@@ -553,7 +560,7 @@ impl DownstairsClient {
553560
match up_state {
554561
// If we haven't activated yet (or we're deactivating), then
555562
// start from New
556-
UpstairsState::GoActive(..)
563+
UpstairsState::GoActive { .. }
557564
| UpstairsState::Initializing
558565
| UpstairsState::Disabled(..)
559566
| UpstairsState::Deactivating { .. } => ConnectionMode::New,
@@ -749,17 +756,18 @@ impl DownstairsClient {
749756
///
750757
/// This changes the subsequent path through negotiation, without restarting
751758
/// the client IO task. Doing so is safe because the faulted path is
752-
/// a superset of the offline path.
759+
/// a superset of all other paths.
753760
///
754761
/// # Panics
755-
/// If we are not in `DsStateData::Connecting { mode: ConnectionMode::Offline,
756-
/// .. }`
762+
/// If we are not in `DsStateData::Connecting { .. }`
757763
pub(crate) fn set_connection_mode_faulted(&mut self) {
758-
let DsStateData::Connecting { mode, .. } = &mut self.state else {
764+
let DsStateData::Connecting { mode, state } = &mut self.state else {
759765
panic!("not connecting");
760766
};
761-
assert_eq!(*mode, ConnectionMode::Offline);
762-
*mode = ConnectionMode::Faulted
767+
*mode = ConnectionMode::Faulted;
768+
if matches!(*state, NegotiationStateData::WaitQuorum(..)) {
769+
*state = NegotiationStateData::LiveRepairReady;
770+
}
763771
}
764772

765773
/// Applies an [`EnqueueResult`] for the given job
@@ -1445,7 +1453,7 @@ impl DownstairsClient {
14451453
// downstairs here.
14461454
match up_state {
14471455
UpstairsState::Initializing
1448-
| UpstairsState::GoActive(_) => {
1456+
| UpstairsState::GoActive { .. } => {
14491457
warn!(
14501458
self.log,
14511459
"Replace {} with {} before active",
@@ -1578,7 +1586,7 @@ impl DownstairsClient {
15781586
if matches!(
15791587
up_state,
15801588
UpstairsState::Initializing
1581-
| UpstairsState::GoActive(..)
1589+
| UpstairsState::GoActive { .. }
15821590
) =>
15831591
{
15841592
*state = NegotiationStateData::WaitQuorum(dsr);

0 commit comments

Comments
 (0)