Skip to content

Commit 08c3fc4

Browse files
james7132taiki-e
andauthored
Don't explicitly panic or block when avoidable (#147)
* Don't explicitly panic when avoidable * Update src/lib.rs Co-authored-by: Taiki Endo <[email protected]> * Do the same for static executors --------- Co-authored-by: Taiki Endo <[email protected]>
1 parent 58411d6 commit 08c3fc4

File tree

2 files changed

+52
-32
lines changed

2 files changed

+52
-32
lines changed

src/lib.rs

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
4646
use std::pin::Pin;
4747
use std::rc::Rc;
4848
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
49-
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
49+
use std::sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, TryLockError};
5050
use std::task::{Context, Poll, Waker};
5151

5252
use async_task::{Builder, Runnable};
@@ -350,7 +350,8 @@ impl<'a> Executor<'a> {
350350

351351
// TODO: If possible, push into the current local queue and notify the ticker.
352352
move |runnable| {
353-
state.queue.push(runnable).unwrap();
353+
let result = state.queue.push(runnable);
354+
debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
354355
state.notify();
355356
}
356357
}
@@ -696,7 +697,7 @@ impl State {
696697

697698
/// Returns a reference to currently active tasks.
698699
fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
699-
self.active.lock().unwrap_or_else(|e| e.into_inner())
700+
self.active.lock().unwrap_or_else(PoisonError::into_inner)
700701
}
701702

702703
/// Notifies a sleeping ticker.
@@ -707,7 +708,11 @@ impl State {
707708
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
708709
.is_ok()
709710
{
710-
let waker = self.sleepers.lock().unwrap().notify();
711+
let waker = self
712+
.sleepers
713+
.lock()
714+
.unwrap_or_else(PoisonError::into_inner)
715+
.notify();
711716
if let Some(w) = waker {
712717
w.wake();
713718
}
@@ -852,7 +857,11 @@ impl Ticker<'_> {
852857
///
853858
/// Returns `false` if the ticker was already sleeping and unnotified.
854859
fn sleep(&mut self, waker: &Waker) -> bool {
855-
let mut sleepers = self.state.sleepers.lock().unwrap();
860+
let mut sleepers = self
861+
.state
862+
.sleepers
863+
.lock()
864+
.unwrap_or_else(PoisonError::into_inner);
856865

857866
match self.sleeping {
858867
// Move to sleeping state.
@@ -878,7 +887,11 @@ impl Ticker<'_> {
878887
/// Moves the ticker into woken state.
879888
fn wake(&mut self) {
880889
if self.sleeping != 0 {
881-
let mut sleepers = self.state.sleepers.lock().unwrap();
890+
let mut sleepers = self
891+
.state
892+
.sleepers
893+
.lock()
894+
.unwrap_or_else(PoisonError::into_inner);
882895
sleepers.remove(self.sleeping);
883896

884897
self.state
@@ -926,7 +939,11 @@ impl Drop for Ticker<'_> {
926939
fn drop(&mut self) {
927940
// If this ticker is in sleeping state, it must be removed from the sleepers list.
928941
if self.sleeping != 0 {
929-
let mut sleepers = self.state.sleepers.lock().unwrap();
942+
let mut sleepers = self
943+
.state
944+
.sleepers
945+
.lock()
946+
.unwrap_or_else(PoisonError::into_inner);
930947
let notified = sleepers.remove(self.sleeping);
931948

932949
self.state
@@ -971,7 +988,7 @@ impl Runner<'_> {
971988
state
972989
.local_queues
973990
.write()
974-
.unwrap()
991+
.unwrap_or_else(PoisonError::into_inner)
975992
.push(runner.local.clone());
976993
runner
977994
}
@@ -993,25 +1010,25 @@ impl Runner<'_> {
9931010
}
9941011

9951012
// Try stealing from other runners.
996-
let local_queues = self.state.local_queues.read().unwrap();
997-
998-
// Pick a random starting point in the iterator list and rotate the list.
999-
let n = local_queues.len();
1000-
let start = rng.usize(..n);
1001-
let iter = local_queues
1002-
.iter()
1003-
.chain(local_queues.iter())
1004-
.skip(start)
1005-
.take(n);
1006-
1007-
// Remove this runner's local queue.
1008-
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
1009-
1010-
// Try stealing from each local queue in the list.
1011-
for local in iter {
1012-
steal(local, &self.local);
1013-
if let Ok(r) = self.local.pop() {
1014-
return Some(r);
1013+
if let Ok(local_queues) = self.state.local_queues.try_read() {
1014+
// Pick a random starting point in the iterator list and rotate the list.
1015+
let n = local_queues.len();
1016+
let start = rng.usize(..n);
1017+
let iter = local_queues
1018+
.iter()
1019+
.chain(local_queues.iter())
1020+
.skip(start)
1021+
.take(n);
1022+
1023+
// Remove this runner's local queue.
1024+
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
1025+
1026+
// Try stealing from each local queue in the list.
1027+
for local in iter {
1028+
steal(local, &self.local);
1029+
if let Ok(r) = self.local.pop() {
1030+
return Some(r);
1031+
}
10151032
}
10161033
}
10171034

@@ -1037,7 +1054,7 @@ impl Drop for Runner<'_> {
10371054
self.state
10381055
.local_queues
10391056
.write()
1040-
.unwrap()
1057+
.unwrap_or_else(PoisonError::into_inner)
10411058
.retain(|local| !Arc::ptr_eq(local, &self.local));
10421059

10431060
// Re-schedule remaining tasks in the local queue.

src/static_executors.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
future::Future,
88
marker::PhantomData,
99
panic::{RefUnwindSafe, UnwindSafe},
10+
sync::PoisonError,
1011
};
1112

1213
impl Executor<'static> {
@@ -42,7 +43,7 @@ impl Executor<'static> {
4243

4344
std::mem::forget(self);
4445

45-
let mut active = state.active.lock().unwrap();
46+
let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
4647
if !active.is_empty() {
4748
// Reschedule all of the active tasks.
4849
for waker in active.drain() {
@@ -92,7 +93,7 @@ impl LocalExecutor<'static> {
9293

9394
std::mem::forget(self);
9495

95-
let mut active = state.active.lock().unwrap();
96+
let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
9697
if !active.is_empty() {
9798
// Reschedule all of the active tasks.
9899
for waker in active.drain() {
@@ -283,7 +284,8 @@ impl StaticExecutor {
283284
let state: &'static State = &self.state;
284285
// TODO: If possible, push into the current local queue and notify the ticker.
285286
move |runnable| {
286-
state.queue.push(runnable).unwrap();
287+
let result = state.queue.push(runnable);
288+
debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
287289
state.notify();
288290
}
289291
}
@@ -468,7 +470,8 @@ impl StaticLocalExecutor {
468470
let state: &'static State = &self.state;
469471
// TODO: If possible, push into the current local queue and notify the ticker.
470472
move |runnable| {
471-
state.queue.push(runnable).unwrap();
473+
let result = state.queue.push(runnable);
474+
debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
472475
state.notify();
473476
}
474477
}

0 commit comments

Comments
 (0)