@@ -25,7 +25,7 @@ use linera_sdk::abis::fungible;
2525use linera_storage:: Storage ;
2626use num_format:: { Locale , ToFormattedString } ;
2727use prometheus_parse:: { HistogramCount , Scrape , Value } ;
28- use tokio:: { runtime:: Handle , task, time} ;
28+ use tokio:: { runtime:: Handle , sync :: mpsc , task, time} ;
2929use tokio_util:: sync:: CancellationToken ;
3030use tracing:: { debug, error, info, warn, Instrument as _} ;
3131
@@ -37,9 +37,9 @@ pub enum BenchmarkError {
3737 #[ error( "Proxy of validator {0} unhealthy! Latency p99 is too high: {1} ms" ) ]
3838 ProxyUnhealthy ( String , f64 ) ,
3939 #[ error( "Failed to send message: {0}" ) ]
40- SendError ( #[ from] crossbeam_channel:: SendError < ( ) > ) ,
40+ CrossbeamSendError ( #[ from] crossbeam_channel:: SendError < ( ) > ) ,
4141 #[ error( "Failed to join task: {0}" ) ]
42- JoinError ( #[ from] tokio :: task:: JoinError ) ,
42+ JoinError ( #[ from] task:: JoinError ) ,
4343 #[ error( "Failed to parse validator metrics port: {0}" ) ]
4444 ParseValidatorMetricsPort ( #[ from] std:: num:: ParseIntError ) ,
4545 #[ error( "Failed to parse validator metrics address: {0}" ) ]
@@ -72,6 +72,8 @@ pub enum BenchmarkError {
7272 NoDataYetForP99Calculation ,
7373 #[ error( "Unexpected empty bucket" ) ]
7474 UnexpectedEmptyBucket ,
75+ #[ error( "Failed to send message: {0}" ) ]
76+ TokioSendError ( #[ from] mpsc:: error:: SendError < ( ) > ) ,
7577}
7678
7779#[ derive( Debug ) ]
@@ -119,7 +121,7 @@ where
119121 // the desired BPS, the tasks would continue sending block proposals until the channel's
120122 // buffer is filled, which would cause us to not properly control the BPS rate.
121123 let ( sender, receiver) = crossbeam_channel:: bounded ( 0 ) ;
122- let bps_control_task = tokio :: task:: spawn_blocking ( move || {
124+ let bps_control_task = task:: spawn_blocking ( move || {
123125 handle. block_on ( async move {
124126 let mut recv_count = 0 ;
125127 let mut start = time:: Instant :: now ( ) ;
@@ -166,6 +168,19 @@ where
166168 } )
167169 } ) ;
168170
171+ let ( bps_tasks_logger_sender, mut bps_tasks_logger_receiver) = mpsc:: channel ( num_chains) ;
172+ let bps_tasks_logger_task = task:: spawn ( async move {
173+ let mut tasks_running = 0 ;
174+ while let Some ( ( ) ) = bps_tasks_logger_receiver. recv ( ) . await {
175+ tasks_running += 1 ;
176+ info ! ( "{}/{} tasks running" , tasks_running, num_chains) ;
177+ if tasks_running == num_chains {
178+ info ! ( "All tasks are running" ) ;
179+ break ;
180+ }
181+ }
182+ } ) ;
183+
169184 let mut bps_remainder = bps. unwrap_or_default ( ) % num_chains;
170185 let bps_share = bps. map ( |bps| bps / num_chains) ;
171186
@@ -184,6 +199,7 @@ where
184199 let committee = committee. clone ( ) ;
185200 let local_node = local_node. clone ( ) ;
186201 let chain_client = chain_clients[ & chain_id] . clone ( ) ;
202+ let bps_tasks_logger_sender = bps_tasks_logger_sender. clone ( ) ;
187203 chain_client. process_inbox ( ) . await ?;
188204 join_set. spawn_blocking ( move || {
189205 handle. block_on (
@@ -198,6 +214,7 @@ where
198214 sender,
199215 committee,
200216 local_node,
217+ bps_tasks_logger_sender,
201218 ) )
202219 . await ?;
203220
@@ -224,6 +241,7 @@ where
224241 if let Some ( metrics_watcher) = metrics_watcher {
225242 metrics_watcher. await ??;
226243 }
244+ bps_tasks_logger_task. await ?;
227245
228246 Ok ( ( ) )
229247 }
@@ -484,12 +502,14 @@ where
484502 sender : crossbeam_channel:: Sender < ( ) > ,
485503 committee : Committee ,
486504 local_node : LocalNodeClient < S > ,
505+ bps_tasks_logger_sender : mpsc:: Sender < ( ) > ,
487506 ) -> Result < ( ) , BenchmarkError > {
488507 let chain_id = chain_client. chain_id ( ) ;
489508 info ! (
490509 "Starting benchmark at target BPS of {:?}, for chain {:?}" ,
491510 bps, chain_id
492511 ) ;
512+ bps_tasks_logger_sender. send ( ( ) ) . await ?;
493513 let cross_chain_message_delivery = chain_client. options ( ) . cross_chain_message_delivery ;
494514 let mut num_sent_proposals = 0 ;
495515 let authenticated_signer = Some ( AccountOwner :: from ( key_pair. public ( ) ) ) ;
0 commit comments