@@ -12,6 +12,7 @@ use std::path::Path;
1212use std:: path:: PathBuf ;
1313use std:: pin:: Pin ;
1414use std:: sync:: Arc ;
15+ use std:: sync:: RwLock ;
1516use std:: task:: Context as TaskContext ;
1617use std:: task:: Poll ;
1718use std:: time:: Duration ;
@@ -668,6 +669,30 @@ pub struct LogClientActor {
668669 /// The watch sender for the aggregation window in seconds
669670 aggregate_window_tx : watch:: Sender < u64 > ,
670671 should_aggregate : bool ,
672+ // Store aggregators directly in the actor for access in Drop
673+ aggregators : Arc < RwLock < HashMap < OutputTarget , Aggregator > > > ,
674+ }
675+
676+ impl LogClientActor {
677+ fn print_aggregators ( aggregators : & RwLock < HashMap < OutputTarget , Aggregator > > ) {
678+ let mut aggregators_guard = aggregators. write ( ) . unwrap ( ) ;
679+ for ( output_target, aggregator) in aggregators_guard. iter_mut ( ) {
680+ if aggregator. is_empty ( ) {
681+ continue ;
682+ }
683+ match output_target {
684+ OutputTarget :: Stdout => {
685+ println ! ( "{}" , aggregator) ;
686+ }
687+ OutputTarget :: Stderr => {
688+ eprintln ! ( "{}" , aggregator) ;
689+ }
690+ }
691+
692+ // Reset the aggregator
693+ aggregator. reset ( ) ;
694+ }
695+ }
671696}
672697
673698#[ async_trait]
@@ -683,26 +708,42 @@ impl Actor for LogClientActor {
683708 let ( aggregate_window_tx, aggregate_window_rx) =
684709 watch:: channel ( DEFAULT_AGGREGATE_WINDOW_SEC ) ;
685710
711+ // Initialize aggregators
712+ let mut aggregators = HashMap :: new ( ) ;
713+ aggregators. insert ( OutputTarget :: Stderr , Aggregator :: new ( ) ) ;
714+ aggregators. insert ( OutputTarget :: Stdout , Aggregator :: new ( ) ) ;
715+ let aggregators = Arc :: new ( RwLock :: new ( aggregators) ) ;
716+
717+ // Clone aggregators for the aggregator task
718+ let aggregators_for_task = Arc :: clone ( & aggregators) ;
719+
686720 // Start the loggregator
687- let aggregator_handle =
688- { tokio:: spawn ( async move { start_aggregator ( log_rx, aggregate_window_rx) . await } ) } ;
721+ let aggregator_handle = tokio:: spawn ( async move {
722+ start_aggregator ( log_rx, aggregate_window_rx, aggregators_for_task) . await
723+ } ) ;
689724
690725 Ok ( Self {
691726 log_tx,
692727 aggregator_handle,
693728 aggregate_window_tx,
694- should_aggregate : false ,
729+ should_aggregate : true ,
730+ aggregators,
695731 } )
696732 }
697733}
698734
735+ impl Drop for LogClientActor {
736+ fn drop ( & mut self ) {
737+ // Flush the remaining logs before shutting down
738+ Self :: print_aggregators ( & self . aggregators ) ;
739+ }
740+ }
741+
699742async fn start_aggregator (
700743 mut log_rx : mpsc:: Receiver < ( OutputTarget , String ) > ,
701744 mut interval_sec_rx : watch:: Receiver < u64 > ,
745+ aggregators : Arc < RwLock < HashMap < OutputTarget , Aggregator > > > ,
702746) -> anyhow:: Result < ( ) > {
703- let mut aggregators = HashMap :: new ( ) ;
704- aggregators. insert ( OutputTarget :: Stderr , Aggregator :: new ( ) ) ;
705- aggregators. insert ( OutputTarget :: Stdout , Aggregator :: new ( ) ) ;
706747 let mut interval =
707748 tokio:: time:: interval ( tokio:: time:: Duration :: from_secs ( * interval_sec_rx. borrow ( ) ) ) ;
708749
@@ -711,7 +752,8 @@ async fn start_aggregator(
711752 tokio:: select! {
712753 // Process incoming log messages
713754 Some ( ( output_target, log_line) ) = log_rx. recv( ) => {
714- if let Some ( aggregator) = aggregators. get_mut( & output_target) {
755+ let mut aggregators_guard = aggregators. write( ) . unwrap( ) ;
756+ if let Some ( aggregator) = aggregators_guard. get_mut( & output_target) {
715757 if let Err ( e) = aggregator. add_line( & log_line) {
716758 tracing:: error!( "error adding log line: {}" , e) ;
717759 }
@@ -726,24 +768,14 @@ async fn start_aggregator(
726768
727769 // Every interval tick, print and reset the aggregator
728770 _ = interval. tick( ) => {
729- for ( output_target, aggregator) in aggregators. iter_mut( ) {
730- if aggregator. is_empty( ) {
731- continue ;
732- }
733- if output_target == & OutputTarget :: Stdout {
734- println!( "{}" , aggregator) ;
735- } else {
736- eprintln!( "{}" , aggregator) ;
737- }
738-
739- // Reset the aggregator
740- aggregator. reset( ) ;
741- }
771+ LogClientActor :: print_aggregators( & aggregators) ;
742772 }
743773
744774 // Exit if the channel is closed
745775 else => {
746776 tracing:: error!( "log channel closed, exiting aggregator" ) ;
777+ // Print final aggregated logs before shutting down
778+ LogClientActor :: print_aggregators( & aggregators) ;
747779 break ;
748780 }
749781 }
0 commit comments