TQ: Add NodeCtx for use in Node API #8629

Merged · 3 commits · Jul 18, 2025

15 changes: 8 additions & 7 deletions trust-quorum/src/coordinator_state.rs
@@ -4,10 +4,11 @@

//! State of a reconfiguration coordinator inside a [`crate::Node`]

use crate::NodeHandlerCtx;
use crate::crypto::{LrtqShare, Sha3_256Digest, ShareDigestLrtq};
use crate::messages::PeerMsg;
use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg};
use crate::{Configuration, Envelope, Epoch, PeerMsgKind, PlatformId};
use crate::{Configuration, Epoch, PeerMsgKind, PlatformId};
use gfss::shamir::Share;
use slog::{Logger, o, warn};
use std::collections::{BTreeMap, BTreeSet};
@@ -147,7 +148,8 @@ impl CoordinatorState {
//
// This method is "in progress" - allow unused parameters for now
#[expect(unused)]
pub fn send_msgs(&mut self, now: Instant, outbox: &mut Vec<Envelope>) {
pub fn send_msgs(&mut self, ctx: &mut impl NodeHandlerCtx) {
Contributor Author commented on the new `send_msgs` signature:

The `NodeHandlerCtx` trait is not dyn-safe (object-safe) due to the generic parameter on `update_persistent_state`, so we take an `impl NodeHandlerCtx` parameter instead. This also avoids dynamic dispatch, at the potential cost of extra monomorphized code during compilation.
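
A minimal standalone sketch of that constraint, with simplified stand-in types (the closure signature is inferred from this diff, not copied from the crate):

```rust
// Stand-in for the real ledger state type in trust-quorum.
struct PersistentState;

trait NodeHandlerCtx {
    // A generic method has no single vtable entry, so the trait cannot be
    // used as `dyn NodeHandlerCtx` -- it is not object-safe.
    fn update_persistent_state<F>(&mut self, f: F)
    where
        F: FnOnce(&mut PersistentState) -> bool;
}

// Does not compile: "the trait `NodeHandlerCtx` cannot be made into an object".
// fn broken(_ctx: &mut dyn NodeHandlerCtx) {}

// Compiles: callers are monomorphized per concrete ctx type, with static dispatch.
fn send_msgs(_ctx: &mut impl NodeHandlerCtx) {}

fn main() {
    struct Ctx(PersistentState);
    impl NodeHandlerCtx for Ctx {
        fn update_persistent_state<F>(&mut self, f: F)
        where
            F: FnOnce(&mut PersistentState) -> bool,
        {
            let _ = f(&mut self.0);
        }
    }
    send_msgs(&mut Ctx(PersistentState));
}
```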

let now = ctx.now();
if now < self.retry_deadline {
return;
}
@@ -165,14 +167,13 @@
for (platform_id, (config, share)) in
prepares.clone().into_iter()
{
outbox.push(Envelope {
to: platform_id,
from: self.reconfigure_msg.coordinator_id().clone(),
msg: PeerMsg {
ctx.send(
platform_id,
PeerMsg {
rack_id,
kind: PeerMsgKind::Prepare { config, share },
},
});
);
}
}
}
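
For context on the hunk above: the coordinator no longer builds an `Envelope` itself; it hands the ctx only the destination and the `PeerMsg`. A rough, self-contained sketch of what a `send` helper of this shape presumably does (stand-in types; field names inferred from the old code and the test's `drain_envelopes()` call, not from the actual `node_ctx` module):

```rust
// Simplified stand-ins for types defined in the trust-quorum crate.
#[derive(Clone, Debug)]
struct PlatformId(String);
#[derive(Debug)]
struct PeerMsg; // the real type carries a rack_id and a PeerMsgKind
#[derive(Debug)]
struct Envelope {
    to: PlatformId,
    from: PlatformId,
    msg: PeerMsg,
}

struct Ctx {
    platform_id: PlatformId,
    outbox: Vec<Envelope>,
}

impl Ctx {
    // Build the envelope on behalf of the caller, filling in `from`, and
    // buffer it until the I/O layer drains it.
    fn send(&mut self, to: PlatformId, msg: PeerMsg) {
        let from = self.platform_id.clone();
        self.outbox.push(Envelope { to, from, msg });
    }

    fn drain_envelopes(&mut self) -> impl Iterator<Item = Envelope> + '_ {
        self.outbox.drain(..)
    }
}

fn main() {
    let mut ctx = Ctx {
        platform_id: PlatformId("sled-a".to_string()),
        outbox: Vec::new(),
    };
    ctx.send(PlatformId("sled-b".to_string()), PeerMsg);
    assert_eq!(ctx.drain_envelopes().count(), 1);
}
```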
4 changes: 3 additions & 1 deletion trust-quorum/src/crypto.rs
@@ -32,7 +32,9 @@ use crate::{Epoch, Threshold};
const LRTQ_SHARE_SIZE: usize = 33;

// The size in bytes of a single rack secret
const SECRET_LEN: usize = 32;
//
// Public only for docs
pub const SECRET_LEN: usize = 32;

// The size in bytes of an `Epoch`
const EPOCH_LEN: usize = size_of::<Epoch>();
4 changes: 4 additions & 0 deletions trust-quorum/src/lib.rs
@@ -17,13 +17,17 @@ mod coordinator_state;
pub(crate) mod crypto;
mod messages;
mod node;
mod node_ctx;
mod persistent_state;
mod validators;
pub use configuration::Configuration;
pub(crate) use coordinator_state::CoordinatorState;
pub use crypto::RackSecret;
pub use messages::*;
pub use node::Node;
// public only for docs.
pub use node_ctx::NodeHandlerCtx;
pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx};
pub use persistent_state::{PersistentState, PersistentStateSummary};

#[derive(
131 changes: 52 additions & 79 deletions trust-quorum/src/node.rs
@@ -16,11 +16,8 @@
//! Node, and so this should not be problematic.

use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg};
use crate::{
CoordinatorState, Envelope, Epoch, PersistentState, PlatformId, messages::*,
};
use crate::{CoordinatorState, Epoch, NodeHandlerCtx, PlatformId, messages::*};
use slog::{Logger, error, o, warn};
use std::time::Instant;

/// An entity capable of participating in trust quorum
///
@@ -30,30 +27,16 @@ use std::time::Instant;
pub struct Node {
log: Logger,

/// The unique hardware ID of a sled
platform_id: PlatformId,

/// State that gets persistently stored in ledgers
persistent_state: PersistentState,

/// In memory state for when this node is coordinating a reconfiguration
coordinator_state: Option<CoordinatorState>,
}

impl Node {
pub fn new(
log: Logger,
platform_id: PlatformId,
persistent_state: PersistentState,
) -> Node {
let id_str = format!("{platform_id:?}");
pub fn new(log: Logger, ctx: &mut impl NodeHandlerCtx) -> Node {
let id_str = format!("{:?}", ctx.platform_id());
let log =
log.new(o!("component" => "trust-quorum", "platform_id" => id_str));
Node { log, platform_id, persistent_state, coordinator_state: None }
}

pub fn platform_id(&self) -> &PlatformId {
&self.platform_id
Node { log, coordinator_state: None }
}

/// Start coordinating a reconfiguration
@@ -64,58 +47,54 @@ impl Node {
/// For upgrading from LRTQ, use `coordinate_upgrade_from_lrtq`
pub fn coordinate_reconfiguration(
&mut self,
now: Instant,
outbox: &mut Vec<Envelope>,
ctx: &mut impl NodeHandlerCtx,
msg: ReconfigureMsg,
) -> Result<Option<PersistentState>, ReconfigurationError> {
) -> Result<(), ReconfigurationError> {
let Some(validated_msg) = ValidatedReconfigureMsg::new(
&self.log,
&self.platform_id,
ctx.platform_id(),
msg,
(&self.persistent_state).into(),
ctx.persistent_state().into(),
self.coordinator_state.as_ref().map(|cs| cs.reconfigure_msg()),
)?
else {
// This was an idempotent (duplicate) request.
return Ok(None);
return Ok(());
};

let persistent_state =
self.set_coordinator_state(now, validated_msg)?;
self.send_coordinator_msgs(now, outbox);
Ok(persistent_state)
self.set_coordinator_state(ctx, validated_msg)?;
self.send_coordinator_msgs(ctx);
Ok(())
}

/// Process a timer tick
///
/// Ticks are issued by the caller in order to move the protocol forward.
/// The current time is passed in to make the calls deterministic.
pub fn tick(&mut self, now: Instant, outbox: &mut Vec<Envelope>) {
self.send_coordinator_msgs(now, outbox);
pub fn tick(&mut self, ctx: &mut impl NodeHandlerCtx) {
self.send_coordinator_msgs(ctx);
}

/// Handle a message from another node
pub fn handle(
&mut self,
_now: Instant,
_outbox: &mut Vec<Envelope>,
ctx: &mut impl NodeHandlerCtx,
from: PlatformId,
msg: PeerMsg,
) -> Option<PersistentState> {
if let Some(rack_id) = self.persistent_state.rack_id() {
) {
if let Some(rack_id) = ctx.persistent_state().rack_id() {
if rack_id != msg.rack_id {
error!(self.log, "Mismatched rack id";
"from" => %from,
"msg" => msg.kind.name(),
"expected" => %rack_id,
"got" => %msg.rack_id);
return None;
return;
}
}
match msg.kind {
PeerMsgKind::PrepareAck(epoch) => {
self.handle_prepare_ack(from, epoch);
None
}
_ => todo!(
"cannot handle message variant yet - not implemented: {msg:?}"
@@ -154,16 +133,12 @@
}

// Send any required messages as a reconfiguration coordinator
fn send_coordinator_msgs(
&mut self,
now: Instant,
outbox: &mut Vec<Envelope>,
) {
fn send_coordinator_msgs(&mut self, ctx: &mut impl NodeHandlerCtx) {
// This function is called unconditionally in `tick` callbacks. In this
// case we may not actually be a coordinator. We ignore the call in
// that case.
if let Some(c) = self.coordinator_state.as_mut() {
c.send_msgs(now, outbox);
c.send_msgs(ctx);
}
}

@@ -175,53 +150,51 @@
/// we have a `ValidatedReconfigureMsg`.
fn set_coordinator_state(
&mut self,
now: Instant,
ctx: &mut impl NodeHandlerCtx,
msg: ValidatedReconfigureMsg,
) -> Result<Option<PersistentState>, ReconfigurationError> {
) -> Result<(), ReconfigurationError> {
// We have no committed configuration or lrtq ledger
if self.persistent_state.is_uninitialized() {
if ctx.persistent_state().is_uninitialized() {
let (coordinator_state, my_config, my_share) =
CoordinatorState::new_uninitialized(
self.log.clone(),
now,
ctx.now(),
msg,
)?;
self.coordinator_state = Some(coordinator_state);
self.persistent_state.shares.insert(my_config.epoch, my_share);
self.persistent_state
.configs
.insert_unique(my_config)
.expect("empty state");
ctx.update_persistent_state(move |ps| {
ps.shares.insert(my_config.epoch, my_share);
ps.configs.insert_unique(my_config).expect("empty state");
true
});

return Ok(Some(self.persistent_state.clone()));
return Ok(());
}

// We have a committed configuration that is not LRTQ
let config =
self.persistent_state.latest_committed_configuration().unwrap();
ctx.persistent_state().latest_committed_configuration().unwrap();

self.coordinator_state = Some(CoordinatorState::new_reconfiguration(
self.log.clone(),
now,
ctx.now(),
msg,
&config,
)?);

Ok(None)
Ok(())
}
}
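
The `ctx.update_persistent_state(|ps| ...)` calls above pair with the `persistent_state_change_check_and_reset()` assertions in the test module below. A sketch of the contract this implies, again with assumed internals: the closure reports whether it changed anything, and the ctx latches that into a dirty flag the caller checks and clears.

```rust
// Stand-in for the real ledger state.
#[derive(Default)]
struct PersistentState;

#[derive(Default)]
struct Ctx {
    persistent_state: PersistentState,
    changed: bool,
}

impl Ctx {
    // The closure mutates the state and returns `true` if it changed anything;
    // the ctx remembers that until the caller asks.
    fn update_persistent_state<F>(&mut self, f: F)
    where
        F: FnOnce(&mut PersistentState) -> bool,
    {
        if f(&mut self.persistent_state) {
            self.changed = true;
        }
    }

    // Report whether persistent state changed since the last check, clearing
    // the flag -- matching the pair of assertions in the test below.
    fn persistent_state_change_check_and_reset(&mut self) -> bool {
        std::mem::take(&mut self.changed)
    }
}

fn main() {
    let mut ctx = Ctx::default();
    ctx.update_persistent_state(|_ps| true);
    assert!(ctx.persistent_state_change_check_and_reset());
    assert!(!ctx.persistent_state_change_check_and_reset());
}
```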

#[cfg(test)]
mod tests {
use std::time::Duration;

use crate::{Epoch, Threshold};

use super::*;
use crate::{Epoch, NodeCallerCtx, NodeCommonCtx, NodeCtx, Threshold};
use assert_matches::assert_matches;
use omicron_test_utils::dev::test_setup_log;
use omicron_uuid_kinds::RackUuid;
use proptest::prelude::*;
use std::time::Duration;
use test_strategy::{Arbitrary, proptest};

fn arb_member() -> impl Strategy<Value = PlatformId> {
@@ -259,21 +232,21 @@ mod tests {
let logctx = test_setup_log("initial_configuration");
let my_platform_id =
input.reconfigure_msg.members.first().unwrap().clone();
let mut node = Node::new(
logctx.log.clone(),
my_platform_id.clone(),
PersistentState::empty(),
);
let mut ctx = NodeCtx::new(my_platform_id.clone());
let mut node = Node::new(logctx.log.clone(), &mut ctx);

node.coordinate_reconfiguration(
&mut ctx,
input.reconfigure_msg.clone(),
)
.expect("success");

// An initial configuration always causes a change to persistent state
assert!(ctx.persistent_state_change_check_and_reset());
// Checking if the persistent state has changed above cleared the bit
assert!(!ctx.persistent_state_change_check_and_reset());

let mut outbox = Vec::new();
let persistent_state = node
.coordinate_reconfiguration(
Instant::now(),
&mut outbox,
input.reconfigure_msg.clone(),
)
.expect("success")
.expect("persistent state");
let persistent_state = ctx.persistent_state().clone();

// A PersistentState should always be returned
// It should include the `PrepareMsg` for this node.
Expand All @@ -288,15 +261,15 @@ mod tests {
persistent_state.configs.get(&input.reconfigure_msg.epoch).unwrap();

assert_eq!(config.epoch, input.reconfigure_msg.epoch);
assert_eq!(config.coordinator, *node.platform_id());
assert_eq!(config.coordinator, *ctx.platform_id());
assert_eq!(config.members.len(), input.reconfigure_msg.members.len());
assert_eq!(config.threshold, input.reconfigure_msg.threshold);
assert!(config.encrypted_rack_secrets.is_none());

// Ensure that prepare messages are properly put in the outbox to be
// sent by the I/O parts of the codebase
assert_eq!(outbox.len(), config.members.len() - 1);
for envelope in outbox {
assert_eq!(ctx.num_envelopes(), config.members.len() - 1);
for envelope in ctx.drain_envelopes() {
assert_matches!(
envelope.msg.kind,
PeerMsgKind::Prepare{ config: prepare_config, .. } => {
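
Taken together, the new API routes all side effects (outgoing messages, persistent-state writes, and the clock) through the ctx object rather than through extra parameters and return values. A rough sketch of how a caller might drive a `Node`, using only calls that appear in this diff; the crate name, the trait split across `NodeCallerCtx`/`NodeCommonCtx`, and the commented I/O steps are assumptions:

```rust
use slog::Logger;
use trust_quorum::{
    Node, NodeCallerCtx, NodeCommonCtx, NodeCtx, PeerMsg, PlatformId,
    ReconfigureMsg,
};

// Hypothetical driver; a real caller would loop and do network/ledger I/O.
fn drive(
    log: Logger,
    platform_id: PlatformId,
    reconfigure: ReconfigureMsg,
    inbound: Vec<(PlatformId, PeerMsg)>,
) {
    let mut ctx = NodeCtx::new(platform_id);
    let mut node = Node::new(log, &mut ctx);

    // Kick off a reconfiguration; outgoing messages accumulate in the ctx
    // instead of being pushed into a caller-supplied Vec<Envelope>.
    node.coordinate_reconfiguration(&mut ctx, reconfigure)
        .expect("reconfiguration accepted");

    // Deliver peer messages and advance timers; no Instant is passed in,
    // because the ctx supplies the clock via ctx.now().
    for (from, msg) in inbound {
        node.handle(&mut ctx, from, msg);
    }
    node.tick(&mut ctx);

    // Flush side effects afterwards.
    if ctx.persistent_state_change_check_and_reset() {
        let _ps = ctx.persistent_state(); // write to the ledger (not shown)
    }
    for _envelope in ctx.drain_envelopes() {
        // hand the envelope to the network layer (not shown)
    }
}
```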