Commit d1ec336

tests(net): Add a net test with many nodes (#65)
This adds a large integration test that bootstraps a swarm with many nodes and lets each node broadcast a few messages. By default the test runs with 100 nodes and 2 messages per node. On my machine I ran it a couple of times with 1000 nodes in release mode and it completes fine, but it takes a while in debug mode, so it is limited to 100 nodes for now to avoid overwhelming CI.
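
The defaults can be overridden via the environment variables the test reads (NODE_COUNT, MESSAGE_COUNT, WARMUP_SLEEP, SEND_INTERVAL, TIMEOUT); a larger run should be possible with something along the lines of `NODE_COUNT=1000 cargo test --release gossip_net_big`. The exact cargo invocation is an assumption here; only the variable names and their defaults come from the test below.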
1 parent: 3cf2cd2

1 file changed: src/net.rs (264 additions, 0 deletions)

@@ -1357,10 +1357,13 @@ impl Dialer {
 mod test {
     use std::time::Duration;
 
+    use anyhow::{anyhow, bail};
     use bytes::Bytes;
     use futures_concurrency::future::TryJoin;
     use iroh::{protocol::Router, RelayMap, RelayMode, SecretKey};
+    use n0_future::{FuturesUnordered, StreamExt};
     use rand::Rng;
+    use testresult::TestResult;
     use tokio::{spawn, time::timeout};
     use tokio_util::sync::CancellationToken;
     use tracing::{info, instrument};
@@ -2022,4 +2025,265 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn gossip_net_big() -> TestResult {
+        tracing_subscriber::fmt::try_init().ok();
+        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
+        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
+        let dns = iroh::test_utils::DnsPkarrServer::run().await?;
+
+        let node_count: usize = std::env::var("NODE_COUNT")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(100);
+        let message_count: usize = std::env::var("MESSAGE_COUNT")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(2);
+
+        let warmup_sleep_s = std::env::var("WARMUP_SLEEP")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(1);
+
+        let send_interval_ms = std::env::var("SEND_INTERVAL")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(50);
+
+        let timeout_ms = std::env::var("TIMEOUT")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(10000);
+        let timeout = Duration::from_millis(timeout_ms);
+        info!("recv timeout: {timeout:?}");
+
+        // spawn
+        info!("spawn {node_count} nodes");
+        let secret_keys = (0..node_count).map(|_i| SecretKey::generate(&mut rng));
+        let spawning = FuturesUnordered::from_iter(secret_keys.map(|secret_key| {
+            let relay_map = relay_map.clone();
+            let discovery = dns.discovery(secret_key.clone());
+            let dns_resolver = dns.dns_resolver();
+            task(async move {
+                let endpoint = Endpoint::builder()
+                    .secret_key(secret_key)
+                    .alpns(vec![GOSSIP_ALPN.to_vec()])
+                    .relay_mode(RelayMode::Custom(relay_map))
+                    .discovery(discovery)
+                    .dns_resolver(dns_resolver)
+                    .insecure_skip_relay_cert_verify(true)
+                    .bind()
+                    .await?;
+                let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
+                let router = Router::builder(endpoint)
+                    .accept(GOSSIP_ALPN, gossip.clone())
+                    .spawn();
+                anyhow::Ok((router, gossip))
+            })
+        }));
+        let spawned: Vec<_> = spawning.try_collect().await?;
+        let (routers, gossips): (Vec<_>, Vec<_>) = spawned.into_iter().unzip();
+        info!("all spawned");
+
+        // wait for all nodes to be visible on the router
+        for router in routers.iter() {
+            let node_id = router.endpoint().node_id();
+            dns.on_node(&node_id, Duration::from_secs(1)).await?;
+        }
+
+        info!("all published to discovery");
+
+        // bootstrap
+        let topic_id = TopicId::from_bytes([0u8; 32]);
+
+        let bootstrap_node = routers[0].endpoint().node_id();
+
+        let mut senders = vec![];
+
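+        // 10 bootstrap nodes (or node_count, if smaller), scaled up to node_count / 50 for swarms larger than 500 nodes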
+        let bootstrap_count = node_count.min(10).max(node_count / 50);
+        info!("start with {bootstrap_count} bootstrap nodes");
+        let mut joining = FuturesUnordered::new();
+        #[allow(clippy::needless_range_loop)]
+        for i in 0..bootstrap_count {
+            let bootstrap = if i == 0 { vec![] } else { vec![bootstrap_node] };
+            let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
+            let endpoint = routers[i].endpoint().clone();
+            senders.push((sender, endpoint.node_id()));
+            joining.push(
+                async move {
+                    receiver.joined().await?;
+                    Ok((receiver, endpoint))
+                }
+                .boxed(),
+            );
+        }
+
+        let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
+        let mut receivers = joined.context("failed to join all nodes")?;
+        info!("bootstrap nodes joined");
+
+        info!("sleep {warmup_sleep_s}s for swarm to stabilize");
+        tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
+
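+        // the remaining nodes join in waves of `bootstrap_count`; node i bootstraps via initial node i % bootstrap_count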
info!("join {} remaining nodes", node_count - bootstrap_count);
2126+
let chunks = node_count / bootstrap_count;
2127+
for chunk in 1..chunks {
2128+
let mut joining = FuturesUnordered::new();
2129+
#[allow(clippy::needless_range_loop)]
2130+
for j in 0..bootstrap_count {
2131+
let i = (chunk * bootstrap_count) + j;
2132+
if i >= node_count {
2133+
break;
2134+
}
2135+
let bootstrap = vec![routers[i % bootstrap_count].endpoint().node_id()];
2136+
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
2137+
let endpoint = routers[i].endpoint().clone();
2138+
senders.push((sender, endpoint.node_id()));
2139+
joining.push(
2140+
async move {
2141+
receiver.joined().await?;
2142+
Ok((receiver, endpoint))
2143+
}
2144+
.boxed(),
2145+
);
2146+
}
2147+
2148+
let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
2149+
receivers.extend(joined.context("failed to join all nodes")?);
2150+
info!("joined chunk {chunk} of {chunks} with {bootstrap_count}");
2151+
}
2152+
2153+
info!("sleep {warmup_sleep_s}s for swarm to stabilize");
2154+
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
2155+
2156+
info!("sending & receiving {message_count} messages on each node");
2157+
// spawn send tasks
2158+
let sending = senders.into_iter().enumerate().map(|(i, (sender, me))| {
2159+
task(async move {
2160+
for j in 0..message_count {
2161+
let message = format!("{}:{}", me.fmt_short(), j);
2162+
let message: Bytes = message.as_bytes().to_vec().into();
2163+
sender.broadcast(message).await?;
2164+
tokio::time::sleep(Duration::from_millis(send_interval_ms)).await
2165+
}
2166+
debug!("{i}: sent all");
2167+
anyhow::Ok((me, sender))
2168+
})
2169+
});
2170+
let sending = FuturesUnordered::from_iter(sending);
2171+
2172+
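+        // every message that will be broadcast in the swarm, used to derive each node's expected set below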
+        let all_messages: BTreeSet<Bytes> = routers
+            .iter()
+            .map(|r| r.endpoint().node_id())
+            .flat_map(|node_id| {
+                (0..message_count)
+                    .map(move |i| format!("{}:{}", node_id.fmt_short(), i).into_bytes().into())
+            })
+            .collect();
+        let all_messages = Arc::new(all_messages);
+
+        // closure to create a set of expected messages at a peer
+        let expected = move |all_messages: &BTreeSet<Bytes>, me: NodeId| -> BTreeSet<Bytes> {
+            let me = me.fmt_short();
+            all_messages
+                .iter()
+                .filter(|m| !m.starts_with(me.as_bytes()))
+                .cloned()
+                .collect()
+        };
+
+        // spawn recv tasks
+        let receiving = receivers.into_iter().map(|(mut receiver, endpoint)| {
+            let all_messages = Arc::clone(&all_messages);
+            let me = endpoint.node_id();
+            task(async move {
+                let mut missing = expected(&all_messages, endpoint.node_id());
+                let timeout = tokio::time::sleep(timeout);
+                tokio::pin!(timeout);
+                let res = loop {
+                    let event = tokio::select! {
+                        res = receiver.next() => {
+                            match res {
+                                None => break Err(anyhow!("receiver closed")),
+                                Some(Err(err)) => break Err(err.into()),
+                                Some(Ok(event)) => event,
+                            }
+                        },
+                        _ = &mut timeout => break Err(anyhow!("timeout"))
+                    };
+                    if let Event::Gossip(GossipEvent::Received(message)) = event {
+                        if !missing.remove(&message.content) {
+                            break Err(anyhow!(
+                                "duplicate message: {:?} delivered from {}",
+                                String::from_utf8_lossy(&message.content),
+                                message.delivered_from.fmt_short()
+                            ));
+                        }
+                        if missing.is_empty() {
+                            break Ok(());
+                        }
+                    }
+                };
+                (receiver, missing, res)
+            })
+            .map(move |res| (me, res))
+        });
+        let mut receiving = FuturesUnordered::from_iter(receiving);
+
+        let senders_fut = async move {
+            let senders: Vec<_> = sending.try_collect().await?;
+            anyhow::Ok(senders)
+        };
+        let expected_count = message_count * (node_count - 1);
+        let receivers_fut = task(async move {
+            let mut failed = 0;
+            let mut missing_total = 0;
+            let mut receivers = vec![];
+            while let Some(res) = receiving.next().await {
+                let (node_id, (receiver, missing, res)) = res;
+                receivers.push(receiver);
+                match res {
+                    Err(err) => {
+                        missing_total += missing.len();
+                        failed += 1;
+                        warn!(me=%node_id.fmt_short(), ?missing, "recv task failed: {err:#}");
+                        for m in missing {
+                            let hash = blake3::hash(&m);
+                            warn!(me=%node_id.fmt_short(), ?hash, "missing");
+                        }
+                    }
+                    Ok(()) => {
+                        assert!(missing.is_empty());
+                    }
+                }
+            }
+            if failed > 0 {
+                bail!("Receive side failed: {failed} nodes together missed {missing_total} messages of {expected_count}");
+            } else {
+                Ok(receivers)
+            }
+        });
+
+        let (senders, receivers) = (senders_fut, receivers_fut).try_join().await?;
+        info!("all done");
+        assert_eq!(senders.len(), node_count);
+        assert_eq!(receivers.len(), node_count);
+        drop(senders);
+        drop(receivers);
+        let _ = FuturesUnordered::from_iter(gossips.iter().map(|gossip| gossip.shutdown()))
+            .count()
+            .await;
+        let mut shutdown =
+            FuturesUnordered::from_iter(routers.into_iter().map(|router| async move {
+                (router.endpoint().node_id(), router.shutdown().await)
+            }));
+        while let Some((node_id, res)) = shutdown.next().await {
+            res.with_context(|| format!("shutdown failed for {}", node_id.fmt_short()))?;
+        }
+
+        Ok(())
+    }
+
+    async fn task<T: Send + 'static>(
+        fut: impl std::future::Future<Output = T> + Send + 'static,
+    ) -> T {
+        n0_future::task::spawn(fut).await.unwrap()
+    }
 }
