@@ -25,7 +25,7 @@ use linera_sdk::abis::fungible;
2525use  linera_storage:: Storage ; 
2626use  num_format:: { Locale ,  ToFormattedString } ; 
2727use  prometheus_parse:: { HistogramCount ,  Scrape ,  Value } ; 
28- use  tokio:: { runtime:: Handle ,  task,  time} ; 
28+ use  tokio:: { runtime:: Handle ,  sync :: mpsc ,   task,  time} ; 
2929use  tokio_util:: sync:: CancellationToken ; 
3030use  tracing:: { debug,  error,  info,  warn,  Instrument  as  _} ; 
3131
@@ -37,9 +37,9 @@ pub enum BenchmarkError {
3737    #[ error( "Proxy of validator {0} unhealthy! Latency p99 is too high: {1} ms" ) ]  
3838    ProxyUnhealthy ( String ,  f64 ) , 
3939    #[ error( "Failed to send message: {0}" ) ]  
40-     SendError ( #[ from]   crossbeam_channel:: SendError < ( ) > ) , 
40+     CrossbeamSendError ( #[ from]   crossbeam_channel:: SendError < ( ) > ) , 
4141    #[ error( "Failed to join task: {0}" ) ]  
42-     JoinError ( #[ from]   tokio :: task:: JoinError ) , 
42+     JoinError ( #[ from]   task:: JoinError ) , 
4343    #[ error( "Failed to parse validator metrics port: {0}" ) ]  
4444    ParseValidatorMetricsPort ( #[ from]   std:: num:: ParseIntError ) , 
4545    #[ error( "Failed to parse validator metrics address: {0}" ) ]  
@@ -72,6 +72,8 @@ pub enum BenchmarkError {
7272    NoDataYetForP99Calculation , 
7373    #[ error( "Unexpected empty bucket" ) ]  
7474    UnexpectedEmptyBucket , 
75+     #[ error( "Failed to send message: {0}" ) ]  
76+     TokioSendError ( #[ from]   mpsc:: error:: SendError < ( ) > ) , 
7577} 
7678
7779#[ derive( Debug ) ]  
@@ -119,7 +121,7 @@ where
119121        // the desired BPS, the tasks would continue sending block proposals until the channel's 
120122        // buffer is filled, which would cause us to not properly control the BPS rate. 
121123        let  ( sender,  receiver)  = crossbeam_channel:: bounded ( 0 ) ; 
122-         let  bps_control_task = tokio :: task:: spawn_blocking ( move  || { 
124+         let  bps_control_task = task:: spawn_blocking ( move  || { 
123125            handle. block_on ( async  move  { 
124126                let  mut  recv_count = 0 ; 
125127                let  mut  start = time:: Instant :: now ( ) ; 
@@ -166,6 +168,19 @@ where
166168            } ) 
167169        } ) ; 
168170
171+         let  ( bps_tasks_logger_sender,  mut  bps_tasks_logger_receiver)  = mpsc:: channel ( num_chains) ; 
172+         let  bps_tasks_logger_task = task:: spawn ( async  move  { 
173+             let  mut  tasks_running = 0 ; 
174+             while  let  Some ( ( ) )  = bps_tasks_logger_receiver. recv ( ) . await  { 
175+                 tasks_running += 1 ; 
176+                 info ! ( "{}/{} tasks running" ,  tasks_running,  num_chains) ; 
177+                 if  tasks_running == num_chains { 
178+                     info ! ( "All tasks are running" ) ; 
179+                     break ; 
180+                 } 
181+             } 
182+         } ) ; 
183+ 
169184        let  mut  bps_remainder = bps. unwrap_or_default ( )  % num_chains; 
170185        let  bps_share = bps. map ( |bps| bps / num_chains) ; 
171186
@@ -184,6 +199,7 @@ where
184199            let  committee = committee. clone ( ) ; 
185200            let  local_node = local_node. clone ( ) ; 
186201            let  chain_client = chain_clients[ & chain_id] . clone ( ) ; 
202+             let  bps_tasks_logger_sender = bps_tasks_logger_sender. clone ( ) ; 
187203            chain_client. process_inbox ( ) . await ?; 
188204            join_set. spawn_blocking ( move  || { 
189205                handle. block_on ( 
@@ -198,6 +214,7 @@ where
198214                            sender, 
199215                            committee, 
200216                            local_node, 
217+                             bps_tasks_logger_sender, 
201218                        ) ) 
202219                        . await ?; 
203220
@@ -224,6 +241,7 @@ where
224241        if  let  Some ( metrics_watcher)  = metrics_watcher { 
225242            metrics_watcher. await ??; 
226243        } 
244+         bps_tasks_logger_task. await ?; 
227245
228246        Ok ( ( ) ) 
229247    } 
@@ -484,12 +502,14 @@ where
484502        sender :  crossbeam_channel:: Sender < ( ) > , 
485503        committee :  Committee , 
486504        local_node :  LocalNodeClient < S > , 
505+         bps_tasks_logger_sender :  mpsc:: Sender < ( ) > , 
487506    )  -> Result < ( ) ,  BenchmarkError >  { 
488507        let  chain_id = chain_client. chain_id ( ) ; 
489508        info ! ( 
490509            "Starting benchmark at target BPS of {:?}, for chain {:?}" , 
491510            bps,  chain_id
492511        ) ; 
512+         bps_tasks_logger_sender. send ( ( ) ) . await ?; 
493513        let  cross_chain_message_delivery = chain_client. options ( ) . cross_chain_message_delivery ; 
494514        let  mut  num_sent_proposals = 0 ; 
495515        let  authenticated_signer = Some ( AccountOwner :: from ( key_pair. public ( ) ) ) ; 
0 commit comments