@@ -37,7 +37,6 @@ namespace detail {
3737struct LIBASYNC_CACHELINE_ALIGN thread_data_t {
3838 work_steal_queue queue;
3939 std::minstd_rand rng;
40- task_wait_event event;
4140 std::thread handle;
4241};
4342
@@ -180,11 +179,15 @@ static void thread_task_loop(threadpool_data* impl, std::size_t thread_id, task_
180179 // Flag indicating if we have added a continuation to the task
181180 bool added_continuation = false ;
182181
182+ // Event to wait on
183+ task_wait_event event;
184+
183185 // Loop while waiting for the task to complete
184186 while (true ) {
185187 // Check if the task has finished. If we have added a continuation, we
186- // need to make sure the event has been signaled.
187- if (wait_task && (added_continuation ? current_thread.event .try_wait (wait_type::task_finished) : wait_task.ready ()))
188+ // need to make sure the event has been signaled, otherwise the other
189+ // thread may try to signal it after we have freed it.
190+ if (wait_task && (added_continuation ? event.try_wait (wait_type::task_finished) : wait_task.ready ()))
188191 return ;
189192
190193 // Try to get a task from the local queue
@@ -220,11 +223,13 @@ static void thread_task_loop(threadpool_data* impl, std::size_t thread_id, task_
220223 return ;
221224 }
222225
226+ // Initialize the event object
227+ event.init ();
228+
223229 // No tasks found, so sleep until something happens.
224230 // If a continuation has not been added yet, add it.
225231 if (wait_task && !added_continuation) {
226232 // Create a continuation for the task we are waiting for
227- task_wait_event& event = current_thread.event ;
228233 wait_task.on_finish ([&event] {
229234 // Signal the thread's event
230235 event.signal (wait_type::task_finished);
@@ -234,27 +239,29 @@ static void thread_task_loop(threadpool_data* impl, std::size_t thread_id, task_
234239
235240 // Add our thread to the list of waiting threads
236241 size_t num_waiters_val = impl->num_waiters .load (std::memory_order_relaxed);
237- impl->waiters [num_waiters_val] = ¤t_thread. event ;
242+ impl->waiters [num_waiters_val] = &event;
238243 impl->num_waiters .store (num_waiters_val + 1 , std::memory_order_relaxed);
239244
240245 // Wait for our event to be signaled when a task is scheduled or
241246 // the task we are waiting for has completed.
242247 locked.unlock ();
243- int events = current_thread. event .wait ();
248+ int events = event.wait ();
244249 locked.lock ();
245250
246251 // Remove our thread from the list of waiting threads
247252 num_waiters_val = impl->num_waiters .load (std::memory_order_relaxed);
248253 for (std::size_t i = 0 ; i < num_waiters_val; i++) {
249- if (impl->waiters [i] == ¤t_thread. event ) {
254+ if (impl->waiters [i] == &event) {
250255 if (i != num_waiters_val - 1 )
251256 std::swap (impl->waiters [i], impl->waiters [num_waiters_val - 1 ]);
252257 impl->num_waiters .store (num_waiters_val - 1 , std::memory_order_relaxed);
253258 break ;
254259 }
255260 }
256261
257- // Check again if the task has finished
262+ // Check again if the task has finished. We have added a
263+ // continuation at this point, so we need to check that the
264+ // continuation has finished signaling the event.
258265 if (wait_task && (events & wait_type::task_finished))
259266 return ;
260267 }
0 commit comments