Skip to content

Commit 4f683fb

Browse files
committed
Use semaphore in tasking::Scheduler.
Signed-off-by: Michał Zientkiewicz <[email protected]>
1 parent 40e00c1 commit 4f683fb

File tree

3 files changed

+31
-21
lines changed

3 files changed

+31
-21
lines changed

dali/core/exec/tasking/scheduler.cc

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,18 @@ bool Scheduler::AcquireAllAndMoveToReady(SharedTask &task) noexcept {
3838
task->preconditions_.clear();
3939
task->state_ = TaskState::Ready;
4040
pending_.Remove(task);
41-
ready_.push(std::move(task));
41+
{
42+
std::lock_guard lock(queue_lock_);
43+
ready_.push(std::move(task));
44+
}
45+
queue_sem_.release();
4246
return true;
4347
}
4448

4549
void Scheduler::Notify(Waitable *w) {
4650
bool is_completion_event = dynamic_cast<CompletionEvent *>(w) != nullptr;
4751
bool is_task = is_completion_event && dynamic_cast<Task *>(w);
4852

49-
int new_ready = 0;
5053
{
5154
std::lock_guard g(mtx_);
5255
if (is_task)
@@ -88,22 +91,19 @@ void Scheduler::Notify(Waitable *w) {
8891
if (task->Ready()) {
8992
pending_.Remove(task);
9093
task->state_ = TaskState::Ready;
91-
ready_.push(std::move(task));
92-
new_ready++;
94+
{
95+
std::lock_guard lock(queue_lock_);
96+
ready_.push(std::move(task));
97+
}
98+
queue_sem_.release();
9399
// OK, the task is ready, we're done with it
94100
continue;
95101
}
96102
}
97103

98-
if (AcquireAllAndMoveToReady(task))
99-
new_ready++;
104+
AcquireAllAndMoveToReady(task);
100105
}
101106
}
102-
103-
if (new_ready == 1)
104-
this->task_ready_.notify_one();
105-
else if (new_ready > 1)
106-
this->task_ready_.notify_all();
107107
}
108108

109109
} // namespace dali::tasking

include/dali/core/exec/tasking/executor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ class Executor : public Scheduler {
7070
}
7171

7272
private:
73+
int NumThreads() const override {
74+
return num_threads_;
75+
}
76+
7377
bool started_ = false;
7478

7579
void RunWorker() {

include/dali/core/exec/tasking/scheduler.h

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include "dali/core/api_helper.h"
2424
#include "dali/core/exec/tasking/task.h"
2525
#include "dali/core/exec/tasking/sync.h"
26+
#include "dali/core/semaphore.h"
27+
#include "dali/core/spinlock.h"
2628

2729
namespace dali::tasking {
2830

@@ -182,8 +184,8 @@ class Scheduler {
182184
* for a shutdown notification.
183185
*/
184186
SharedTask Pop() {
185-
std::unique_lock lock(mtx_);
186-
task_ready_.wait(lock, [&]() { return !ready_.empty() || shutdown_requested_; });
187+
queue_sem_.acquire();
188+
std::lock_guard lock(queue_lock_);
187189
if (ready_.empty()) {
188190
assert(shutdown_requested_);
189191
return nullptr;
@@ -234,9 +236,10 @@ class Scheduler {
234236
/** Makes all Pop functions return with an error value. */
235237
void Shutdown() {
236238
std::lock_guard g(mtx_);
239+
std::lock_guard lock(queue_lock_);
237240
shutdown_requested_ = true;
238-
task_ready_.notify_all();
239241
task_done_.notify_all();
242+
queue_sem_.release(NumThreads());
240243
}
241244

242245
/** Checks whether a shutdown was requested. */
@@ -245,10 +248,12 @@ class Scheduler {
245248
}
246249

247250
private:
251+
virtual int NumThreads() const { return 1; }
252+
248253
/** Moves the task to the ready queue if all of its preconditions can be acquired.
249254
*
250255
* This function atomically checks that all preconditions can be met and if so, acquires them.
251-
* If the preconditions where met, the task is moved from the pending list to the ready queue.
256+
* If the preconditions were met, the task is moved from the pending list to the ready queue.
252257
*/
253258
bool DLL_PUBLIC AcquireAllAndMoveToReady(SharedTask &task) noexcept;
254259

@@ -258,11 +263,11 @@ class Scheduler {
258263
if (task->Ready()) { // if the task has no preconditions...
259264
{
260265
// ...then we add it directly to the ready queue.
261-
std::lock_guard lock(mtx_);
266+
std::lock_guard lock(queue_lock_);
262267
task->state_ = TaskState::Ready;
263268
ready_.push(task);
264269
}
265-
task_ready_.notify_one();
270+
queue_sem_.release();
266271
} else {
267272
// Otherwise, the task is added to the pending list
268273
bool ready = false;
@@ -276,17 +281,18 @@ class Scheduler {
276281
}
277282
pending_.PushFront(task);
278283
// ...and we check whether its preconditions are, in fact, met.
279-
ready = AcquireAllAndMoveToReady(task);
284+
AcquireAllAndMoveToReady(task);
280285
}
281-
if (ready)
282-
task_ready_.notify_one();
286+
283287
}
284288
}
285289

286290
friend class Task;
287291

292+
counting_semaphore queue_sem_{0};
293+
spinlock queue_lock_;
288294
std::mutex mtx_;
289-
std::condition_variable task_ready_, task_done_;
295+
std::condition_variable task_done_;
290296

291297
detail::TaskList pending_;
292298
std::priority_queue<SharedTask, std::vector<SharedTask>, TaskPriorityLess> ready_;

0 commit comments

Comments
 (0)