Skip to content

Commit 9832a6f

Browse files
committed
Combine setup into one task
Signed-off-by: Joaquin Anton Guirao <[email protected]>
1 parent ac7be11 commit 9832a6f

File tree

1 file changed

+70
-44
lines changed

1 file changed

+70
-44
lines changed

dali/operators/imgcodec/image_decoder.h

Lines changed: 70 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <vector>
2020
#include "dali/core/call_at_exit.h"
2121
#include "dali/core/mm/memory.h"
22+
#include "dali/core/semaphore.h"
2223
#include "dali/operators.h"
2324
#include "dali/operators/decoder/cache/cached_decoder_impl.h"
2425
#include "dali/operators/generic/slice/slice_attr.h"
@@ -677,7 +678,7 @@ class ImageDecoder : public StatelessOperator<Backend> {
677678
auto setup_block = [&](int block_idx, int nblocks, int tid) {
678679
int i_start = nsamples * block_idx / nblocks;
679680
int i_end = nsamples * (block_idx + 1) / nblocks;
680-
DomainTimeRange tr("Setup #" + std::to_string(block_idx) + "/" + std::to_string(nblocks),
681+
DomainTimeRange tr("Parse #" + std::to_string(block_idx) + "/" + std::to_string(nblocks),
681682
DomainTimeRange::kOrange);
682683
for (int i = i_start; i < i_end; i++) {
683684
auto *st = state_[i].get();
@@ -729,40 +730,6 @@ class ImageDecoder : public StatelessOperator<Backend> {
729730
}
730731
};
731732

732-
int nsamples_per_block = 16;
733-
int nblocks = std::max(1, nsamples / nsamples_per_block);
734-
int ntasks = std::min<int>(nblocks, std::min<int>(8, tp_->NumThreads() + 1));
735-
736-
if (ntasks < 2) {
737-
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
738-
setup_block(0, 1, -1); // run all in current thread
739-
} else {
740-
int block_idx = 0;
741-
atomic_idx_.store(0);
742-
auto setup_task = [&, nblocks](int tid) {
743-
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
744-
int block_idx;
745-
while ((block_idx = atomic_idx_.fetch_add(1)) < nblocks) {
746-
setup_block(block_idx, nblocks, tid);
747-
}
748-
};
749-
750-
for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
751-
tp_->AddWork(setup_task, -task_idx);
752-
}
753-
assert(ntasks >= 2);
754-
tp_->RunAll(false); // start work but not wait
755-
setup_task(-1); // last task in current thread
756-
tp_->WaitForWork(); // wait for the other threads
757-
}
758-
759-
// Allocate the memory for the outputs...
760-
{
761-
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
762-
output.Resize(out_shape);
763-
}
764-
// ... and create image descriptors.
765-
766733
// The image descriptors are created in parallel, in block-wise fashion.
767734
auto init_desc_task = [&](int start_sample, int end_sample) {
768735
DomainTimeRange tr(
@@ -795,30 +762,55 @@ class ImageDecoder : public StatelessOperator<Backend> {
795762
}
796763
};
797764

798-
// Just one task? Run it in this thread!
765+
int nsamples_per_block = 16;
766+
int nblocks = std::max(1, nsamples / nsamples_per_block);
767+
int ntasks = std::min<int>(nblocks, std::min<int>(8, tp_->NumThreads() + 1));
768+
799769
if (ntasks < 2) {
800-
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
770+
// run all in current thread
771+
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
772+
setup_block(0, 1, -1);
773+
{
774+
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
775+
output.Resize(out_shape);
776+
}
801777
init_desc_task(0, nsamples);
802778
} else {
803-
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
804-
// Many tasks? Run in thread pool.
779+
// run in parallel
805780
int block_idx = 0;
806-
atomic_idx_.store(0);
807-
auto create_images_task = [&, nblocks](int tid) {
781+
// relaxed, only need atomicity, not ordering
782+
atomic_idx_.store(0, std::memory_order_relaxed);
783+
parse_barrier_.Reset(ntasks);
784+
alloc_output_barrier_.Reset(ntasks);
785+
auto setup_task = [&, nblocks](int tid) {
808786
int block_idx;
809-
while ((block_idx = atomic_idx_.fetch_add(1)) < nblocks) {
787+
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
788+
while ((block_idx = atomic_idx_.fetch_add(1, std::memory_order_relaxed)) < nblocks) {
789+
setup_block(block_idx, nblocks, tid);
790+
}
791+
parse_barrier_.Wait(); // wait until parsing is done
792+
793+
if (tid == -1) {
794+
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
795+
output.Resize(out_shape);
796+
atomic_idx_.store(0, std::memory_order_relaxed);
797+
}
798+
799+
alloc_output_barrier_.Wait(); // wait until allocation is done
800+
// Create image descriptors
801+
while ((block_idx = atomic_idx_.fetch_add(1, std::memory_order_relaxed)) < nblocks) {
810802
int64_t start = nsamples * block_idx / nblocks;
811803
int64_t end = nsamples * (block_idx + 1) / nblocks;
812804
init_desc_task(start, end);
813805
}
814806
};
815807

816808
for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
817-
tp_->AddWork(create_images_task, -task_idx);
809+
tp_->AddWork(setup_task, -task_idx);
818810
}
819811
assert(ntasks >= 2);
820812
tp_->RunAll(false); // start work but not wait
821-
create_images_task(-1);
813+
setup_task(-1); // last task in current thread
822814
tp_->WaitForWork(); // wait for the other threads
823815
}
824816

@@ -985,6 +977,40 @@ class ImageDecoder : public StatelessOperator<Backend> {
985977
std::vector<nvimgcodecExtension_t> extensions_;
986978

987979
std::vector<std::function<void(int)>> nvimgcodec_scheduled_tasks_;
980+
981+
class ThreadBarrier {
982+
public:
983+
explicit ThreadBarrier(std::size_t count) : count_(count), current_(count) {}
984+
void Wait(bool reset = false) {
985+
std::unique_lock<std::mutex> lock(mutex_);
986+
if (current_ == 0) {
987+
throw std::logic_error("barrier is already completed");
988+
}
989+
current_--;
990+
if (current_ == 0 || count_ == 0) {
991+
if (reset)
992+
current_ = count_;
993+
cv_.notify_all();
994+
} else {
995+
cv_.wait(lock, [this] {
996+
return current_ == 0; });
997+
}
998+
}
999+
void Reset(std::size_t count) {
1000+
std::lock_guard<std::mutex> lock(mutex_);
1001+
count_ = count;
1002+
current_ = count;
1003+
}
1004+
1005+
private:
1006+
std::mutex mutex_;
1007+
std::condition_variable cv_;
1008+
size_t count_;
1009+
size_t current_;
1010+
};
1011+
1012+
ThreadBarrier parse_barrier_{1};
1013+
ThreadBarrier alloc_output_barrier_{1};
9881014
};
9891015

9901016
} // namespace imgcodec

0 commit comments

Comments
 (0)