Skip to content

Commit a80bf1b

Browse files
committed
tests: add big test
1 parent 1c521b9 commit a80bf1b

File tree

1 file changed

+264
-0
lines changed

1 file changed

+264
-0
lines changed

src/net.rs

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,10 +1348,13 @@ impl Dialer {
13481348
mod test {
13491349
use std::time::Duration;
13501350

1351+
use anyhow::{anyhow, bail};
13511352
use bytes::Bytes;
13521353
use futures_concurrency::future::TryJoin;
13531354
use iroh::{protocol::Router, RelayMap, RelayMode, SecretKey};
1355+
use n0_future::{FuturesUnordered, StreamExt};
13541356
use rand::Rng;
1357+
use testresult::TestResult;
13551358
use tokio::{spawn, time::timeout};
13561359
use tokio_util::sync::CancellationToken;
13571360
use tracing::{info, instrument};
@@ -2013,4 +2016,265 @@ mod test {
20132016

20142017
Ok(())
20152018
}
2019+
2020+
#[tokio::test(flavor = "multi_thread")]
2021+
async fn gossip_net_big() -> TestResult {
2022+
tracing_subscriber::fmt::try_init().ok();
2023+
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2024+
let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
2025+
let dns = iroh::test_utils::DnsPkarrServer::run().await?;
2026+
2027+
let node_count: usize = std::env::var("NODE_COUNT")
2028+
.map(|x| x.parse().unwrap())
2029+
.unwrap_or(100);
2030+
let message_count: usize = std::env::var("MESSAGE_COUNT")
2031+
.map(|x| x.parse().unwrap())
2032+
.unwrap_or(2);
2033+
2034+
let warmup_sleep_s = std::env::var("WARMUP_SLEEP")
2035+
.map(|x| x.parse().unwrap())
2036+
.unwrap_or(1);
2037+
2038+
let send_interval_ms = std::env::var("SEND_INTERVAL")
2039+
.map(|x| x.parse().unwrap())
2040+
.unwrap_or(50);
2041+
2042+
let timeout_ms = std::env::var("TIMEOUT")
2043+
.map(|x| x.parse().unwrap())
2044+
.unwrap_or(10000);
2045+
let timeout = Duration::from_millis(timeout_ms);
2046+
info!("recv timeout: {timeout:?}");
2047+
2048+
// spawn
2049+
info!("spawn {node_count} nodes");
2050+
let secret_keys = (0..node_count).map(|_i| SecretKey::generate(&mut rng));
2051+
let spawning = FuturesUnordered::from_iter(secret_keys.map(|secret_key| {
2052+
let relay_map = relay_map.clone();
2053+
let discovery = dns.discovery(secret_key.clone());
2054+
let dns_resolver = dns.dns_resolver();
2055+
task(async move {
2056+
let endpoint = Endpoint::builder()
2057+
.secret_key(secret_key)
2058+
.alpns(vec![GOSSIP_ALPN.to_vec()])
2059+
.relay_mode(RelayMode::Custom(relay_map))
2060+
.discovery(discovery)
2061+
.dns_resolver(dns_resolver)
2062+
.insecure_skip_relay_cert_verify(true)
2063+
.bind()
2064+
.await?;
2065+
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
2066+
let router = Router::builder(endpoint)
2067+
.accept(GOSSIP_ALPN, gossip.clone())
2068+
.spawn();
2069+
anyhow::Ok((router, gossip))
2070+
})
2071+
}));
2072+
let spawned: Vec<_> = spawning.try_collect().await?;
2073+
let (routers, gossips): (Vec<_>, Vec<_>) = spawned.into_iter().unzip();
2074+
info!("all spawned");
2075+
2076+
// wait for all nodes to be visible on the router
2077+
for router in routers.iter() {
2078+
let node_id = router.endpoint().node_id();
2079+
dns.on_node(&node_id, Duration::from_secs(1)).await?;
2080+
}
2081+
2082+
info!("all published to discovery");
2083+
2084+
// bootstrap
2085+
let topic_id = TopicId::from_bytes([0u8; 32]);
2086+
2087+
let bootstrap_node = routers[0].endpoint().node_id();
2088+
2089+
let mut senders = vec![];
2090+
2091+
let bootstrap_count = node_count.min(10).max(node_count / 50);
2092+
info!("start with {bootstrap_count} bootstrap nodes");
2093+
let mut joining = FuturesUnordered::new();
2094+
#[allow(clippy::needless_range_loop)]
2095+
for i in 0..bootstrap_count {
2096+
let bootstrap = if i == 0 { vec![] } else { vec![bootstrap_node] };
2097+
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
2098+
let endpoint = routers[i].endpoint().clone();
2099+
senders.push((sender, endpoint.node_id()));
2100+
joining.push(
2101+
async move {
2102+
receiver.joined().await?;
2103+
Ok((receiver, endpoint))
2104+
}
2105+
.boxed(),
2106+
);
2107+
}
2108+
2109+
let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
2110+
let mut receivers = joined.context("failed to join all nodes")?;
2111+
info!("bootstrap nodes joined");
2112+
2113+
info!("sleep {warmup_sleep_s}s for swarm to stabilize");
2114+
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
2115+
2116+
info!("join {} remaining nodes", node_count - bootstrap_count);
2117+
let chunks = node_count / bootstrap_count;
2118+
for chunk in 1..chunks {
2119+
let mut joining = FuturesUnordered::new();
2120+
#[allow(clippy::needless_range_loop)]
2121+
for j in 0..bootstrap_count {
2122+
let i = (chunk * bootstrap_count) + j;
2123+
if i >= node_count {
2124+
break;
2125+
}
2126+
let bootstrap = vec![routers[i % bootstrap_count].endpoint().node_id()];
2127+
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
2128+
let endpoint = routers[i].endpoint().clone();
2129+
senders.push((sender, endpoint.node_id()));
2130+
joining.push(
2131+
async move {
2132+
receiver.joined().await?;
2133+
Ok((receiver, endpoint))
2134+
}
2135+
.boxed(),
2136+
);
2137+
}
2138+
2139+
let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
2140+
receivers.extend(joined.context("failed to join all nodes")?);
2141+
info!("joined chunk {chunk} of {chunks} with {bootstrap_count}");
2142+
}
2143+
2144+
info!("sleep {warmup_sleep_s}s for swarm to stabilize");
2145+
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
2146+
2147+
info!("sending & receiving {message_count} messages on each node");
2148+
// spawn send tasks
2149+
let sending = senders.into_iter().enumerate().map(|(i, (sender, me))| {
2150+
task(async move {
2151+
for j in 0..message_count {
2152+
let message = format!("{}:{}", me.fmt_short(), j);
2153+
let message: Bytes = message.as_bytes().to_vec().into();
2154+
sender.broadcast(message).await?;
2155+
tokio::time::sleep(Duration::from_millis(send_interval_ms)).await
2156+
}
2157+
debug!("{i}: sent all");
2158+
anyhow::Ok((me, sender))
2159+
})
2160+
});
2161+
let sending = FuturesUnordered::from_iter(sending);
2162+
2163+
let all_messages: BTreeSet<Bytes> = routers
2164+
.iter()
2165+
.map(|r| r.endpoint().node_id())
2166+
.flat_map(|node_id| {
2167+
(0..message_count)
2168+
.map(move |i| format!("{}:{}", node_id.fmt_short(), i).into_bytes().into())
2169+
})
2170+
.collect();
2171+
let all_messages = Arc::new(all_messages);
2172+
2173+
// closure to create a set of expected messages at a peer
2174+
let expected = move |all_messages: &BTreeSet<Bytes>, me: NodeId| -> BTreeSet<Bytes> {
2175+
let me = me.fmt_short();
2176+
all_messages
2177+
.iter()
2178+
.filter(|m| !m.starts_with(me.as_bytes()))
2179+
.cloned()
2180+
.collect()
2181+
};
2182+
2183+
// spawn recv tasks
2184+
let receiving = receivers.into_iter().map(|(mut receiver, endpoint)| {
2185+
let all_messages = Arc::clone(&all_messages);
2186+
let me = endpoint.node_id();
2187+
task(async move {
2188+
let mut missing = expected(&all_messages, endpoint.node_id());
2189+
let timeout = tokio::time::sleep(timeout);
2190+
tokio::pin!(timeout);
2191+
let res = loop {
2192+
let event = tokio::select! {
2193+
res = receiver.next() => {
2194+
match res {
2195+
None => break Err(anyhow!("receiver closed")),
2196+
Some(Err(err)) => break Err(err.into()),
2197+
Some(Ok(event)) => event,
2198+
}
2199+
},
2200+
_ = &mut timeout => break Err(anyhow!("timeout"))
2201+
};
2202+
if let Event::Gossip(GossipEvent::Received(message)) = event {
2203+
if !missing.remove(&message.content) {
2204+
break Err(anyhow!(
2205+
"duplicate message: {:?} delivered from {}",
2206+
String::from_utf8_lossy(&message.content),
2207+
message.delivered_from.fmt_short()
2208+
));
2209+
}
2210+
if missing.is_empty() {
2211+
break Ok(());
2212+
}
2213+
}
2214+
};
2215+
(receiver, missing, res)
2216+
})
2217+
.map(move |res| (me, res))
2218+
});
2219+
let mut receiving = FuturesUnordered::from_iter(receiving);
2220+
2221+
let senders_fut = async move {
2222+
let senders: Vec<_> = sending.try_collect().await?;
2223+
anyhow::Ok(senders)
2224+
};
2225+
let expected_count = message_count * (node_count - 1);
2226+
let receivers_fut = task(async move {
2227+
let mut failed = 0;
2228+
let mut missing_total = 0;
2229+
let mut receivers = vec![];
2230+
while let Some(res) = receiving.next().await {
2231+
let (node_id, (receiver, missing, res)) = res;
2232+
receivers.push(receiver);
2233+
match res {
2234+
Err(err) => {
2235+
missing_total += missing.len();
2236+
failed += 1;
2237+
warn!(me=%node_id.fmt_short(), ?missing, "recv task failed: {err:#}");
2238+
for m in missing {
2239+
let hash = blake3::hash(&m);
2240+
warn!(me=%node_id.fmt_short(), ?hash, "missing");
2241+
}
2242+
}
2243+
Ok(()) => {
2244+
assert!(missing.is_empty());
2245+
}
2246+
}
2247+
}
2248+
if failed > 0 {
2249+
bail!("Receive side failed: {failed} nodes together missed {missing_total} messages of {expected_count}");
2250+
} else {
2251+
Ok(receivers)
2252+
}
2253+
});
2254+
2255+
let (senders, receivers) = (senders_fut, receivers_fut).try_join().await?;
2256+
info!("all done");
2257+
assert_eq!(senders.len(), node_count);
2258+
assert_eq!(receivers.len(), node_count);
2259+
drop(senders);
2260+
drop(receivers);
2261+
let _ = FuturesUnordered::from_iter(gossips.iter().map(|gossip| gossip.shutdown()))
2262+
.count()
2263+
.await;
2264+
let mut shutdown =
2265+
FuturesUnordered::from_iter(routers.into_iter().map(|router| async move {
2266+
(router.endpoint().node_id(), router.shutdown().await)
2267+
}));
2268+
while let Some((node_id, res)) = shutdown.next().await {
2269+
res.with_context(|| format!("shutdown failed for {}", node_id.fmt_short()))?;
2270+
}
2271+
2272+
Ok(())
2273+
}
2274+
2275+
async fn task<T: Send + 'static>(
2276+
fut: impl std::future::Future<Output = T> + Send + 'static,
2277+
) -> T {
2278+
n0_future::task::spawn(fut).await.unwrap()
2279+
}
20162280
}

0 commit comments

Comments
 (0)