diff --git a/dali/core/exec/tasking/scheduler.cc b/dali/core/exec/tasking/scheduler.cc index 2c225d0e22..de12a3adae 100644 --- a/dali/core/exec/tasking/scheduler.cc +++ b/dali/core/exec/tasking/scheduler.cc @@ -38,7 +38,11 @@ bool Scheduler::AcquireAllAndMoveToReady(SharedTask &task) noexcept { task->preconditions_.clear(); task->state_ = TaskState::Ready; pending_.Remove(task); - ready_.push(std::move(task)); + { + std::lock_guard lock(queue_lock_); + ready_.push(std::move(task)); + } + queue_sem_.release(); return true; } @@ -46,7 +50,6 @@ void Scheduler::Notify(Waitable *w) { bool is_completion_event = dynamic_cast(w) != nullptr; bool is_task = is_completion_event && dynamic_cast(w); - int new_ready = 0; { std::lock_guard g(mtx_); if (is_task) @@ -88,22 +91,19 @@ void Scheduler::Notify(Waitable *w) { if (task->Ready()) { pending_.Remove(task); task->state_ = TaskState::Ready; - ready_.push(std::move(task)); - new_ready++; + { + std::lock_guard lock(queue_lock_); + ready_.push(std::move(task)); + } + queue_sem_.release(); // OK, the task is ready, we're done with it continue; } } - if (AcquireAllAndMoveToReady(task)) - new_ready++; + AcquireAllAndMoveToReady(task); } } - - if (new_ready == 1) - this->task_ready_.notify_one(); - else if (new_ready > 1) - this->task_ready_.notify_all(); } } // namespace dali::tasking diff --git a/include/dali/core/exec/tasking/executor.h b/include/dali/core/exec/tasking/executor.h index 4079840a49..d113dea760 100644 --- a/include/dali/core/exec/tasking/executor.h +++ b/include/dali/core/exec/tasking/executor.h @@ -70,6 +70,10 @@ class Executor : public Scheduler { } private: + int NumThreads() const override { + return num_threads_; + } + bool started_ = false; void RunWorker() { diff --git a/include/dali/core/exec/tasking/scheduler.h b/include/dali/core/exec/tasking/scheduler.h index c81d7f9b1d..cbe6b46f94 100644 --- a/include/dali/core/exec/tasking/scheduler.h +++ b/include/dali/core/exec/tasking/scheduler.h @@ -23,6 +23,8 @@ #include "dali/core/api_helper.h" #include "dali/core/exec/tasking/task.h" #include "dali/core/exec/tasking/sync.h" +#include "dali/core/semaphore.h" +#include "dali/core/spinlock.h" namespace dali::tasking { @@ -178,12 +180,14 @@ class Scheduler { }; public: + virtual ~Scheduler() = default; + /** Removes a ready task with the highest priorty or waits for one to appear or * for a shutdown notification. */ SharedTask Pop() { - std::unique_lock lock(mtx_); - task_ready_.wait(lock, [&]() { return !ready_.empty() || shutdown_requested_; }); + queue_sem_.acquire(); + std::lock_guard lock(queue_lock_); if (ready_.empty()) { assert(shutdown_requested_); return nullptr; @@ -234,9 +238,10 @@ class Scheduler { /** Makes all Pop functions return with an error value. */ void Shutdown() { std::lock_guard g(mtx_); + std::lock_guard lock(queue_lock_); shutdown_requested_ = true; - task_ready_.notify_all(); task_done_.notify_all(); + queue_sem_.release(NumThreads()); } /** Checks whether a shutdown was requested. */ @@ -245,10 +250,12 @@ class Scheduler { } private: + virtual int NumThreads() const { return 1; } + /** Moves the task to the ready queue if all of its preconditions can be acquired. * * This function atomically checks that all preconditions can be met and if so, acquires them. - * If the preconditions where met, the task is moved from the pending list to the ready queue. + * If the preconditions were met, the task is moved from the pending list to the ready queue. */ bool DLL_PUBLIC AcquireAllAndMoveToReady(SharedTask &task) noexcept; @@ -258,11 +265,11 @@ class Scheduler { if (task->Ready()) { // if the task has no preconditions... { // ...then we add it directly to the ready queue. - std::lock_guard lock(mtx_); + std::lock_guard lock(queue_lock_); task->state_ = TaskState::Ready; ready_.push(task); } - task_ready_.notify_one(); + queue_sem_.release(); } else { // Otherwise, the task is added to the pending list bool ready = false; @@ -276,17 +283,17 @@ class Scheduler { } pending_.PushFront(task); // ...and we check whether its preconditions are, in fact, met. - ready = AcquireAllAndMoveToReady(task); + AcquireAllAndMoveToReady(task); } - if (ready) - task_ready_.notify_one(); } } friend class Task; + counting_semaphore queue_sem_{0}; + spinlock queue_lock_; std::mutex mtx_; - std::condition_variable task_ready_, task_done_; + std::condition_variable task_done_; detail::TaskList pending_; std::priority_queue, TaskPriorityLess> ready_;