@@ -692,6 +692,77 @@ fn mutex_in_block_in_place() {
692692 } )
693693}
694694
695+ // Tests that when a task is notified by another task and is placed in the LIFO
696+ // slot, and then the notifying task blocks the runtime, the notified task will
697+ // be stolen by another worker thread.
698+ //
699+ // Integration test for: https://github.com/tokio-rs/tokio/issues/4941
700+ #[ test]
701+ fn lifo_stealable ( ) {
702+ use std:: time:: Duration ;
703+
704+ let ( unblock_tx, unblock_rx) = tokio:: sync:: oneshot:: channel ( ) ;
705+ let ( task_started_tx, task_started_rx) = tokio:: sync:: oneshot:: channel ( ) ;
706+ let ( block_thread_tx, block_thread_rx) = mpsc:: channel :: < ( ) > ( ) ;
707+ let rt = runtime:: Builder :: new_multi_thread ( )
708+ // Make sure there are enough workers that one can be parked running the
709+ // I/O driver and another can be parked running the timer wheel and
710+ // there's still at least one worker free to steal the blocked task.
711+ . worker_threads ( 4 )
712+ . enable_time ( )
713+ . build ( )
714+ . unwrap ( ) ;
715+
716+ rt. block_on ( async {
717+ // Keep the runtime busy so that the workers that might steal the
718+ // blocked task don't all park themselves forever.
719+ let churn = tokio:: spawn ( async move {
720+ loop {
721+ tokio:: time:: sleep ( Duration :: from_millis ( 64 ) ) . await ;
722+ }
723+ } ) ;
724+
725+ let blocked_task_joined = tokio:: spawn ( async move {
726+ println ! ( "[LIFO] task started" ) ;
727+ task_started_tx. send ( ( ) ) . unwrap ( ) ;
728+ println ! ( "[LIFO] task waiting for wakeup..." ) ;
729+ unblock_rx. await . unwrap ( ) ;
730+ println ! ( "[LIFO] task running after wakeup" ) ;
731+ } ) ;
732+
733+ // Wait for the blocked task to have been polled once and have yielded
734+ // before we spawn the task that will notify it.
735+ task_started_rx. await . unwrap ( ) ;
736+ println ! ( "[main] LIFO slot task start acked!" ) ;
737+
738+ // Now, spawn a task that will notify the blocked task before going
739+ // blocking forever.
740+ tokio:: spawn ( async move {
741+ println ! ( "[blocker] sending wakeup" ) ;
742+ unblock_tx. send ( ( ) ) . unwrap ( ) ;
743+
744+ println ! ( "[blocker] blocking the worker thread..." ) ;
745+ // Block the worker thread indefinitely by waiting for a message on
746+ // a blocking channel. Using a channel rather than e.g. `loop {}`
747+ // allows us to terminate the task cleanly when the test finishes.
748+ let _ = block_thread_rx. recv ( ) ;
749+ println ! ( "[blocker] done" ) ;
750+ } ) ;
751+
752+ println ! ( "[main] blocker task spawned" ) ;
753+
754+ let result = tokio:: time:: timeout ( Duration :: from_secs ( 30 ) , blocked_task_joined) . await ;
755+ println ! ( "[main] result: {result:?}" ) ;
756+ // Before possibly panicking, make sure that we wake up the blocked task
757+ // so that it doesn't stop the runtime from shutting down.
758+ block_thread_tx. send ( ( ) ) . unwrap ( ) ;
759+ churn. abort ( ) ;
760+ result
761+ . expect ( "task in LIFO slot should complete within 30 seconds" )
762+ . expect ( "task in LIFO slot should not panic" ) ;
763+ } )
764+ }
765+
695766// Testing the tuning logic is tricky as it is inherently timing based, and more
696767// of a heuristic than an exact behavior. This test checks that the interval
697768// changes over time based on load factors. There are no assertions, completion
0 commit comments