diff --git a/trust-quorum/src/coordinator_state.rs b/trust-quorum/src/coordinator_state.rs index 89781dabbc7..ac556ac02ca 100644 --- a/trust-quorum/src/coordinator_state.rs +++ b/trust-quorum/src/coordinator_state.rs @@ -4,10 +4,11 @@ //! State of a reconfiguration coordinator inside a [`crate::Node`] +use crate::NodeHandlerCtx; use crate::crypto::{LrtqShare, Sha3_256Digest, ShareDigestLrtq}; use crate::messages::PeerMsg; use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg}; -use crate::{Configuration, Envelope, Epoch, PeerMsgKind, PlatformId}; +use crate::{Configuration, Epoch, PeerMsgKind, PlatformId}; use gfss::shamir::Share; use slog::{Logger, o, warn}; use std::collections::{BTreeMap, BTreeSet}; @@ -147,7 +148,8 @@ impl CoordinatorState { // // This method is "in progress" - allow unused parameters for now #[expect(unused)] - pub fn send_msgs(&mut self, now: Instant, outbox: &mut Vec) { + pub fn send_msgs(&mut self, ctx: &mut impl NodeHandlerCtx) { + let now = ctx.now(); if now < self.retry_deadline { return; } @@ -165,14 +167,13 @@ impl CoordinatorState { for (platform_id, (config, share)) in prepares.clone().into_iter() { - outbox.push(Envelope { - to: platform_id, - from: self.reconfigure_msg.coordinator_id().clone(), - msg: PeerMsg { + ctx.send( + platform_id, + PeerMsg { rack_id, kind: PeerMsgKind::Prepare { config, share }, }, - }); + ); } } } diff --git a/trust-quorum/src/crypto.rs b/trust-quorum/src/crypto.rs index 33f61a4713a..616e51014bf 100644 --- a/trust-quorum/src/crypto.rs +++ b/trust-quorum/src/crypto.rs @@ -32,7 +32,9 @@ use crate::{Epoch, Threshold}; const LRTQ_SHARE_SIZE: usize = 33; // The size in bytes of a single rack secret -const SECRET_LEN: usize = 32; +// +// Public only for docs +pub const SECRET_LEN: usize = 32; // The size in bytes of an `Epoch` const EPOCH_LEN: usize = size_of::(); diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs index b1dfe75d553..b4c3b73af5d 100644 --- a/trust-quorum/src/lib.rs +++ 
b/trust-quorum/src/lib.rs @@ -17,6 +17,7 @@ mod coordinator_state; pub(crate) mod crypto; mod messages; mod node; +mod node_ctx; mod persistent_state; mod validators; pub use configuration::Configuration; @@ -24,6 +25,9 @@ pub(crate) use coordinator_state::CoordinatorState; pub use crypto::RackSecret; pub use messages::*; pub use node::Node; +// public only for docs. +pub use node_ctx::NodeHandlerCtx; +pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx}; pub use persistent_state::{PersistentState, PersistentStateSummary}; #[derive( diff --git a/trust-quorum/src/node.rs b/trust-quorum/src/node.rs index 246d086adf5..1c076e679fd 100644 --- a/trust-quorum/src/node.rs +++ b/trust-quorum/src/node.rs @@ -16,11 +16,8 @@ //! Node, and so this should not be problematic. use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg}; -use crate::{ - CoordinatorState, Envelope, Epoch, PersistentState, PlatformId, messages::*, -}; +use crate::{CoordinatorState, Epoch, NodeHandlerCtx, PlatformId, messages::*}; use slog::{Logger, error, o, warn}; -use std::time::Instant; /// An entity capable of participating in trust quorum /// @@ -30,30 +27,16 @@ use std::time::Instant; pub struct Node { log: Logger, - /// The unique hardware ID of a sled - platform_id: PlatformId, - - /// State that gets persistenly stored in ledgers - persistent_state: PersistentState, - /// In memory state for when this node is coordinating a reconfiguration coordinator_state: Option, } impl Node { - pub fn new( - log: Logger, - platform_id: PlatformId, - persistent_state: PersistentState, - ) -> Node { - let id_str = format!("{platform_id:?}"); + pub fn new(log: Logger, ctx: &mut impl NodeHandlerCtx) -> Node { + let id_str = format!("{:?}", ctx.platform_id()); let log = log.new(o!("component" => "trust-quorum", "platform_id" => id_str)); - Node { log, platform_id, persistent_state, coordinator_state: None } - } - - pub fn platform_id(&self) -> &PlatformId { - &self.platform_id + Node { log, 
coordinator_state: None } } /// Start coordinating a reconfiguration @@ -64,58 +47,54 @@ impl Node { /// For upgrading from LRTQ, use `coordinate_upgrade_from_lrtq` pub fn coordinate_reconfiguration( &mut self, - now: Instant, - outbox: &mut Vec, + ctx: &mut impl NodeHandlerCtx, msg: ReconfigureMsg, - ) -> Result, ReconfigurationError> { + ) -> Result<(), ReconfigurationError> { let Some(validated_msg) = ValidatedReconfigureMsg::new( &self.log, - &self.platform_id, + ctx.platform_id(), msg, - (&self.persistent_state).into(), + ctx.persistent_state().into(), self.coordinator_state.as_ref().map(|cs| cs.reconfigure_msg()), )? else { // This was an idempotent (duplicate) request. - return Ok(None); + return Ok(()); }; - let persistent_state = - self.set_coordinator_state(now, validated_msg)?; - self.send_coordinator_msgs(now, outbox); - Ok(persistent_state) + self.set_coordinator_state(ctx, validated_msg)?; + self.send_coordinator_msgs(ctx); + Ok(()) } /// Process a timer tick /// /// Ticks are issued by the caller in order to move the protocol forward. /// The current time is passed in to make the calls deterministic. 
- pub fn tick(&mut self, now: Instant, outbox: &mut Vec) { - self.send_coordinator_msgs(now, outbox); + pub fn tick(&mut self, ctx: &mut impl NodeHandlerCtx) { + self.send_coordinator_msgs(ctx); } /// Handle a message from another node pub fn handle( &mut self, - _now: Instant, - _outbox: &mut Vec, + ctx: &mut impl NodeHandlerCtx, from: PlatformId, msg: PeerMsg, - ) -> Option { - if let Some(rack_id) = self.persistent_state.rack_id() { + ) { + if let Some(rack_id) = ctx.persistent_state().rack_id() { if rack_id != msg.rack_id { error!(self.log, "Mismatched rack id"; "from" => %from, "msg" => msg.kind.name(), "expected" => %rack_id, "got" => %msg.rack_id); - return None; + return; } } match msg.kind { PeerMsgKind::PrepareAck(epoch) => { self.handle_prepare_ack(from, epoch); - None } _ => todo!( "cannot handle message variant yet - not implemented: {msg:?}" @@ -154,16 +133,12 @@ impl Node { } // Send any required messages as a reconfiguration coordinator - fn send_coordinator_msgs( - &mut self, - now: Instant, - outbox: &mut Vec, - ) { + fn send_coordinator_msgs(&mut self, ctx: &mut impl NodeHandlerCtx) { // This function is called unconditionally in `tick` callbacks. In this // case we may not actually be a coordinator. We ignore the call in // that case. if let Some(c) = self.coordinator_state.as_mut() { - c.send_msgs(now, outbox); + c.send_msgs(ctx); } } @@ -175,53 +150,51 @@ impl Node { /// we have a `ValidatedReconfigureMsg`. 
fn set_coordinator_state( &mut self, - now: Instant, + ctx: &mut impl NodeHandlerCtx, msg: ValidatedReconfigureMsg, - ) -> Result, ReconfigurationError> { + ) -> Result<(), ReconfigurationError> { // We have no committed configuration or lrtq ledger - if self.persistent_state.is_uninitialized() { + if ctx.persistent_state().is_uninitialized() { let (coordinator_state, my_config, my_share) = CoordinatorState::new_uninitialized( self.log.clone(), - now, + ctx.now(), msg, )?; self.coordinator_state = Some(coordinator_state); - self.persistent_state.shares.insert(my_config.epoch, my_share); - self.persistent_state - .configs - .insert_unique(my_config) - .expect("empty state"); + ctx.update_persistent_state(move |ps| { + ps.shares.insert(my_config.epoch, my_share); + ps.configs.insert_unique(my_config).expect("empty state"); + true + }); - return Ok(Some(self.persistent_state.clone())); + return Ok(()); } // We have a committed configuration that is not LRTQ let config = - self.persistent_state.latest_committed_configuration().unwrap(); + ctx.persistent_state().latest_committed_configuration().unwrap(); self.coordinator_state = Some(CoordinatorState::new_reconfiguration( self.log.clone(), - now, + ctx.now(), msg, &config, )?); - Ok(None) + Ok(()) } } #[cfg(test)] mod tests { - use std::time::Duration; - - use crate::{Epoch, Threshold}; - use super::*; + use crate::{Epoch, NodeCallerCtx, NodeCommonCtx, NodeCtx, Threshold}; use assert_matches::assert_matches; use omicron_test_utils::dev::test_setup_log; use omicron_uuid_kinds::RackUuid; use proptest::prelude::*; + use std::time::Duration; use test_strategy::{Arbitrary, proptest}; fn arb_member() -> impl Strategy { @@ -259,21 +232,21 @@ mod tests { let logctx = test_setup_log("initial_configuration"); let my_platform_id = input.reconfigure_msg.members.first().unwrap().clone(); - let mut node = Node::new( - logctx.log.clone(), - my_platform_id.clone(), - PersistentState::empty(), - ); + let mut ctx = 
NodeCtx::new(my_platform_id.clone()); + let mut node = Node::new(logctx.log.clone(), &mut ctx); + + node.coordinate_reconfiguration( + &mut ctx, + input.reconfigure_msg.clone(), + ) + .expect("success"); + + // An initial configuration always causes a change to persistent state + assert!(ctx.persistent_state_change_check_and_reset()); + // Checking if the persistent state has changed above cleared the bit + assert!(!ctx.persistent_state_change_check_and_reset()); - let mut outbox = Vec::new(); - let persistent_state = node - .coordinate_reconfiguration( - Instant::now(), - &mut outbox, - input.reconfigure_msg.clone(), - ) - .expect("success") - .expect("persistent state"); + let persistent_state = ctx.persistent_state().clone(); // A PersistentState should always be returned // It should include the `PrepareMsg` for this node. @@ -288,15 +261,15 @@ mod tests { persistent_state.configs.get(&input.reconfigure_msg.epoch).unwrap(); assert_eq!(config.epoch, input.reconfigure_msg.epoch); - assert_eq!(config.coordinator, *node.platform_id()); + assert_eq!(config.coordinator, *ctx.platform_id()); assert_eq!(config.members.len(), input.reconfigure_msg.members.len()); assert_eq!(config.threshold, input.reconfigure_msg.threshold); assert!(config.encrypted_rack_secrets.is_none()); // Ensure that prepare messages are properly put in the outbox to be // sent by the I/O parts of the codebase - assert_eq!(outbox.len(), config.members.len() - 1); - for envelope in outbox { + assert_eq!(ctx.num_envelopes(), config.members.len() - 1); + for envelope in ctx.drain_envelopes() { assert_matches!( envelope.msg.kind, PeerMsgKind::Prepare{ config: prepare_config, .. } => { diff --git a/trust-quorum/src/node_ctx.rs b/trust-quorum/src/node_ctx.rs new file mode 100644 index 00000000000..b50fcc9889e --- /dev/null +++ b/trust-quorum/src/node_ctx.rs @@ -0,0 +1,146 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. 
If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Parameter to Node API calls that allows interaction with the system at large + +use crate::{Envelope, PeerMsg, PersistentState, PlatformId}; +use std::time::Instant; + +/// An API shared by [`NodeCallerCtx`] and [`NodeHandlerCtx`] +pub trait NodeCommonCtx { + fn platform_id(&self) -> &PlatformId; + fn now(&self) -> Instant; + fn persistent_state(&self) -> &PersistentState; +} + +/// An API for a [`NodeCtx`] usable from a [`crate::Node`] +pub trait NodeCallerCtx: NodeCommonCtx { + fn set_time(&mut self, now: Instant); + fn num_envelopes(&self) -> usize; + fn drain_envelopes(&mut self) -> impl Iterator; + fn envelopes(&self) -> impl Iterator; + + /// Check if the contained `PersistentState` has been mutated + /// + /// IMPORTANT: Calling this method resets the state of mutation to + /// `false`. This means that callers should only call this once after each + /// [`crate::Node`] API call and cache the result as necessary. This is also + /// why this method takes an + /// `&mut self`. + fn persistent_state_change_check_and_reset(&mut self) -> bool; +} + +/// An API for a [`NodeCtx`] usable from inside FSM states +pub trait NodeHandlerCtx: NodeCommonCtx { + fn send(&mut self, to: PlatformId, msg: PeerMsg); + + /// Attempt to update the persistent state inside the callback `f`. If + /// the state is updated, then `f` should return `true`, otherwise it should + /// return `false`. + /// + /// IMPORTANT: This method sets a bit indicating whether or not the + /// underlying `PersistentState` was mutated, for use by callers. This + /// method can safely be called multiple times. If any call mutates the + /// persistent state, then the bit will remain set. The bit is only cleared + /// when a caller calls `persistent_state_change_check_and_reset`. 
+ fn update_persistent_state(&mut self, f: F) + where + F: FnOnce(&mut PersistentState) -> bool; +} + +/// Common parameter to [`crate::Node`] methods +/// +/// We separate access to this context via different APIs; namely [`NodeCallerCtx`] +/// and [`NodeHandlerCtx`]. This statically prevents both the caller and +/// [`crate::Node`] internals from performing improper mutations. +pub struct NodeCtx { + /// The unique hardware ID of a sled + platform_id: PlatformId, + + /// State that gets persistently stored in ledgers + persistent_state: PersistentState, + + /// Was persistent_state modified by a call to `update_persistent_state`? + /// + /// This gets reset by reading the persistent state with + /// [`NodeCallerCtx::persistent_state_change_check_and_reset`]. + persistent_state_changed: bool, + + /// Outgoing messages destined for other peers + outgoing: Vec, + + /// The current time + now: Instant, +} + +impl NodeCtx { + pub fn new(platform_id: PlatformId) -> NodeCtx { + NodeCtx { + platform_id, + persistent_state: PersistentState::empty(), + persistent_state_changed: false, + outgoing: Vec::new(), + now: Instant::now(), + } + } +} + +impl NodeCommonCtx for NodeCtx { + fn platform_id(&self) -> &PlatformId { + &self.platform_id + } + + fn now(&self) -> Instant { + self.now + } + + fn persistent_state(&self) -> &PersistentState { + &self.persistent_state + } +} + +impl NodeHandlerCtx for NodeCtx { + fn send(&mut self, to: PlatformId, msg: PeerMsg) { + self.outgoing.push(Envelope { + to, + from: self.platform_id.clone(), + msg, + }); + } + + fn update_persistent_state(&mut self, f: F) + where + F: FnOnce(&mut PersistentState) -> bool, + { + // We don't ever revert from true to false, which allows calling this + // method multiple times in handler context. 
+ if f(&mut self.persistent_state) { + self.persistent_state_changed = true + } + } +} + +impl NodeCallerCtx for NodeCtx { + fn set_time(&mut self, now: Instant) { + self.now = now; + } + + fn num_envelopes(&self) -> usize { + self.outgoing.len() + } + + fn drain_envelopes(&mut self) -> impl Iterator { + self.outgoing.drain(..) + } + + fn envelopes(&self) -> impl Iterator { + self.outgoing.iter() + } + + fn persistent_state_change_check_and_reset(&mut self) -> bool { + let changed = self.persistent_state_changed; + self.persistent_state_changed = false; + changed + } +} diff --git a/trust-quorum/tests/coordinator.rs b/trust-quorum/tests/coordinator.rs index 27531721dd5..84fc9bb17dd 100644 --- a/trust-quorum/tests/coordinator.rs +++ b/trust-quorum/tests/coordinator.rs @@ -16,8 +16,8 @@ use std::collections::{BTreeMap, BTreeSet}; use std::time::{Duration, Instant}; use test_strategy::{Arbitrary, proptest}; use trust_quorum::{ - Envelope, Epoch, Node, PeerMsg, PeerMsgKind, PersistentState, PlatformId, - ReconfigureMsg, Threshold, + Epoch, Node, NodeCallerCtx, NodeCommonCtx, NodeCtx, PeerMsg, PeerMsgKind, + PersistentState, PlatformId, ReconfigureMsg, Threshold, }; /// The system under test @@ -25,21 +25,19 @@ pub struct Sut { // The coordinator node which is the system under test (SUT) pub node: Node, - // The saved persistent state returned by the last Node operation - pub persistent_state: PersistentState, + // The context passed to `Node` api operations + pub ctx: NodeCtx, } impl Sut { pub fn action_coordinate_reconfiguration( &mut self, - now: Instant, - outbox: &mut Vec, msg: ReconfigureMsg, - ) -> Result, TestCaseError> { + ) -> Result<(), TestCaseError> { // We only generate valid configurations when calling this method. Any // failure of this method should be considered a test failure. 
- let output = self.node.coordinate_reconfiguration(now, outbox, msg)?; - Ok(output) + self.node.coordinate_reconfiguration(&mut self.ctx, msg)?; + Ok(()) } } @@ -184,11 +182,9 @@ struct TestState { impl TestState { pub fn new(log: Logger, coordinator_id: PlatformId) -> TestState { + let mut ctx = NodeCtx::new(coordinator_id); TestState { - sut: Sut { - node: Node::new(log, coordinator_id, PersistentState::empty()), - persistent_state: PersistentState::empty(), - }, + sut: Sut { node: Node::new(log, &mut ctx), ctx }, model: Model::new(), network_msgs: BTreeMap::new(), delivered_msgs: BTreeMap::new(), @@ -199,46 +195,35 @@ impl TestState { &mut self, msg: ReconfigureMsg, ) -> Result<(), TestCaseError> { - let mut outbox = Vec::new(); - // Update the model state self.model.action_coordinate_reconfiguration(msg.clone()); + // Save the prior persistent state before we coordinate and possibly + // mutate it + let prior_persistent_state = self.sut.ctx.persistent_state().clone(); + // Update the SUT state // // We only generate valid configurations when calling this method. Any // failure of this method should be considered a test failure. - let output = self.sut.action_coordinate_reconfiguration( - self.model.now, - &mut outbox, - msg, - )?; - - match output { - Some(persistent_state) => { - // The request succeeded - self.assert_persistent_state_after_coordinate_reconfiguration( - &persistent_state, - )?; - - // We validated our persistent state is correct. Save it and - // move on. - self.sut.persistent_state = persistent_state; - - // The correct messages were sent - self.assert_envelopes_after_coordinate_reconfiguration( - &outbox, - )?; - - // We validated our messages. Let's put them into our test state - // as "in-flight". 
- self.send(outbox.into_iter()); - } - None => { - // The request is idempotent - // No action should have been taken - prop_assert!(outbox.is_empty()); - } + self.sut.action_coordinate_reconfiguration(msg)?; + + if self.sut.ctx.persistent_state_change_check_and_reset() { + // The request succeeded + self.assert_persistent_state_after_coordinate_reconfiguration( + prior_persistent_state, + )?; + + // The correct messages were sent + self.assert_envelopes_after_coordinate_reconfiguration()?; + + // We validated our messages. Let's put them into our test state + // as "in-flight". + self.send_all_msgs(); + } else { + // The request is idempotent + // No action should have been taken + prop_assert_eq!(self.sut.ctx.num_envelopes(), 0); } Ok(()) @@ -327,11 +312,10 @@ impl TestState { &mut self, time_jump: Duration, ) -> Result<(), TestCaseError> { - let mut outbox = Vec::new(); - // Tell our model and the SUT that time has advanced let timer_expired = self.model.advance_time(time_jump); - self.sut.node.tick(self.model.now, &mut outbox); + self.sut.ctx.set_time(self.model.now); + self.sut.node.tick(&mut self.sut.ctx); // If time has advanced past the coordinator's retry deadline // then we must see if we expected any retries to be sent. @@ -341,7 +325,7 @@ impl TestState { // We aren't coordinating if members.is_empty() { - prop_assert!(outbox.is_empty()); + prop_assert_eq!(self.sut.ctx.num_envelopes(), 0); return Ok(()); } @@ -350,17 +334,17 @@ impl TestState { // has not received acks for. 
let expected: BTreeSet<_> = members.difference(acked_members).collect(); - for envelope in &outbox { + for envelope in self.sut.ctx.envelopes() { prop_assert!(expected.contains(&envelope.to)); } } else { // We aren't waiting on acks, so won't retry sending prepares - prop_assert!(outbox.is_empty()); + prop_assert_eq!(self.sut.ctx.num_envelopes(), 0); } } // Put any output messages onto the network - self.send(outbox.into_iter()); + self.send_all_msgs(); Ok(()) } @@ -379,15 +363,9 @@ impl TestState { // In any case, we don't keep enough state at the fake follower replicas // to check this. let reply = PeerMsg { rack_id, kind: PeerMsgKind::PrepareAck(epoch) }; - let mut outbox = Vec::new(); - let output = self.sut.node.handle( - self.model.now, - &mut outbox, - from.clone(), - reply, - ); - prop_assert!(output.is_none()); - prop_assert!(outbox.is_empty()); + self.sut.node.handle(&mut self.sut.ctx, from.clone(), reply); + prop_assert!(!self.sut.ctx.persistent_state_change_check_and_reset()); + prop_assert_eq!(self.sut.ctx.num_envelopes(), 0); // Also update the model state self.model.ack_prepare(from.clone(), epoch); @@ -401,13 +379,13 @@ impl TestState { Ok(()) } - /// Ensure that the output of `Node::coordinate_reconfiguration` - /// is valid given the `TestState`. + /// Ensure that the `PersistentState` modified as a result of + /// `Node::coordinate_reconfiguration` is valid given the `TestState`. /// /// This is essentially a "postcondition" check. fn assert_persistent_state_after_coordinate_reconfiguration( &self, - persistent_state: &PersistentState, + prior_persistent_state: PersistentState, ) -> Result<(), TestCaseError> { let sut = &self.sut; let msg = &self @@ -419,23 +397,31 @@ impl TestState { ))? 
.msg; - prop_assert!(persistent_state.lrtq.is_none()); - prop_assert_eq!( - &sut.persistent_state.commits, - &persistent_state.commits - ); - prop_assert!(persistent_state.expunged.is_none()); + // We aren't using lrtq + prop_assert!(prior_persistent_state.lrtq.is_none()); + + // No commits have occurred prop_assert_eq!( - sut.persistent_state.configs.len() + 1, - persistent_state.configs.len() + &sut.ctx.persistent_state().commits, + &prior_persistent_state.commits ); + prop_assert!(prior_persistent_state.expunged.is_none()); + + // A new configuration has been added prop_assert_eq!( - persistent_state.latest_config().unwrap().epoch, - msg.epoch + sut.ctx.persistent_state().configs.len(), + prior_persistent_state.configs.len() + 1 ); - let config = persistent_state.configuration(msg.epoch).unwrap(); + // The configuration epoch has advanced by 1 + let prior_config_epoch = + prior_persistent_state.latest_config().map_or(0, |c| c.epoch.0); + prop_assert_eq!(prior_config_epoch + 1, msg.epoch.0); + + // Our persistent state has been appropriately updated with a new config + let config = + sut.ctx.persistent_state().configuration(msg.epoch).unwrap(); prop_assert_eq!(config.epoch, msg.epoch); for member in config.members.keys() { prop_assert!(msg.members.contains(member)); @@ -452,10 +438,9 @@ impl TestState { /// Verify the expected messages are sent after calling /// `Node::coordinate_reconfiguration`. fn assert_envelopes_after_coordinate_reconfiguration( - &self, - outbox: &[Envelope], + &mut self, ) -> Result<(), TestCaseError> { - let sut = &self.sut; + let sut = &mut self.sut; let msg = &self .model .coordinator_state @@ -465,7 +450,12 @@ impl TestState { ))? 
.msg; - let config = sut.persistent_state.configuration(msg.epoch).unwrap(); + let config = sut + .ctx + .persistent_state() + .configuration(msg.epoch) + .unwrap() + .clone(); // Ensure the members of the configuration match the model msg prop_assert_eq!( @@ -474,15 +464,15 @@ impl TestState { ); // The coordinator should send messages to every node but itself - assert_eq!(outbox.len(), config.members.len() - 1); - for envelope in outbox { + assert_eq!(sut.ctx.num_envelopes(), config.members.len() - 1); + for envelope in sut.ctx.drain_envelopes() { assert_matches!( &envelope.msg, PeerMsg{ kind: PeerMsgKind::Prepare{config: prepare_config, .. }, ..} => { - assert_eq!(*config, *prepare_config); + assert_eq!(config, *prepare_config); } ); prop_assert_eq!(&envelope.from, &config.coordinator); @@ -495,8 +485,8 @@ impl TestState { Ok(()) } - fn send(&mut self, envelopes: impl Iterator) { - for envelope in envelopes { + fn send_all_msgs(&mut self) { + for envelope in self.sut.ctx.drain_envelopes() { let msgs = self.network_msgs.entry(envelope.to).or_default(); msgs.push(envelope.msg); }