Skip to content

Commit 9fc90bd

Browse files
committed
tests: improve net_big test
1 parent 8c7347d commit 9fc90bd

File tree

1 file changed

+177
-61
lines changed

1 file changed

+177
-61
lines changed

src/net.rs

Lines changed: 177 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -1349,6 +1349,7 @@ impl Dialer {
13491349
mod test {
13501350
use std::time::Duration;
13511351

1352+
use anyhow::{anyhow, bail};
13521353
use bytes::Bytes;
13531354
use futures_concurrency::future::TryJoin;
13541355
use iroh::{protocol::Router, RelayMap, RelayMode, SecretKey};
@@ -2017,20 +2018,32 @@ mod test {
20172018
Ok(())
20182019
}
20192020

2020-
#[tokio::test]
2021-
// #[traced_test]
2021+
#[tokio::test(flavor = "multi_thread")]
20222022
async fn gossip_net_big() -> TestResult {
2023-
tracing_subscriber::fmt::try_init().ok();
20242023
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
20252024
let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
20262025
let dns = iroh::test_utils::DnsPkarrServer::run().await?;
20272026

2028-
let node_count = std::env::var("NODE_COUNT")
2029-
.map(|x| x.parse().unwrap())
2030-
.unwrap_or(10);
2031-
let message_count = std::env::var("MESSAGE_COUNT")
2027+
let node_count: usize = std::env::var("NODE_COUNT")
20322028
.map(|x| x.parse().unwrap())
20332029
.unwrap_or(100);
2030+
let message_count: usize = std::env::var("MESSAGE_COUNT")
2031+
.map(|x| x.parse().unwrap())
2032+
.unwrap_or(2);
2033+
2034+
let warmup_sleep_s = std::env::var("WARMUP_SLEEP")
2035+
.map(|x| x.parse().unwrap())
2036+
.unwrap_or(1);
2037+
2038+
let send_interval_ms = std::env::var("SEND_INTERVAL")
2039+
.map(|x| x.parse().unwrap())
2040+
.unwrap_or(50);
2041+
2042+
let timeout_ms = std::env::var("TIMEOUT")
2043+
.map(|x| x.parse().unwrap())
2044+
.unwrap_or(10000);
2045+
let timeout = Duration::from_millis(timeout_ms);
2046+
info!("recv timeout: {timeout:?}");
20342047

20352048
// spawn
20362049
info!("spawn {node_count} nodes");
@@ -2052,8 +2065,7 @@ mod test {
20522065
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
20532066
let router = Router::builder(endpoint)
20542067
.accept(GOSSIP_ALPN, gossip.clone())
2055-
.spawn()
2056-
.await?;
2068+
.spawn();
20572069
anyhow::Ok((router, gossip))
20582070
})
20592071
}));
@@ -2075,86 +2087,190 @@ mod test {
20752087
let bootstrap_node = routers[0].endpoint().node_id();
20762088

20772089
let mut senders = vec![];
2078-
let mut receivers = FuturesUnordered::new();
20792090

2091+
let bootstrap_count = node_count.min(10).max(node_count / 50);
2092+
info!("start with {bootstrap_count} bootstrap nodes");
2093+
let mut joining = FuturesUnordered::new();
20802094
#[allow(clippy::needless_range_loop)]
2081-
for i in 0..node_count {
2095+
for i in 0..bootstrap_count {
20822096
let bootstrap = if i == 0 { vec![] } else { vec![bootstrap_node] };
20832097
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
2084-
senders.push(sender);
2085-
receivers.push(async move {
2086-
receiver.joined().await?;
2087-
Ok(receiver)
2088-
});
2098+
let endpoint = routers[i].endpoint().clone();
2099+
senders.push((sender, endpoint.node_id()));
2100+
joining.push(
2101+
async move {
2102+
receiver.joined().await?;
2103+
Ok((receiver, endpoint))
2104+
}
2105+
.boxed(),
2106+
);
20892107
}
20902108

2091-
let receivers: anyhow::Result<Vec<GossipReceiver>> = receivers.try_collect().await;
2092-
let receivers = receivers.context("failed to join all nodes")?;
2093-
info!("all joined");
2109+
let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
2110+
let mut receivers = joined.context("failed to join all nodes")?;
2111+
info!("bootstrap nodes joined");
2112+
2113+
info!("sleep {warmup_sleep_s}s for swarm to stabilize");
2114+
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
2115+
2116+
info!("join {} remaining nodes", node_count - bootstrap_count);
2117+
let chunks = node_count / bootstrap_count;
2118+
for chunk in 1..chunks {
2119+
let mut joining = FuturesUnordered::new();
2120+
#[allow(clippy::needless_range_loop)]
2121+
for j in 0..bootstrap_count {
2122+
let i = (chunk * bootstrap_count) + j;
2123+
if i >= node_count {
2124+
break;
2125+
}
2126+
let bootstrap = vec![routers[i % bootstrap_count].endpoint().node_id()];
2127+
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
2128+
let endpoint = routers[i].endpoint().clone();
2129+
senders.push((sender, endpoint.node_id()));
2130+
joining.push(
2131+
async move {
2132+
receiver.joined().await?;
2133+
Ok((receiver, endpoint))
2134+
}
2135+
.boxed(),
2136+
);
2137+
}
20942138

2095-
let sleep_seconds = std::env::var("WARMUP_SLEEP")
2096-
.map(|x| x.parse().unwrap())
2097-
.unwrap_or(1);
2098-
info!("sleep {sleep_seconds}s for swarm to stabilize");
2099-
tokio::time::sleep(Duration::from_secs(sleep_seconds)).await;
2139+
let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
2140+
receivers.extend(joined.context("failed to join all nodes")?);
2141+
info!("joined chunk {chunk} of {chunks} with {bootstrap_count}");
2142+
}
21002143

2101-
let send_interval_ms = std::env::var("SEND_INTERVAL")
2102-
.map(|x| x.parse().unwrap())
2103-
.unwrap_or(5);
2144+
info!("sleep {warmup_sleep_s}s for swarm to stabilize");
2145+
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
21042146

21052147
info!("sending & receiving {message_count} messages on each node");
21062148
// spawn send tasks
2107-
let sending = senders.into_iter().enumerate().map(|(i, sender)| {
2149+
let sending = senders.into_iter().enumerate().map(|(i, (sender, me))| {
21082150
task(async move {
21092151
for j in 0..message_count {
2110-
let message = format!("{i}:{j}");
2152+
let message = format!("{}:{}", me.fmt_short(), j);
21112153
let message: Bytes = message.as_bytes().to_vec().into();
21122154
sender.broadcast(message).await?;
2113-
if j % (message_count / 10.min(message_count)) == 0 {
2114-
info!("{i}: sent {j} of {message_count}") // // #[tokio::test]
2115-
}
21162155
tokio::time::sleep(Duration::from_millis(send_interval_ms)).await
21172156
}
2118-
info!("{i}: sent all");
2119-
anyhow::Ok(())
2157+
debug!("{i}: sent all");
2158+
anyhow::Ok((me, sender))
21202159
})
21212160
});
21222161
let sending = FuturesUnordered::from_iter(sending);
2162+
2163+
let all_messages: BTreeSet<Bytes> = routers
2164+
.iter()
2165+
.map(|r| r.endpoint().node_id())
2166+
.flat_map(|node_id| {
2167+
(0..message_count)
2168+
.map(move |i| format!("{}:{}", node_id.fmt_short(), i).into_bytes().into())
2169+
})
2170+
.collect();
2171+
let all_messages = Arc::new(all_messages);
2172+
2173+
// closure to create a set of expected messages at a peer
2174+
let expected = move |all_messages: &BTreeSet<Bytes>, me: NodeId| -> BTreeSet<Bytes> {
2175+
let me = me.fmt_short();
2176+
all_messages
2177+
.iter()
2178+
.filter(|m| !m.starts_with(me.as_bytes()))
2179+
.cloned()
2180+
.collect()
2181+
};
2182+
21232183
// spawn recv tasks
2124-
let receiving = receivers.into_iter().enumerate().map(|(i, mut receiver)| {
2125-
task(async move {
2126-
let total = message_count * (node_count - 1);
2127-
let mut received = 0;
2128-
while let Some(event) = receiver.try_next().await? {
2129-
if let Event::Gossip(GossipEvent::Received(_message)) = event {
2130-
received += 1;
2131-
if received % ((message_count / 10.min(message_count)) * node_count) == 0 {
2132-
info!("{i}: received {received} of {total}");
2184+
let receiving = receivers
2185+
.into_iter()
2186+
.enumerate()
2187+
.map(|(_i, (mut receiver, endpoint))| {
2188+
let all_messages = Arc::clone(&all_messages);
2189+
let me = endpoint.node_id();
2190+
task(async move {
2191+
let mut missing = expected(&all_messages, endpoint.node_id());
2192+
let timeout = tokio::time::sleep(timeout);
2193+
tokio::pin!(timeout);
2194+
let res = loop {
2195+
let event = tokio::select! {
2196+
res = receiver.next() => {
2197+
match res {
2198+
None => break Err(anyhow!("receiver closed")),
2199+
Some(Err(err)) => break Err(err.into()),
2200+
Some(Ok(event)) => event,
2201+
}
2202+
},
2203+
_ = &mut timeout => break Err(anyhow!("timeout"))
2204+
};
2205+
if let Event::Gossip(GossipEvent::Received(message)) = event {
2206+
if !missing.remove(&message.content) {
2207+
break Err(anyhow!(
2208+
"duplicate message: {:?} delivered from {}",
2209+
String::from_utf8_lossy(&message.content),
2210+
message.delivered_from.fmt_short()
2211+
));
2212+
}
2213+
if missing.is_empty() {
2214+
break Ok(());
2215+
}
21332216
}
2134-
if received == total {
2135-
info!("{i}: received all");
2136-
break;
2217+
};
2218+
(receiver, missing, res)
2219+
})
2220+
.map(move |res| (me, res))
2221+
});
2222+
let mut receiving = FuturesUnordered::from_iter(receiving);
2223+
2224+
let senders_fut = async move {
2225+
let senders: Vec<_> = sending.try_collect().await?;
2226+
anyhow::Ok(senders)
2227+
};
2228+
let expected_count = message_count * (node_count - 1);
2229+
let receivers_fut = task(async move {
2230+
let mut failed = 0;
2231+
let mut missing_total = 0;
2232+
let mut receivers = vec![];
2233+
while let Some(res) = receiving.next().await {
2234+
let (node_id, (receiver, missing, res)) = res;
2235+
receivers.push(receiver);
2236+
match res {
2237+
Err(err) => {
2238+
missing_total += missing.len();
2239+
failed += 1;
2240+
warn!(me=%node_id.fmt_short(), ?missing, "recv task failed: {err:#}");
2241+
for m in missing {
2242+
let hash = blake3::hash(&m);
2243+
warn!(me=%node_id.fmt_short(), ?hash, "missing");
21372244
}
21382245
}
2246+
Ok(()) => {
2247+
assert!(missing.is_empty());
2248+
}
21392249
}
2140-
anyhow::Ok(receiver)
2141-
})
2250+
}
2251+
if failed > 0 {
2252+
bail!("Receive side failed: {failed} nodes together missed {missing_total} messages of {expected_count}");
2253+
} else {
2254+
Ok(receivers)
2255+
}
21422256
});
2143-
let receiving = FuturesUnordered::from_iter(receiving);
2144-
2145-
let count_send = async move {
2146-
let res: Vec<_> = sending.try_collect().await?;
2147-
anyhow::Ok(res.len())
2148-
};
2149-
let count_recv = async move {
2150-
let res: Vec<_> = receiving.try_collect().await?;
2151-
anyhow::Ok(res.len())
2152-
};
21532257

2154-
let (count_send, count_recv) = (count_send, count_recv).try_join().await?;
2258+
let (senders, receivers) = (senders_fut, receivers_fut).try_join().await?;
21552259
info!("all done");
2156-
assert_eq!(count_send, node_count);
2157-
assert_eq!(count_recv, node_count);
2260+
assert_eq!(senders.len(), node_count);
2261+
assert_eq!(receivers.len(), node_count);
2262+
drop(senders);
2263+
drop(receivers);
2264+
let _ = FuturesUnordered::from_iter(gossips.iter().map(|gossip| gossip.shutdown()))
2265+
.count()
2266+
.await;
2267+
let mut shutdown =
2268+
FuturesUnordered::from_iter(routers.into_iter().map(|router| async move {
2269+
(router.endpoint().node_id(), router.shutdown().await)
2270+
}));
2271+
while let Some((node_id, res)) = shutdown.next().await {
2272+
res.with_context(|| format!("shutdown failed for {}", node_id.fmt_short()))?;
2273+
}
21582274

21592275
Ok(())
21602276
}

0 commit comments

Comments
 (0)