@@ -106,17 +106,9 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
106106 } )
107107}
108108
109- fn poll_executor < T , F : FnMut ( & mut Context < ' _ > ) -> T > ( mut f : F ) -> T {
110- let _enter = enter ( ) . expect (
111- "cannot execute `LocalPool` executor from within \
112- another executor",
113- ) ;
114-
115- CURRENT_THREAD_NOTIFY . with ( |thread_notify| {
116- let waker = waker_ref ( thread_notify) ;
117- let mut cx = Context :: from_waker ( & waker) ;
118- f ( & mut cx)
119- } )
109+ /// Check for a wakeup, but don't consume it.
110+ fn woken ( ) -> bool {
111+ CURRENT_THREAD_NOTIFY . with ( |thread_notify| thread_notify. unparked . load ( Ordering :: SeqCst ) )
120112}
121113
122114impl LocalPool {
@@ -212,20 +204,26 @@ impl LocalPool {
212204 /// further use of one of the pool's run or poll methods.
213205 /// Though only one task will be completed, progress may be made on multiple tasks.
214206 pub fn try_run_one ( & mut self ) -> bool {
215- poll_executor ( |ctx | {
207+ run_executor ( |cx | {
216208 loop {
217- let ret = self . poll_pool_once ( ctx) ;
218-
219- // return if we have executed a future
220- if let Poll :: Ready ( Some ( _) ) = ret {
221- return true ;
209+ self . drain_incoming ( ) ;
210+
211+ match self . pool . poll_next_unpin ( cx) {
212+ // Success!
213+ Poll :: Ready ( Some ( ( ) ) ) => return Poll :: Ready ( true ) ,
214+ // The pool was empty.
215+ Poll :: Ready ( None ) => return Poll :: Ready ( false ) ,
216+ Poll :: Pending => ( ) ,
222217 }
223218
224- // if there are no new incoming futures
225- // then there is no feature that can make progress
226- // and we can return without having completed a single future
227- if self . incoming . borrow ( ) . is_empty ( ) {
228- return false ;
219+ if !self . incoming . borrow ( ) . is_empty ( ) {
220+ // New tasks were spawned; try again.
221+ continue ;
222+ } else if woken ( ) {
223+ // The pool yielded to us, but there's more progress to be made.
224+ return Poll :: Pending ;
225+ } else {
226+ return Poll :: Ready ( false ) ;
229227 }
230228 }
231229 } )
@@ -257,44 +255,52 @@ impl LocalPool {
257255 /// of the pool's run or poll methods. While the function is running, all tasks
258256 /// in the pool will try to make progress.
259257 pub fn run_until_stalled ( & mut self ) {
260- poll_executor ( |ctx| {
261- let _ = self . poll_pool ( ctx) ;
258+ run_executor ( |cx| match self . poll_pool ( cx) {
259+ // The pool is empty.
260+ Poll :: Ready ( ( ) ) => Poll :: Ready ( ( ) ) ,
261+ Poll :: Pending => {
262+ if woken ( ) {
263+ Poll :: Pending
264+ } else {
265+ // We're stalled for now.
266+ Poll :: Ready ( ( ) )
267+ }
268+ }
262269 } ) ;
263270 }
264271
265- // Make maximal progress on the entire pool of spawned task, returning `Ready`
266- // if the pool is empty and `Pending` if no further progress can be made.
272+ /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
273+ /// Repeat until either the pool is empty, or it returns `Pending`.
274+ ///
275+ /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
276+ ///
277+ /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
278+ /// mean that the pool can't make progress.
267279 fn poll_pool ( & mut self , cx : & mut Context < ' _ > ) -> Poll < ( ) > {
268- // state for the FuturesUnordered, which will never be used
269280 loop {
270- let ret = self . poll_pool_once ( cx ) ;
281+ self . drain_incoming ( ) ;
271282
272- // we queued up some new tasks; add them and poll again
283+ let pool_ret = self . pool . poll_next_unpin ( cx) ;
284+
285+ // We queued up some new tasks; add them and poll again.
273286 if !self . incoming . borrow ( ) . is_empty ( ) {
274287 continue ;
275288 }
276289
277- // no queued tasks; we may be done
278- match ret {
279- Poll :: Pending => return Poll :: Pending ,
290+ match pool_ret {
291+ Poll :: Ready ( Some ( ( ) ) ) => continue ,
280292 Poll :: Ready ( None ) => return Poll :: Ready ( ( ) ) ,
281- _ => { }
293+ Poll :: Pending => return Poll :: Pending ,
282294 }
283295 }
284296 }
285297
286- // Try make minimal progress on the pool of spawned tasks
287- fn poll_pool_once ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Option < ( ) > > {
288- // empty the incoming queue of newly-spawned tasks
289- {
290- let mut incoming = self . incoming . borrow_mut ( ) ;
291- for task in incoming. drain ( ..) {
292- self . pool . push ( task)
293- }
298+ /// Empty the incoming queue of newly-spawned tasks.
299+ fn drain_incoming ( & mut self ) {
300+ let mut incoming = self . incoming . borrow_mut ( ) ;
301+ for task in incoming. drain ( ..) {
302+ self . pool . push ( task)
294303 }
295-
296- // try to execute the next ready future
297- self . pool . poll_next_unpin ( cx)
298304 }
299305}
300306
0 commit comments