@@ -1352,7 +1352,9 @@ mod test {
1352
1352
use bytes:: Bytes ;
1353
1353
use futures_concurrency:: future:: TryJoin ;
1354
1354
use iroh:: { protocol:: Router , RelayMap , RelayMode , SecretKey } ;
1355
+ use n0_future:: { FuturesUnordered , StreamExt } ;
1355
1356
use rand:: Rng ;
1357
+ use testresult:: TestResult ;
1356
1358
use tokio:: { spawn, time:: timeout} ;
1357
1359
use tokio_util:: sync:: CancellationToken ;
1358
1360
use tracing:: { info, instrument} ;
@@ -2014,4 +2016,152 @@ mod test {
2014
2016
2015
2017
Ok ( ( ) )
2016
2018
}
2019
+
2020
+ #[ tokio:: test]
2021
+ // #[traced_test]
2022
+ async fn gossip_net_big ( ) -> TestResult {
2023
+ tracing_subscriber:: fmt:: try_init ( ) . ok ( ) ;
2024
+ let mut rng = rand_chacha:: ChaCha12Rng :: seed_from_u64 ( 1 ) ;
2025
+ let ( relay_map, _relay_url, _guard) = iroh:: test_utils:: run_relay_server ( ) . await . unwrap ( ) ;
2026
+ let dns = iroh:: test_utils:: DnsPkarrServer :: run ( ) . await ?;
2027
+
2028
+ let node_count = std:: env:: var ( "NODE_COUNT" )
2029
+ . map ( |x| x. parse ( ) . unwrap ( ) )
2030
+ . unwrap_or ( 10 ) ;
2031
+ let message_count = std:: env:: var ( "MESSAGE_COUNT" )
2032
+ . map ( |x| x. parse ( ) . unwrap ( ) )
2033
+ . unwrap_or ( 100 ) ;
2034
+
2035
+ // spawn
2036
+ info ! ( "spawn {node_count} nodes" ) ;
2037
+ let secret_keys = ( 0 ..node_count) . map ( |_i| SecretKey :: generate ( & mut rng) ) ;
2038
+ let spawning = FuturesUnordered :: from_iter ( secret_keys. map ( |secret_key| {
2039
+ let relay_map = relay_map. clone ( ) ;
2040
+ let discovery = dns. discovery ( secret_key. clone ( ) ) ;
2041
+ let dns_resolver = dns. dns_resolver ( ) ;
2042
+ task ( async move {
2043
+ let endpoint = Endpoint :: builder ( )
2044
+ . secret_key ( secret_key)
2045
+ . alpns ( vec ! [ GOSSIP_ALPN . to_vec( ) ] )
2046
+ . relay_mode ( RelayMode :: Custom ( relay_map) )
2047
+ . discovery ( discovery)
2048
+ . dns_resolver ( dns_resolver)
2049
+ . insecure_skip_relay_cert_verify ( true )
2050
+ . bind ( )
2051
+ . await ?;
2052
+ let gossip = Gossip :: builder ( ) . spawn ( endpoint. clone ( ) ) . await ?;
2053
+ let router = Router :: builder ( endpoint)
2054
+ . accept ( GOSSIP_ALPN , gossip. clone ( ) )
2055
+ . spawn ( )
2056
+ . await ?;
2057
+ anyhow:: Ok ( ( router, gossip) )
2058
+ } )
2059
+ } ) ) ;
2060
+ let spawned: Vec < _ > = spawning. try_collect ( ) . await ?;
2061
+ let ( routers, gossips) : ( Vec < _ > , Vec < _ > ) = spawned. into_iter ( ) . unzip ( ) ;
2062
+ info ! ( "all spawned" ) ;
2063
+
2064
+ // wait for all nodes to be visible on the router
2065
+ for router in routers. iter ( ) {
2066
+ let node_id = router. endpoint ( ) . node_id ( ) ;
2067
+ dns. on_node ( & node_id, Duration :: from_secs ( 1 ) ) . await ?;
2068
+ }
2069
+
2070
+ info ! ( "all published to discovery" ) ;
2071
+
2072
+ // bootstrap
2073
+ let topic_id = TopicId :: from_bytes ( [ 0u8 ; 32 ] ) ;
2074
+
2075
+ let bootstrap_node = routers[ 0 ] . endpoint ( ) . node_id ( ) ;
2076
+
2077
+ let mut senders = vec ! [ ] ;
2078
+ let mut receivers = FuturesUnordered :: new ( ) ;
2079
+
2080
+ #[ allow( clippy:: needless_range_loop) ]
2081
+ for i in 0 ..node_count {
2082
+ let bootstrap = if i == 0 { vec ! [ ] } else { vec ! [ bootstrap_node] } ;
2083
+ let ( sender, mut receiver) = gossips[ i] . subscribe ( topic_id, bootstrap) ?. split ( ) ;
2084
+ senders. push ( sender) ;
2085
+ receivers. push ( async move {
2086
+ receiver. joined ( ) . await ?;
2087
+ Ok ( receiver)
2088
+ } ) ;
2089
+ }
2090
+
2091
+ let receivers: anyhow:: Result < Vec < GossipReceiver > > = receivers. try_collect ( ) . await ;
2092
+ let receivers = receivers. context ( "failed to join all nodes" ) ?;
2093
+ info ! ( "all joined" ) ;
2094
+
2095
+ let sleep_seconds = std:: env:: var ( "WARMUP_SLEEP" )
2096
+ . map ( |x| x. parse ( ) . unwrap ( ) )
2097
+ . unwrap_or ( 1 ) ;
2098
+ info ! ( "sleep {sleep_seconds}s for swarm to stabilize" ) ;
2099
+ tokio:: time:: sleep ( Duration :: from_secs ( sleep_seconds) ) . await ;
2100
+
2101
+ let send_interval_ms = std:: env:: var ( "SEND_INTERVAL" )
2102
+ . map ( |x| x. parse ( ) . unwrap ( ) )
2103
+ . unwrap_or ( 5 ) ;
2104
+
2105
+ info ! ( "sending & receiving {message_count} messages on each node" ) ;
2106
+ // spawn send tasks
2107
+ let sending = senders. into_iter ( ) . enumerate ( ) . map ( |( i, sender) | {
2108
+ task ( async move {
2109
+ for j in 0 ..message_count {
2110
+ let message = format ! ( "{i}:{j}" ) ;
2111
+ let message: Bytes = message. as_bytes ( ) . to_vec ( ) . into ( ) ;
2112
+ sender. broadcast ( message) . await ?;
2113
+ if j % ( message_count / 10 . min ( message_count) ) == 0 {
2114
+ info ! ( "{i}: sent {j} of {message_count}" ) // // #[tokio::test]
2115
+ }
2116
+ tokio:: time:: sleep ( Duration :: from_millis ( send_interval_ms) ) . await
2117
+ }
2118
+ info ! ( "{i}: sent all" ) ;
2119
+ anyhow:: Ok ( ( ) )
2120
+ } )
2121
+ } ) ;
2122
+ let sending = FuturesUnordered :: from_iter ( sending) ;
2123
+ // spawn recv tasks
2124
+ let receiving = receivers. into_iter ( ) . enumerate ( ) . map ( |( i, mut receiver) | {
2125
+ task ( async move {
2126
+ let total = message_count * ( node_count - 1 ) ;
2127
+ let mut received = 0 ;
2128
+ while let Some ( event) = receiver. try_next ( ) . await ? {
2129
+ if let Event :: Gossip ( GossipEvent :: Received ( _message) ) = event {
2130
+ received += 1 ;
2131
+ if received % ( ( message_count / 10 . min ( message_count) ) * node_count) == 0 {
2132
+ info ! ( "{i}: received {received} of {total}" ) ;
2133
+ }
2134
+ if received == total {
2135
+ info ! ( "{i}: received all" ) ;
2136
+ break ;
2137
+ }
2138
+ }
2139
+ }
2140
+ anyhow:: Ok ( receiver)
2141
+ } )
2142
+ } ) ;
2143
+ let receiving = FuturesUnordered :: from_iter ( receiving) ;
2144
+
2145
+ let count_send = async move {
2146
+ let res: Vec < _ > = sending. try_collect ( ) . await ?;
2147
+ anyhow:: Ok ( res. len ( ) )
2148
+ } ;
2149
+ let count_recv = async move {
2150
+ let res: Vec < _ > = receiving. try_collect ( ) . await ?;
2151
+ anyhow:: Ok ( res. len ( ) )
2152
+ } ;
2153
+
2154
+ let ( count_send, count_recv) = ( count_send, count_recv) . try_join ( ) . await ?;
2155
+ info ! ( "all done" ) ;
2156
+ assert_eq ! ( count_send, node_count) ;
2157
+ assert_eq ! ( count_recv, node_count) ;
2158
+
2159
+ Ok ( ( ) )
2160
+ }
2161
+
2162
+ async fn task < T : Send + ' static > (
2163
+ fut : impl std:: future:: Future < Output = T > + Send + ' static ,
2164
+ ) -> T {
2165
+ n0_future:: task:: spawn ( fut) . await . unwrap ( )
2166
+ }
2017
2167
}
0 commit comments