@@ -46,7 +46,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
4646use std:: pin:: Pin ;
4747use std:: rc:: Rc ;
4848use std:: sync:: atomic:: { AtomicBool , AtomicPtr , Ordering } ;
49- use std:: sync:: { Arc , Mutex , MutexGuard , RwLock , TryLockError } ;
49+ use std:: sync:: { Arc , Mutex , MutexGuard , PoisonError , RwLock , TryLockError } ;
5050use std:: task:: { Context , Poll , Waker } ;
5151
5252use async_task:: { Builder , Runnable } ;
@@ -350,7 +350,8 @@ impl<'a> Executor<'a> {
350350
351351 // TODO: If possible, push into the current local queue and notify the ticker.
352352 move |runnable| {
353- state. queue . push ( runnable) . unwrap ( ) ;
353+ let result = state. queue . push ( runnable) ;
354+ debug_assert ! ( result. is_ok( ) ) ;
354355 state. notify ( ) ;
355356 }
356357 }
@@ -696,7 +697,7 @@ impl State {
696697
697698 /// Returns a reference to currently active tasks.
698699 fn active ( & self ) -> MutexGuard < ' _ , Slab < Waker > > {
699- self . active . lock ( ) . unwrap_or_else ( |e| e . into_inner ( ) )
700+ self . active . lock ( ) . unwrap_or_else ( PoisonError :: into_inner)
700701 }
701702
702703 /// Notifies a sleeping ticker.
@@ -707,7 +708,11 @@ impl State {
707708 . compare_exchange ( false , true , Ordering :: AcqRel , Ordering :: Acquire )
708709 . is_ok ( )
709710 {
710- let waker = self . sleepers . lock ( ) . unwrap ( ) . notify ( ) ;
711+ let waker = self
712+ . sleepers
713+ . lock ( )
714+ . unwrap_or_else ( PoisonError :: into_inner)
715+ . notify ( ) ;
711716 if let Some ( w) = waker {
712717 w. wake ( ) ;
713718 }
@@ -852,7 +857,11 @@ impl Ticker<'_> {
852857 ///
853858 /// Returns `false` if the ticker was already sleeping and unnotified.
854859 fn sleep ( & mut self , waker : & Waker ) -> bool {
855- let mut sleepers = self . state . sleepers . lock ( ) . unwrap ( ) ;
860+ let mut sleepers = self
861+ . state
862+ . sleepers
863+ . lock ( )
864+ . unwrap_or_else ( PoisonError :: into_inner) ;
856865
857866 match self . sleeping {
858867 // Move to sleeping state.
@@ -878,7 +887,11 @@ impl Ticker<'_> {
878887 /// Moves the ticker into woken state.
879888 fn wake ( & mut self ) {
880889 if self . sleeping != 0 {
881- let mut sleepers = self . state . sleepers . lock ( ) . unwrap ( ) ;
890+ let mut sleepers = self
891+ . state
892+ . sleepers
893+ . lock ( )
894+ . unwrap_or_else ( PoisonError :: into_inner) ;
882895 sleepers. remove ( self . sleeping ) ;
883896
884897 self . state
@@ -926,7 +939,11 @@ impl Drop for Ticker<'_> {
926939 fn drop ( & mut self ) {
927940 // If this ticker is in sleeping state, it must be removed from the sleepers list.
928941 if self . sleeping != 0 {
929- let mut sleepers = self . state . sleepers . lock ( ) . unwrap ( ) ;
942+ let mut sleepers = self
943+ . state
944+ . sleepers
945+ . lock ( )
946+ . unwrap_or_else ( PoisonError :: into_inner) ;
930947 let notified = sleepers. remove ( self . sleeping ) ;
931948
932949 self . state
@@ -971,7 +988,7 @@ impl Runner<'_> {
971988 state
972989 . local_queues
973990 . write ( )
974- . unwrap ( )
991+ . unwrap_or_else ( PoisonError :: into_inner )
975992 . push ( runner. local . clone ( ) ) ;
976993 runner
977994 }
@@ -993,25 +1010,25 @@ impl Runner<'_> {
9931010 }
9941011
9951012 // Try stealing from other runners.
996- let local_queues = self . state . local_queues . read ( ) . unwrap ( ) ;
997-
998- // Pick a random starting point in the iterator list and rotate the list.
999- let n = local_queues . len ( ) ;
1000- let start = rng . usize ( ..n ) ;
1001- let iter = local_queues
1002- . iter ( )
1003- . chain ( local_queues . iter ( ) )
1004- . skip ( start )
1005- . take ( n ) ;
1006-
1007- // Remove this runner's local queue.
1008- let iter = iter . filter ( |local| ! Arc :: ptr_eq ( local , & self . local ) ) ;
1009-
1010- // Try stealing from each local queue in the list.
1011- for local in iter {
1012- steal ( local , & self . local ) ;
1013- if let Ok ( r ) = self . local . pop ( ) {
1014- return Some ( r ) ;
1013+ if let Ok ( local_queues) = self . state . local_queues . try_read ( ) {
1014+ // Pick a random starting point in the iterator list and rotate the list.
1015+ let n = local_queues . len ( ) ;
1016+ let start = rng . usize ( ..n ) ;
1017+ let iter = local_queues
1018+ . iter ( )
1019+ . chain ( local_queues . iter ( ) )
1020+ . skip ( start )
1021+ . take ( n ) ;
1022+
1023+ // Remove this runner's local queue.
1024+ let iter = iter . filter ( | local| ! Arc :: ptr_eq ( local , & self . local ) ) ;
1025+
1026+ // Try stealing from each local queue in the list.
1027+ for local in iter {
1028+ steal ( local , & self . local ) ;
1029+ if let Ok ( r ) = self . local . pop ( ) {
1030+ return Some ( r ) ;
1031+ }
10151032 }
10161033 }
10171034
@@ -1037,7 +1054,7 @@ impl Drop for Runner<'_> {
10371054 self . state
10381055 . local_queues
10391056 . write ( )
1040- . unwrap ( )
1057+ . unwrap_or_else ( PoisonError :: into_inner )
10411058 . retain ( |local| !Arc :: ptr_eq ( local, & self . local ) ) ;
10421059
10431060 // Re-schedule remaining tasks in the local queue.
0 commit comments