Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions dali/core/exec/tasking/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ bool Scheduler::AcquireAllAndMoveToReady(SharedTask &task) noexcept {
task->preconditions_.clear();
task->state_ = TaskState::Ready;
pending_.Remove(task);
ready_.push(std::move(task));
{
std::lock_guard lock(queue_lock_);
ready_.push(std::move(task));
}
queue_sem_.release();
return true;
}

void Scheduler::Notify(Waitable *w) {
bool is_completion_event = dynamic_cast<CompletionEvent *>(w) != nullptr;
bool is_task = is_completion_event && dynamic_cast<Task *>(w);

int new_ready = 0;
{
std::lock_guard g(mtx_);
if (is_task)
Expand Down Expand Up @@ -88,22 +91,19 @@ void Scheduler::Notify(Waitable *w) {
if (task->Ready()) {
pending_.Remove(task);
task->state_ = TaskState::Ready;
ready_.push(std::move(task));
new_ready++;
{
std::lock_guard lock(queue_lock_);
ready_.push(std::move(task));
}
queue_sem_.release();
// OK, the task is ready, we're done with it
continue;
}
}

if (AcquireAllAndMoveToReady(task))
new_ready++;
AcquireAllAndMoveToReady(task);
}
}

if (new_ready == 1)
this->task_ready_.notify_one();
else if (new_ready > 1)
this->task_ready_.notify_all();
}

} // namespace dali::tasking
4 changes: 4 additions & 0 deletions include/dali/core/exec/tasking/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class Executor : public Scheduler {
}

private:
int NumThreads() const override {
return num_threads_;
}

bool started_ = false;

void RunWorker() {
Expand Down
27 changes: 17 additions & 10 deletions include/dali/core/exec/tasking/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "dali/core/api_helper.h"
#include "dali/core/exec/tasking/task.h"
#include "dali/core/exec/tasking/sync.h"
#include "dali/core/semaphore.h"
#include "dali/core/spinlock.h"

namespace dali::tasking {

Expand Down Expand Up @@ -178,12 +180,14 @@ class Scheduler {
};

public:
virtual ~Scheduler() = default;

/** Removes a ready task with the highest priorty or waits for one to appear or
* for a shutdown notification.
*/
SharedTask Pop() {
std::unique_lock lock(mtx_);
task_ready_.wait(lock, [&]() { return !ready_.empty() || shutdown_requested_; });
queue_sem_.acquire();
std::lock_guard lock(queue_lock_);
if (ready_.empty()) {
assert(shutdown_requested_);
return nullptr;
Expand Down Expand Up @@ -234,9 +238,10 @@ class Scheduler {
/** Makes all Pop functions return with an error value. */
void Shutdown() {
std::lock_guard g(mtx_);
std::lock_guard lock(queue_lock_);
shutdown_requested_ = true;
task_ready_.notify_all();
task_done_.notify_all();
queue_sem_.release(NumThreads());
}

/** Checks whether a shutdown was requested. */
Expand All @@ -245,10 +250,12 @@ class Scheduler {
}

private:
virtual int NumThreads() const { return 1; }

/** Moves the task to the ready queue if all of its preconditions can be acquired.
*
* This function atomically checks that all preconditions can be met and if so, acquires them.
* If the preconditions where met, the task is moved from the pending list to the ready queue.
* If the preconditions were met, the task is moved from the pending list to the ready queue.
*/
bool DLL_PUBLIC AcquireAllAndMoveToReady(SharedTask &task) noexcept;

Expand All @@ -258,11 +265,11 @@ class Scheduler {
if (task->Ready()) { // if the task has no preconditions...
{
// ...then we add it directly to the ready queue.
std::lock_guard lock(mtx_);
std::lock_guard lock(queue_lock_);
task->state_ = TaskState::Ready;
ready_.push(task);
}
task_ready_.notify_one();
queue_sem_.release();
} else {
// Otherwise, the task is added to the pending list
bool ready = false;
Expand All @@ -276,17 +283,17 @@ class Scheduler {
}
pending_.PushFront(task);
// ...and we check whether its preconditions are, in fact, met.
ready = AcquireAllAndMoveToReady(task);
AcquireAllAndMoveToReady(task);
}
if (ready)
task_ready_.notify_one();
}
}

friend class Task;

counting_semaphore queue_sem_{0};
spinlock queue_lock_;
std::mutex mtx_;
std::condition_variable task_ready_, task_done_;
std::condition_variable task_done_;

detail::TaskList pending_;
std::priority_queue<SharedTask, std::vector<SharedTask>, TaskPriorityLess> ready_;
Expand Down
Loading