@@ -12,6 +12,7 @@ use std::path::Path;
12
12
use std:: path:: PathBuf ;
13
13
use std:: pin:: Pin ;
14
14
use std:: sync:: Arc ;
15
+ use std:: sync:: RwLock ;
15
16
use std:: task:: Context as TaskContext ;
16
17
use std:: task:: Poll ;
17
18
use std:: time:: Duration ;
@@ -668,6 +669,30 @@ pub struct LogClientActor {
668
669
/// The watch sender for the aggregation window in seconds
669
670
aggregate_window_tx : watch:: Sender < u64 > ,
670
671
should_aggregate : bool ,
672
+ // Store aggregators directly in the actor for access in Drop
673
+ aggregators : Arc < RwLock < HashMap < OutputTarget , Aggregator > > > ,
674
+ }
675
+
676
+ impl LogClientActor {
677
+ fn print_aggregators ( aggregators : & RwLock < HashMap < OutputTarget , Aggregator > > ) {
678
+ let mut aggregators_guard = aggregators. write ( ) . unwrap ( ) ;
679
+ for ( output_target, aggregator) in aggregators_guard. iter_mut ( ) {
680
+ if aggregator. is_empty ( ) {
681
+ continue ;
682
+ }
683
+ match output_target {
684
+ OutputTarget :: Stdout => {
685
+ println ! ( "{}" , aggregator) ;
686
+ }
687
+ OutputTarget :: Stderr => {
688
+ eprintln ! ( "{}" , aggregator) ;
689
+ }
690
+ }
691
+
692
+ // Reset the aggregator
693
+ aggregator. reset ( ) ;
694
+ }
695
+ }
671
696
}
672
697
673
698
#[ async_trait]
@@ -683,26 +708,42 @@ impl Actor for LogClientActor {
683
708
let ( aggregate_window_tx, aggregate_window_rx) =
684
709
watch:: channel ( DEFAULT_AGGREGATE_WINDOW_SEC ) ;
685
710
711
+ // Initialize aggregators
712
+ let mut aggregators = HashMap :: new ( ) ;
713
+ aggregators. insert ( OutputTarget :: Stderr , Aggregator :: new ( ) ) ;
714
+ aggregators. insert ( OutputTarget :: Stdout , Aggregator :: new ( ) ) ;
715
+ let aggregators = Arc :: new ( RwLock :: new ( aggregators) ) ;
716
+
717
+ // Clone aggregators for the aggregator task
718
+ let aggregators_for_task = Arc :: clone ( & aggregators) ;
719
+
686
720
// Start the loggregator
687
- let aggregator_handle =
688
- { tokio:: spawn ( async move { start_aggregator ( log_rx, aggregate_window_rx) . await } ) } ;
721
+ let aggregator_handle = tokio:: spawn ( async move {
722
+ start_aggregator ( log_rx, aggregate_window_rx, aggregators_for_task) . await
723
+ } ) ;
689
724
690
725
Ok ( Self {
691
726
log_tx,
692
727
aggregator_handle,
693
728
aggregate_window_tx,
694
- should_aggregate : false ,
729
+ should_aggregate : true ,
730
+ aggregators,
695
731
} )
696
732
}
697
733
}
698
734
735
+ impl Drop for LogClientActor {
736
+ fn drop ( & mut self ) {
737
+ // Flush the remaining logs before shutting down
738
+ Self :: print_aggregators ( & self . aggregators ) ;
739
+ }
740
+ }
741
+
699
742
async fn start_aggregator (
700
743
mut log_rx : mpsc:: Receiver < ( OutputTarget , String ) > ,
701
744
mut interval_sec_rx : watch:: Receiver < u64 > ,
745
+ aggregators : Arc < RwLock < HashMap < OutputTarget , Aggregator > > > ,
702
746
) -> anyhow:: Result < ( ) > {
703
- let mut aggregators = HashMap :: new ( ) ;
704
- aggregators. insert ( OutputTarget :: Stderr , Aggregator :: new ( ) ) ;
705
- aggregators. insert ( OutputTarget :: Stdout , Aggregator :: new ( ) ) ;
706
747
let mut interval =
707
748
tokio:: time:: interval ( tokio:: time:: Duration :: from_secs ( * interval_sec_rx. borrow ( ) ) ) ;
708
749
@@ -711,7 +752,8 @@ async fn start_aggregator(
711
752
tokio:: select! {
712
753
// Process incoming log messages
713
754
Some ( ( output_target, log_line) ) = log_rx. recv( ) => {
714
- if let Some ( aggregator) = aggregators. get_mut( & output_target) {
755
+ let mut aggregators_guard = aggregators. write( ) . unwrap( ) ;
756
+ if let Some ( aggregator) = aggregators_guard. get_mut( & output_target) {
715
757
if let Err ( e) = aggregator. add_line( & log_line) {
716
758
tracing:: error!( "error adding log line: {}" , e) ;
717
759
}
@@ -726,24 +768,14 @@ async fn start_aggregator(
726
768
727
769
// Every interval tick, print and reset the aggregator
728
770
_ = interval. tick( ) => {
729
- for ( output_target, aggregator) in aggregators. iter_mut( ) {
730
- if aggregator. is_empty( ) {
731
- continue ;
732
- }
733
- if output_target == & OutputTarget :: Stdout {
734
- println!( "{}" , aggregator) ;
735
- } else {
736
- eprintln!( "{}" , aggregator) ;
737
- }
738
-
739
- // Reset the aggregator
740
- aggregator. reset( ) ;
741
- }
771
+ LogClientActor :: print_aggregators( & aggregators) ;
742
772
}
743
773
744
774
// Exit if the channel is closed
745
775
else => {
746
776
tracing:: error!( "log channel closed, exiting aggregator" ) ;
777
+ // Print final aggregated logs before shutting down
778
+ LogClientActor :: print_aggregators( & aggregators) ;
747
779
break ;
748
780
}
749
781
}
0 commit comments