Skip to content

Commit 087f5ed

Browse files
committed
feat(swim): add spawn entry-point, SwimHandle, and optional UDP config field
Callers previously had to wire up MembershipList, DisseminationQueue, FailureDetector, and the run-loop task themselves — a four-step sequence with easy ordering mistakes. `spawn_swim` collapses this into one call: it validates config, seeds the membership list from the provided address list, starts the detector task, and returns a `SwimHandle` that owns shutdown plumbing and exposes the shared membership and dissemination queue. `BootstrapConfig` gains `swim_udp_addr: Option<SocketAddr>` so operators can opt into SWIM by supplying a bind address. `None` keeps the existing behaviour (cluster boots without SWIM; membership is observed only through Raft). All existing callsites are updated with `swim_udp_addr: None`. `SwimHandle` and `spawn_swim` are re-exported from `nodedb_cluster` so dependent crates do not need to reach into swim internals. Adds a real-UDP integration test (`swim_udp_convergence`) that boots three SWIM nodes on ephemeral loopback ports and asserts they converge to a full Alive view before a member is shut down and the remainder observe it as Suspect/Dead.
1 parent 44a0d4b commit 087f5ed

11 files changed

Lines changed: 409 additions & 2 deletions

File tree

nodedb-cluster/src/bootstrap/bootstrap_fn.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ mod tests {
101101
data_dir: _dir.path().to_path_buf(),
102102
force_bootstrap: false,
103103
join_retry: Default::default(),
104+
swim_udp_addr: None,
104105
};
105106

106107
let state = bootstrap(&config, &catalog).unwrap();

nodedb-cluster/src/bootstrap/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ pub struct ClusterConfig {
8484
/// (`8` attempts, `32 s` ceiling). Tests override this with a
8585
/// faster policy.
8686
pub join_retry: JoinRetryPolicy,
87+
/// Optional UDP bind address for the SWIM failure detector. `None`
88+
/// disables SWIM entirely — cluster startup then relies solely on
89+
/// the existing raft transport for membership observations. When
90+
/// `Some`, the operator is expected to spawn SWIM separately via
91+
/// [`crate::spawn_swim`] after the cluster is up and feed the
92+
/// seed list from `seed_nodes`.
93+
pub swim_udp_addr: Option<SocketAddr>,
8794
}
8895

8996
/// Result of cluster startup — everything needed to run the Raft loop.

nodedb-cluster/src/bootstrap/join.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ mod tests {
449449
data_dir: _dir1.path().to_path_buf(),
450450
force_bootstrap: false,
451451
join_retry: Default::default(),
452+
swim_udp_addr: None,
452453
};
453454
let state1 = bootstrap(&config1, &catalog1).unwrap();
454455

@@ -497,6 +498,7 @@ mod tests {
497498
data_dir: _dir2.path().to_path_buf(),
498499
force_bootstrap: false,
499500
join_retry: Default::default(),
501+
swim_udp_addr: None,
500502
};
501503

502504
let lifecycle = ClusterLifecycleTracker::new();

nodedb-cluster/src/bootstrap/probe.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ mod tests {
222222
data_dir: std::env::temp_dir(),
223223
force_bootstrap: false,
224224
join_retry: Default::default(),
225+
swim_udp_addr: None,
225226
}
226227
}
227228

nodedb-cluster/src/bootstrap/restart.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ mod tests {
111111
data_dir: _dir.path().to_path_buf(),
112112
force_bootstrap: false,
113113
join_retry: Default::default(),
114+
swim_udp_addr: None,
114115
};
115116

116117
// Bootstrap first.

nodedb-cluster/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,7 @@ pub use lifecycle::{
7878
pub use rdma_transport::{RdmaConfig, RdmaTransport};
7979
pub use rebalance_scheduler::{NodeMetrics, RebalanceScheduler, RebalanceTrigger, SchedulerConfig};
8080
pub use shard_split::{SplitPlan, SplitStrategy, plan_graph_split, plan_vector_split};
81-
pub use swim::{Incarnation, Member, MemberState, MembershipList, SwimConfig, SwimError};
81+
pub use swim::{
82+
Incarnation, Member, MemberState, MembershipList, SwimConfig, SwimError, SwimHandle,
83+
UdpTransport, spawn as spawn_swim,
84+
};
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
//! SWIM subsystem bootstrap.
2+
//!
3+
//! [`spawn`] is the one-stop entry point callers (cluster startup or
4+
//! tests) use to stand up a running failure detector:
5+
//!
6+
//! 1. Constructs a [`MembershipList`] containing the local node at
7+
//! incarnation 0.
8+
//! 2. Seeds the list with an `Alive` entry for every address in
9+
//! `seeds`, using a synthetic `NodeId` of the form `"seed:<addr>"`.
10+
//! The first successful probe replaces the placeholder with the
11+
//! peer's real node id via the normal merge path.
12+
//! 3. Validates [`SwimConfig`] and constructs a [`FailureDetector`].
13+
//! 4. Spawns the detector's run loop on a fresh tokio task.
14+
//! 5. Returns a [`SwimHandle`] the caller can use to read membership,
15+
//! access the dissemination queue, and shut the detector down.
16+
17+
use std::net::SocketAddr;
18+
use std::sync::Arc;
19+
20+
use nodedb_types::NodeId;
21+
use tokio::sync::watch;
22+
use tokio::task::JoinHandle;
23+
24+
use super::config::SwimConfig;
25+
use super::detector::{FailureDetector, ProbeScheduler, Transport};
26+
use super::dissemination::DisseminationQueue;
27+
use super::error::SwimError;
28+
use super::incarnation::Incarnation;
29+
use super::member::MemberState;
30+
use super::member::record::MemberUpdate;
31+
use super::membership::MembershipList;
32+
33+
/// Owns a running SWIM detector and its shutdown plumbing.
///
/// Dropping `SwimHandle` detaches the background task without
/// signalling shutdown (the run loop keeps running) — callers should
/// always invoke [`SwimHandle::shutdown`] to request graceful drain.
pub struct SwimHandle {
    // Running failure detector; the same Arc the spawned run-loop task holds.
    detector: Arc<FailureDetector>,
    // Shared membership view; identical Arc to the detector's view.
    membership: Arc<MembershipList>,
    // Flipping this to `true` asks the run loop to exit.
    shutdown_tx: watch::Sender<bool>,
    // Handle to the spawned run-loop task; awaited in `shutdown`.
    join: JoinHandle<()>,
}
43+
44+
impl SwimHandle {
    /// Shared reference to the detector (for metrics, debugging, or
    /// injecting synthetic rumours in tests).
    pub fn detector(&self) -> &Arc<FailureDetector> {
        &self.detector
    }

    /// Shared reference to the membership list. Clone cheaply; the
    /// underlying `Arc` is identical to the detector's view.
    pub fn membership(&self) -> &Arc<MembershipList> {
        &self.membership
    }

    /// Shared reference to the dissemination queue. Used by callers
    /// that want to enqueue rumours from outside SWIM (e.g. the raft
    /// layer announcing a conf change).
    pub fn dissemination(&self) -> &Arc<DisseminationQueue> {
        self.detector.dissemination()
    }

    /// Signal the detector to shut down and wait for its task to
    /// finish. Errors are deliberately discarded: a failed `send`
    /// means the run loop already exited, and a join error (task
    /// panic) carries nothing actionable for the caller here.
    pub async fn shutdown(self) {
        let _ = self.shutdown_tx.send(true);
        let _ = self.join.await;
    }
}
71+
72+
/// Bring up a SWIM failure detector.
73+
///
74+
/// * `cfg` — validated [`SwimConfig`]. An invalid config returns
75+
/// [`SwimError::InvalidConfig`] before any task is spawned.
76+
/// * `local_id` — this node's canonical id.
77+
/// * `local_addr` — the socket address the transport is already bound
78+
/// to. The membership list stores it verbatim for peers to echo back
79+
/// in probe responses.
80+
/// * `seeds` — initial peer addresses. Empty list is legal and yields a
81+
/// solo-cluster detector that does nothing interesting until a peer
82+
/// arrives via an external join.
83+
/// * `transport` — any [`Transport`] impl (UDP in production, the
84+
/// in-memory fabric in tests).
85+
pub async fn spawn(
86+
cfg: SwimConfig,
87+
local_id: NodeId,
88+
local_addr: SocketAddr,
89+
seeds: Vec<SocketAddr>,
90+
transport: Arc<dyn Transport>,
91+
) -> Result<SwimHandle, SwimError> {
92+
cfg.validate()?;
93+
94+
let membership = Arc::new(MembershipList::new_local(
95+
local_id.clone(),
96+
local_addr,
97+
cfg.initial_incarnation,
98+
));
99+
100+
// Seed the membership table so the first probe round has somewhere
101+
// to go. Placeholder ids are replaced on the first ack.
102+
for seed_addr in &seeds {
103+
if *seed_addr == local_addr {
104+
continue;
105+
}
106+
membership.apply(&MemberUpdate {
107+
node_id: NodeId::new(format!("seed:{seed_addr}")),
108+
addr: seed_addr.to_string(),
109+
state: MemberState::Alive,
110+
incarnation: Incarnation::ZERO,
111+
});
112+
}
113+
114+
let initial_inc = cfg.initial_incarnation;
115+
let detector = Arc::new(FailureDetector::new(
116+
cfg,
117+
Arc::clone(&membership),
118+
transport,
119+
ProbeScheduler::new(),
120+
));
121+
122+
// Prime the dissemination queue with our own Alive record so the
123+
// first outgoing probes advertise our canonical NodeId + addr to
124+
// every seed. Without this, seed placeholders would never be
125+
// replaced with real ids until some peer independently learned
126+
// our identity — which is not reliable from seed bootstrap alone.
127+
detector.dissemination().enqueue(MemberUpdate {
128+
node_id: local_id.clone(),
129+
addr: local_addr.to_string(),
130+
state: MemberState::Alive,
131+
incarnation: initial_inc,
132+
});
133+
134+
let (shutdown_tx, shutdown_rx) = watch::channel(false);
135+
let join = tokio::spawn({
136+
let detector = Arc::clone(&detector);
137+
async move { detector.run(shutdown_rx).await }
138+
});
139+
140+
Ok(SwimHandle {
141+
detector,
142+
membership,
143+
shutdown_tx,
144+
join,
145+
})
146+
}
147+
148+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::detector::TransportFabric;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    /// Loopback socket address on port `p`.
    fn addr(p: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), p)
    }

    /// Fast timings suitable for the in-memory fabric.
    fn cfg() -> SwimConfig {
        SwimConfig {
            probe_interval: Duration::from_millis(100),
            probe_timeout: Duration::from_millis(40),
            indirect_probes: 2,
            suspicion_mult: 4,
            min_suspicion: Duration::from_millis(500),
            initial_incarnation: Incarnation::ZERO,
            max_piggyback: 6,
            fanout_lambda: 3,
        }
    }

    #[tokio::test]
    async fn spawn_solo_cluster_has_only_local() {
        let fabric = TransportFabric::new();
        let local = addr(7100);
        let transport: Arc<dyn Transport> = Arc::new(fabric.bind(local).await);
        let handle = spawn(cfg(), NodeId::new("a"), local, Vec::new(), transport)
            .await
            .expect("spawn");
        // No seeds: only the local entry, and the solo flag is set.
        assert_eq!(handle.membership().len(), 1);
        assert!(handle.membership().is_solo());
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn spawn_seeds_populates_membership() {
        let fabric = TransportFabric::new();
        let local = addr(7110);
        let transport: Arc<dyn Transport> = Arc::new(fabric.bind(local).await);
        let seeds = vec![addr(7111), addr(7112)];
        let handle = spawn(cfg(), NodeId::new("a"), local, seeds, transport)
            .await
            .expect("spawn");
        // Local + two seed placeholders.
        assert_eq!(handle.membership().len(), 3);
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn spawn_skips_local_addr_in_seeds() {
        let fabric = TransportFabric::new();
        let local = addr(7120);
        let transport: Arc<dyn Transport> = Arc::new(fabric.bind(local).await);
        let seeds = vec![addr(7120), addr(7121)];
        let handle = spawn(cfg(), NodeId::new("a"), local, seeds, transport)
            .await
            .expect("spawn");
        // Local + one real seed = 2; the self-address seed is dropped.
        assert_eq!(handle.membership().len(), 2);
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn invalid_config_rejected_before_task_spawned() {
        let fabric = TransportFabric::new();
        let local = addr(7130);
        let transport: Arc<dyn Transport> = Arc::new(fabric.bind(local).await);
        let mut bad = cfg();
        // Equal timeout and interval violates the strict-less rule.
        bad.probe_timeout = bad.probe_interval;
        let res = spawn(bad, NodeId::new("a"), local, Vec::new(), transport).await;
        match res {
            Err(SwimError::InvalidConfig { .. }) => {}
            Err(other) => panic!("expected InvalidConfig, got {other:?}"),
            Ok(_) => panic!("expected InvalidConfig error"),
        }
    }

    #[tokio::test]
    async fn shutdown_joins_promptly() {
        let fabric = TransportFabric::new();
        let local = addr(7140);
        let transport: Arc<dyn Transport> = Arc::new(fabric.bind(local).await);
        let handle = spawn(cfg(), NodeId::new("a"), local, Vec::new(), transport)
            .await
            .expect("spawn");
        let started = std::time::Instant::now();
        tokio::time::timeout(Duration::from_millis(500), handle.shutdown())
            .await
            .expect("shutdown did not join within budget");
        assert!(started.elapsed() < Duration::from_millis(500));
    }
}

nodedb-cluster/src/swim/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
//! It exposes the pure data model — member states, incarnation numbers, and
2121
//! the state-merge rule — that every later sub-batch builds on.
2222
23+
pub mod bootstrap;
2324
pub mod config;
2425
pub mod detector;
2526
pub mod dissemination;
@@ -29,9 +30,10 @@ pub mod member;
2930
pub mod membership;
3031
pub mod wire;
3132

33+
pub use bootstrap::{SwimHandle, spawn};
3234
pub use config::SwimConfig;
3335
pub use detector::{
34-
FailureDetector, InMemoryTransport, ProbeScheduler, Transport, TransportFabric,
36+
FailureDetector, InMemoryTransport, ProbeScheduler, Transport, TransportFabric, UdpTransport,
3537
};
3638
pub use dissemination::{DisseminationQueue, PendingUpdate, apply_and_disseminate};
3739
pub use error::SwimError;

nodedb-cluster/tests/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ impl TestNode {
186186
max_attempts: 8,
187187
max_backoff_secs: 2,
188188
},
189+
swim_udp_addr: None,
189190
};
190191

191192
let lifecycle = ClusterLifecycleTracker::new();

0 commit comments

Comments
 (0)