@@ -243,6 +243,8 @@ struct Actor {
243
243
handles : EntityManagerState < EmParams > ,
244
244
// temp tags
245
245
temp_tags : TempTags ,
246
+ // waiters for idle state.
247
+ idle_waiters : Vec < irpc:: channel:: oneshot:: Sender < ( ) > > ,
246
248
// our private tokio runtime. It has to live somewhere.
247
249
_rt : RtWrapper ,
248
250
}
@@ -456,6 +458,16 @@ impl Actor {
456
458
trace ! ( "{cmd:?}" ) ;
457
459
self . db ( ) . send ( cmd. into ( ) ) . await . ok ( ) ;
458
460
}
461
+ Command :: WaitIdle ( cmd) => {
462
+ trace ! ( "{cmd:?}" ) ;
463
+ if self . tasks . is_empty ( ) {
464
+ // we are currently idle
465
+ cmd. tx . send ( ( ) ) . await . ok ( ) ;
466
+ } else {
467
+ // wait for idle state
468
+ self . idle_waiters . push ( cmd. tx ) ;
469
+ }
470
+ }
459
471
Command :: Shutdown ( cmd) => {
460
472
trace ! ( "{cmd:?}" ) ;
461
473
self . db ( ) . send ( cmd. into ( ) ) . await . ok ( ) ;
@@ -599,6 +611,11 @@ impl Actor {
599
611
}
600
612
Some ( res) = self . tasks. join_next( ) , if !self . tasks. is_empty( ) => {
601
613
Self :: log_task_result( res) ;
614
+ if self . tasks. is_empty( ) {
615
+ for tx in self . idle_waiters. drain( ..) {
616
+ let _ = tx. send( ( ) ) ;
617
+ }
618
+ }
602
619
}
603
620
}
604
621
}
@@ -648,6 +665,7 @@ impl Actor {
648
665
tasks : JoinSet :: new ( ) ,
649
666
handles : EntityManagerState :: new ( slot_context, 1024 , 32 , 32 , 2 ) ,
650
667
temp_tags : Default :: default ( ) ,
668
+ idle_waiters : Vec :: new ( ) ,
651
669
_rt : rt,
652
670
} )
653
671
}
@@ -1969,6 +1987,7 @@ pub mod tests {
1969
1987
println ! ( "dropping batch" ) ;
1970
1988
drop ( batch) ;
1971
1989
store. sync_db ( ) . await ?;
1990
+ store. wait_idle ( ) . await ?;
1972
1991
println ! ( "reading temp tags after batch drop" ) ;
1973
1992
let tts = store
1974
1993
. tags ( )
0 commit comments