Skip to content

Commit ac7be11

Browse files
committed
Create images per blocks
Signed-off-by: Joaquin Anton Guirao <[email protected]>
1 parent a9022b7 commit ac7be11

File tree

2 files changed

+25
-15
lines changed

2 files changed

+25
-15
lines changed

dali/operators/imgcodec/image_decoder.h

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,9 @@ class ImageDecoder : public StatelessOperator<Backend> {
765765

766766
// The image descriptors are created in parallel, in block-wise fashion.
767767
auto init_desc_task = [&](int start_sample, int end_sample) {
768+
DomainTimeRange tr(
769+
"Create images " + std::to_string(start_sample) + ".." + std::to_string(end_sample),
770+
DomainTimeRange::kOrange);
768771
for (int orig_idx = start_sample; orig_idx < end_sample; orig_idx++) {
769772
auto &st = *state_[orig_idx];
770773
if (use_cache && st.load_from_cache) {
@@ -799,19 +802,24 @@ class ImageDecoder : public StatelessOperator<Backend> {
799802
} else {
800803
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
801804
// Many tasks? Run in thread pool.
802-
// The first span of tasks is processed in the main operator thread.
803-
for (int i = 1; i < ntasks; i++) {
804-
int start = i * nsamples / ntasks;
805-
int end = (i + 1) * nsamples / ntasks;
806-
tp_->AddWork([&, start, end](int) {
805+
int block_idx = 0;
806+
atomic_idx_.store(0);
807+
auto create_images_task = [&, nblocks](int tid) {
808+
int block_idx;
809+
while ((block_idx = atomic_idx_.fetch_add(1)) < nblocks) {
810+
int64_t start = nsamples * block_idx / nblocks;
811+
int64_t end = nsamples * (block_idx + 1) / nblocks;
807812
init_desc_task(start, end);
808-
});
813+
}
814+
};
815+
816+
for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
817+
tp_->AddWork(create_images_task, -task_idx);
809818
}
810-
// Start processing of subsequent segments...
811-
tp_->RunAll(false);
812-
// ...and process the 1st segment in this thread.
813-
init_desc_task(0, nsamples / ntasks);
814-
tp_->WaitForWork();
819+
assert(ntasks >= 2);
820+
tp_->RunAll(false); // start work but not wait
821+
create_images_task(-1);
822+
tp_->WaitForWork(); // wait for the other threads
815823
}
816824

817825
bool any_need_processing = false;

dali/pipeline/util/thread_pool.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ ThreadPool::~ThreadPool() {
6060

6161
void ThreadPool::AddWork(Work work, int64_t priority, bool start_immediately) {
6262
bool started_before = started_;
63-
outstanding_work_.fetch_add(1);
63+
outstanding_work_.fetch_add(1, std::memory_order_relaxed);
6464
if (started_before) {
6565
std::lock_guard lock(queue_lock_);
6666
work_queue_.push({priority, std::move(work)});
@@ -81,10 +81,12 @@ void ThreadPool::AddWork(Work work, int64_t priority, bool start_immediately) {
8181

8282
// Blocks until all work issued to the thread pool is complete
8383
void ThreadPool::WaitForWork(bool checkForErrors) {
84-
if (outstanding_work_.load()) {
84+
// Wait for outstanding work to complete
85+
if (outstanding_work_.load(std::memory_order_relaxed) > 0) {
86+
// If still not complete, use condition variable
8587
std::unique_lock lock(completed_mutex_);
8688
completed_.wait(lock, [&, this] {
87-
return this->outstanding_work_ == 0;
89+
return this->outstanding_work_.load(std::memory_order_relaxed) == 0;
8890
});
8991
}
9092
started_ = false;
@@ -184,7 +186,7 @@ void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity,
184186
// If it reaches zero, we must safely notify the potential threads waiting for the work
185187
// to complete.
186188
// NOTE: We don't have to acquire the mutex until the number of outstanding work items reaches 0.
187-
if (--outstanding_work_ == 0) {
189+
if (outstanding_work_.fetch_sub(1, std::memory_order_relaxed) == 1) {
188190
// We don't need to guard the modification of the atomic value with a mutex -
189191
// however, we need to lock it briefly to make sure we don't have this scenario:
190192
//

0 commit comments

Comments
 (0)