Skip to content

Commit cfefa65

Browse files
committed
tests: improve net_big test
1 parent 1eefb62 commit cfefa65

File tree

1 file changed

+171
-58
lines changed

1 file changed

+171
-58
lines changed

src/net.rs

Lines changed: 171 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,6 +1348,7 @@ impl Dialer {
13481348
mod test {
13491349
use std::time::Duration;
13501350

1351+
use anyhow::{anyhow, bail};
13511352
use bytes::Bytes;
13521353
use futures_concurrency::future::TryJoin;
13531354
use iroh::{protocol::Router, RelayMap, RelayMode, SecretKey};
@@ -2016,20 +2017,32 @@ mod test {
20162017
Ok(())
20172018
}
20182019

2019-
#[tokio::test]
2020-
// #[traced_test]
2020+
#[tokio::test(flavor = "multi_thread")]
20212021
async fn gossip_net_big() -> TestResult {
2022-
tracing_subscriber::fmt::try_init().ok();
20232022
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
20242023
let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
20252024
let dns = iroh::test_utils::DnsPkarrServer::run().await?;
20262025

2027-
let node_count = std::env::var("NODE_COUNT")
2028-
.map(|x| x.parse().unwrap())
2029-
.unwrap_or(10);
2030-
let message_count = std::env::var("MESSAGE_COUNT")
2026+
let node_count: usize = std::env::var("NODE_COUNT")
20312027
.map(|x| x.parse().unwrap())
20322028
.unwrap_or(100);
2029+
let message_count: usize = std::env::var("MESSAGE_COUNT")
2030+
.map(|x| x.parse().unwrap())
2031+
.unwrap_or(2);
2032+
2033+
let warmup_sleep_s = std::env::var("WARMUP_SLEEP")
2034+
.map(|x| x.parse().unwrap())
2035+
.unwrap_or(1);
2036+
2037+
let send_interval_ms = std::env::var("SEND_INTERVAL")
2038+
.map(|x| x.parse().unwrap())
2039+
.unwrap_or(50);
2040+
2041+
let timeout_ms = std::env::var("TIMEOUT")
2042+
.map(|x| x.parse().unwrap())
2043+
.unwrap_or(10000);
2044+
let timeout = Duration::from_millis(timeout_ms);
2045+
info!("recv timeout: {timeout:?}");
20332046

20342047
// spawn
20352048
info!("spawn {node_count} nodes");
@@ -2051,8 +2064,7 @@ mod test {
20512064
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
20522065
let router = Router::builder(endpoint)
20532066
.accept(GOSSIP_ALPN, gossip.clone())
2054-
.spawn()
2055-
.await?;
2067+
.spawn();
20562068
anyhow::Ok((router, gossip))
20572069
})
20582070
}));
@@ -2074,86 +2086,187 @@ mod test {
20742086
let bootstrap_node = routers[0].endpoint().node_id();
20752087

20762088
let mut senders = vec![];
2077-
let mut receivers = FuturesUnordered::new();
20782089

2090+
let bootstrap_count = node_count.min(10).max(node_count / 50);
2091+
info!("start with {bootstrap_count} bootstrap nodes");
2092+
let mut joining = FuturesUnordered::new();
20792093
#[allow(clippy::needless_range_loop)]
2080-
for i in 0..node_count {
2094+
for i in 0..bootstrap_count {
20812095
let bootstrap = if i == 0 { vec![] } else { vec![bootstrap_node] };
20822096
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
2083-
senders.push(sender);
2084-
receivers.push(async move {
2085-
receiver.joined().await?;
2086-
Ok(receiver)
2087-
});
2097+
let endpoint = routers[i].endpoint().clone();
2098+
senders.push((sender, endpoint.node_id()));
2099+
joining.push(
2100+
async move {
2101+
receiver.joined().await?;
2102+
Ok((receiver, endpoint))
2103+
}
2104+
.boxed(),
2105+
);
20882106
}
20892107

2090-
let receivers: anyhow::Result<Vec<GossipReceiver>> = receivers.try_collect().await;
2091-
let receivers = receivers.context("failed to join all nodes")?;
2092-
info!("all joined");
2108+
let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
2109+
let mut receivers = joined.context("failed to join all nodes")?;
2110+
info!("bootstrap nodes joined");
2111+
2112+
info!("sleep {warmup_sleep_s}s for swarm to stabilize");
2113+
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
2114+
2115+
info!("join {} remaining nodes", node_count - bootstrap_count);
2116+
let chunks = node_count / bootstrap_count;
2117+
for chunk in 1..chunks {
2118+
let mut joining = FuturesUnordered::new();
2119+
#[allow(clippy::needless_range_loop)]
2120+
for j in 0..bootstrap_count {
2121+
let i = (chunk * bootstrap_count) + j;
2122+
if i >= node_count {
2123+
break;
2124+
}
2125+
let bootstrap = vec![routers[i % bootstrap_count].endpoint().node_id()];
2126+
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
2127+
let endpoint = routers[i].endpoint().clone();
2128+
senders.push((sender, endpoint.node_id()));
2129+
joining.push(
2130+
async move {
2131+
receiver.joined().await?;
2132+
Ok((receiver, endpoint))
2133+
}
2134+
.boxed(),
2135+
);
2136+
}
20932137

2094-
let sleep_seconds = std::env::var("WARMUP_SLEEP")
2095-
.map(|x| x.parse().unwrap())
2096-
.unwrap_or(1);
2097-
info!("sleep {sleep_seconds}s for swarm to stabilize");
2098-
tokio::time::sleep(Duration::from_secs(sleep_seconds)).await;
2138+
let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
2139+
receivers.extend(joined.context("failed to join all nodes")?);
2140+
info!("joined chunk {chunk} of {chunks} with {bootstrap_count}");
2141+
}
20992142

2100-
let send_interval_ms = std::env::var("SEND_INTERVAL")
2101-
.map(|x| x.parse().unwrap())
2102-
.unwrap_or(5);
2143+
info!("sleep {warmup_sleep_s}s for swarm to stabilize");
2144+
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
21032145

21042146
info!("sending & receiving {message_count} messages on each node");
21052147
// spawn send tasks
2106-
let sending = senders.into_iter().enumerate().map(|(i, sender)| {
2148+
let sending = senders.into_iter().enumerate().map(|(i, (sender, me))| {
21072149
task(async move {
21082150
for j in 0..message_count {
2109-
let message = format!("{i}:{j}");
2151+
let message = format!("{}:{}", me.fmt_short(), j);
21102152
let message: Bytes = message.as_bytes().to_vec().into();
21112153
sender.broadcast(message).await?;
2112-
if j % (message_count / 10.min(message_count)) == 0 {
2113-
info!("{i}: sent {j} of {message_count}") // // #[tokio::test]
2114-
}
21152154
tokio::time::sleep(Duration::from_millis(send_interval_ms)).await
21162155
}
2117-
info!("{i}: sent all");
2118-
anyhow::Ok(())
2156+
debug!("{i}: sent all");
2157+
anyhow::Ok((me, sender))
21192158
})
21202159
});
21212160
let sending = FuturesUnordered::from_iter(sending);
2161+
2162+
let all_messages: BTreeSet<Bytes> = routers
2163+
.iter()
2164+
.map(|r| r.endpoint().node_id())
2165+
.flat_map(|node_id| {
2166+
(0..message_count)
2167+
.map(move |i| format!("{}:{}", node_id.fmt_short(), i).into_bytes().into())
2168+
})
2169+
.collect();
2170+
let all_messages = Arc::new(all_messages);
2171+
2172+
// closure to create a set of expected messages at a peer
2173+
let expected = move |all_messages: &BTreeSet<Bytes>, me: NodeId| -> BTreeSet<Bytes> {
2174+
let me = me.fmt_short();
2175+
all_messages
2176+
.iter()
2177+
.filter(|m| !m.starts_with(me.as_bytes()))
2178+
.cloned()
2179+
.collect()
2180+
};
2181+
21222182
// spawn recv tasks
2123-
let receiving = receivers.into_iter().enumerate().map(|(i, mut receiver)| {
2183+
let receiving = receivers.into_iter().map(|(mut receiver, endpoint)| {
2184+
let all_messages = Arc::clone(&all_messages);
2185+
let me = endpoint.node_id();
21242186
task(async move {
2125-
let total = message_count * (node_count - 1);
2126-
let mut received = 0;
2127-
while let Some(event) = receiver.try_next().await? {
2128-
if let Event::Gossip(GossipEvent::Received(_message)) = event {
2129-
received += 1;
2130-
if received % ((message_count / 10.min(message_count)) * node_count) == 0 {
2131-
info!("{i}: received {received} of {total}");
2187+
let mut missing = expected(&all_messages, endpoint.node_id());
2188+
let timeout = tokio::time::sleep(timeout);
2189+
tokio::pin!(timeout);
2190+
let res = loop {
2191+
let event = tokio::select! {
2192+
res = receiver.next() => {
2193+
match res {
2194+
None => break Err(anyhow!("receiver closed")),
2195+
Some(Err(err)) => break Err(err.into()),
2196+
Some(Ok(event)) => event,
2197+
}
2198+
},
2199+
_ = &mut timeout => break Err(anyhow!("timeout"))
2200+
};
2201+
if let Event::Gossip(GossipEvent::Received(message)) = event {
2202+
if !missing.remove(&message.content) {
2203+
break Err(anyhow!(
2204+
"duplicate message: {:?} delivered from {}",
2205+
String::from_utf8_lossy(&message.content),
2206+
message.delivered_from.fmt_short()
2207+
));
21322208
}
2133-
if received == total {
2134-
info!("{i}: received all");
2135-
break;
2209+
if missing.is_empty() {
2210+
break Ok(());
21362211
}
21372212
}
2138-
}
2139-
anyhow::Ok(receiver)
2213+
};
2214+
(receiver, missing, res)
21402215
})
2216+
.map(move |res| (me, res))
21412217
});
2142-
let receiving = FuturesUnordered::from_iter(receiving);
2218+
let mut receiving = FuturesUnordered::from_iter(receiving);
21432219

2144-
let count_send = async move {
2145-
let res: Vec<_> = sending.try_collect().await?;
2146-
anyhow::Ok(res.len())
2147-
};
2148-
let count_recv = async move {
2149-
let res: Vec<_> = receiving.try_collect().await?;
2150-
anyhow::Ok(res.len())
2220+
let senders_fut = async move {
2221+
let senders: Vec<_> = sending.try_collect().await?;
2222+
anyhow::Ok(senders)
21512223
};
2224+
let expected_count = message_count * (node_count - 1);
2225+
let receivers_fut = task(async move {
2226+
let mut failed = 0;
2227+
let mut missing_total = 0;
2228+
let mut receivers = vec![];
2229+
while let Some(res) = receiving.next().await {
2230+
let (node_id, (receiver, missing, res)) = res;
2231+
receivers.push(receiver);
2232+
match res {
2233+
Err(err) => {
2234+
missing_total += missing.len();
2235+
failed += 1;
2236+
warn!(me=%node_id.fmt_short(), ?missing, "recv task failed: {err:#}");
2237+
for m in missing {
2238+
let hash = blake3::hash(&m);
2239+
warn!(me=%node_id.fmt_short(), ?hash, "missing");
2240+
}
2241+
}
2242+
Ok(()) => {
2243+
assert!(missing.is_empty());
2244+
}
2245+
}
2246+
}
2247+
if failed > 0 {
2248+
bail!("Receive side failed: {failed} nodes together missed {missing_total} messages of {expected_count}");
2249+
} else {
2250+
Ok(receivers)
2251+
}
2252+
});
21522253

2153-
let (count_send, count_recv) = (count_send, count_recv).try_join().await?;
2254+
let (senders, receivers) = (senders_fut, receivers_fut).try_join().await?;
21542255
info!("all done");
2155-
assert_eq!(count_send, node_count);
2156-
assert_eq!(count_recv, node_count);
2256+
assert_eq!(senders.len(), node_count);
2257+
assert_eq!(receivers.len(), node_count);
2258+
drop(senders);
2259+
drop(receivers);
2260+
let _ = FuturesUnordered::from_iter(gossips.iter().map(|gossip| gossip.shutdown()))
2261+
.count()
2262+
.await;
2263+
let mut shutdown =
2264+
FuturesUnordered::from_iter(routers.into_iter().map(|router| async move {
2265+
(router.endpoint().node_id(), router.shutdown().await)
2266+
}));
2267+
while let Some((node_id, res)) = shutdown.next().await {
2268+
res.with_context(|| format!("shutdown failed for {}", node_id.fmt_short()))?;
2269+
}
21572270

21582271
Ok(())
21592272
}

0 commit comments

Comments
 (0)