Skip to content

Commit 0c7f54f

Browse files
committed
Combine setup into one task
Signed-off-by: Joaquin Anton Guirao <[email protected]>
1 parent ac7be11 commit 0c7f54f

File tree

1 file changed

+113
-55
lines changed

1 file changed

+113
-55
lines changed

dali/operators/imgcodec/image_decoder.h

Lines changed: 113 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "dali/pipeline/operator/common.h"
3333
#include "dali/pipeline/operator/operator.h"
3434

35+
3536
#if not(WITH_DYNAMIC_NVIMGCODEC_ENABLED)
3637
nvimgcodecStatus_t get_libjpeg_turbo_extension_desc(nvimgcodecExtensionDesc_t *ext_desc);
3738
nvimgcodecStatus_t get_libtiff_extension_desc(nvimgcodecExtensionDesc_t *ext_desc);
@@ -674,10 +675,10 @@ class ImageDecoder : public StatelessOperator<Backend> {
674675
TensorListShape<> out_shape(nsamples, 3);
675676

676677
const bool use_cache = cache_ && cache_->IsCacheEnabled() && dtype_ == DALI_UINT8;
677-
auto setup_block = [&](int block_idx, int nblocks, int tid) {
678+
auto setup_block = [&](int block_idx, int nblocks) {
678679
int i_start = nsamples * block_idx / nblocks;
679680
int i_end = nsamples * (block_idx + 1) / nblocks;
680-
DomainTimeRange tr("Setup #" + std::to_string(block_idx) + "/" + std::to_string(nblocks),
681+
DomainTimeRange tr("Parse #" + std::to_string(block_idx) + "/" + std::to_string(nblocks),
681682
DomainTimeRange::kOrange);
682683
for (int i = i_start; i < i_end; i++) {
683684
auto *st = state_[i].get();
@@ -729,45 +730,10 @@ class ImageDecoder : public StatelessOperator<Backend> {
729730
}
730731
};
731732

732-
int nsamples_per_block = 16;
733-
int nblocks = std::max(1, nsamples / nsamples_per_block);
734-
int ntasks = std::min<int>(nblocks, std::min<int>(8, tp_->NumThreads() + 1));
735-
736-
if (ntasks < 2) {
737-
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
738-
setup_block(0, 1, -1); // run all in current thread
739-
} else {
740-
int block_idx = 0;
741-
atomic_idx_.store(0);
742-
auto setup_task = [&, nblocks](int tid) {
743-
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
744-
int block_idx;
745-
while ((block_idx = atomic_idx_.fetch_add(1)) < nblocks) {
746-
setup_block(block_idx, nblocks, tid);
747-
}
748-
};
749-
750-
for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
751-
tp_->AddWork(setup_task, -task_idx);
752-
}
753-
assert(ntasks >= 2);
754-
tp_->RunAll(false); // start work but not wait
755-
setup_task(-1); // last task in current thread
756-
tp_->WaitForWork(); // wait for the other threads
757-
}
758-
759-
// Allocate the memory for the outputs...
760-
{
761-
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
762-
output.Resize(out_shape);
763-
}
764-
// ... and create image descriptors.
765-
766733
// The image descriptors are created in parallel, in block-wise fashion.
767-
auto init_desc_task = [&](int start_sample, int end_sample) {
768-
DomainTimeRange tr(
769-
"Create images " + std::to_string(start_sample) + ".." + std::to_string(end_sample),
770-
DomainTimeRange::kOrange);
734+
auto init_desc_task = [&](int block_idx, int nblocks) {
735+
int start_sample = nsamples * block_idx / nblocks;
736+
int end_sample = nsamples * (block_idx + 1) / nblocks;
771737
for (int orig_idx = start_sample; orig_idx < end_sample; orig_idx++) {
772738
auto &st = *state_[orig_idx];
773739
if (use_cache && st.load_from_cache) {
@@ -795,31 +761,79 @@ class ImageDecoder : public StatelessOperator<Backend> {
795761
}
796762
};
797763

798-
// Just one task? Run it in this thread!
764+
int nsamples_per_block = 16;
765+
int nblocks = std::max(1, nsamples / nsamples_per_block);
766+
int ntasks = std::min<int>(nblocks, std::min<int>(8, tp_->NumThreads() + 1));
767+
799768
if (ntasks < 2) {
800-
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
801-
init_desc_task(0, nsamples);
769+
// run all in current thread
770+
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
771+
{
772+
DomainTimeRange tr("Parse", DomainTimeRange::kOrange);
773+
for (int block_idx = 0; block_idx < nblocks; block_idx++) {
774+
setup_block(block_idx, nblocks);
775+
}
776+
}
777+
{
778+
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
779+
output.Resize(out_shape);
780+
}
781+
{
782+
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
783+
for (int block_idx = 0; block_idx < nblocks; block_idx++) {
784+
init_desc_task(block_idx, nblocks);
785+
}
786+
}
802787
} else {
803-
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
804-
// Many tasks? Run in thread pool.
788+
// run in parallel
805789
int block_idx = 0;
806-
atomic_idx_.store(0);
807-
auto create_images_task = [&, nblocks](int tid) {
808-
int block_idx;
809-
while ((block_idx = atomic_idx_.fetch_add(1)) < nblocks) {
810-
int64_t start = nsamples * block_idx / nblocks;
811-
int64_t end = nsamples * (block_idx + 1) / nblocks;
812-
init_desc_task(start, end);
790+
// relaxed, only need atomicity, not ordering
791+
atomic_idx_.store(0, std::memory_order_relaxed);
792+
parse_barrier_.Reset(ntasks);
793+
alloc_output_barrier_.Reset(ntasks);
794+
create_images_barrier_.Reset(ntasks);
795+
auto setup_task = [&](int tid) {
796+
int sample_idx;
797+
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
798+
{
799+
DomainTimeRange tr("Parse", DomainTimeRange::kOrange);
800+
while ((block_idx = atomic_idx_.fetch_add(1, std::memory_order_relaxed)) < nblocks) {
801+
setup_block(block_idx, nblocks);
802+
}
803+
}
804+
parse_barrier_.ArriveAndWait(); // wait until parsing is done
805+
806+
if (tid == -1) {
807+
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
808+
output.Resize(out_shape);
809+
atomic_idx_.store(0, std::memory_order_relaxed);
810+
alloc_output_barrier_.Arrive(); // No need to wait here, we are in the main thread
811+
} else {
812+
alloc_output_barrier_.ArriveAndWait(); // wait until allocation is done
813+
}
814+
815+
// Create image descriptors
816+
{
817+
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
818+
while ((block_idx = atomic_idx_.fetch_add(1, std::memory_order_relaxed)) < nblocks) {
819+
init_desc_task(block_idx, nblocks);
820+
}
821+
}
822+
// the main thread needs to wait until creating images is done
823+
if (tid == -1) {
824+
create_images_barrier_.ArriveAndWait();
825+
} else {
826+
create_images_barrier_.Arrive();
813827
}
814828
};
815829

816830
for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
817-
tp_->AddWork(create_images_task, -task_idx);
831+
tp_->AddWork(setup_task, -task_idx);
818832
}
819833
assert(ntasks >= 2);
820834
tp_->RunAll(false); // start work but not wait
821-
create_images_task(-1);
822-
tp_->WaitForWork(); // wait for the other threads
835+
setup_task(-1); // last task in current thread
836+
// tp_->WaitForWork(); // wait for the other threads
823837
}
824838

825839
bool any_need_processing = false;
@@ -985,6 +999,50 @@ class ImageDecoder : public StatelessOperator<Backend> {
985999
std::vector<nvimgcodecExtension_t> extensions_;
9861000

9871001
std::vector<std::function<void(int)>> nvimgcodec_scheduled_tasks_;
1002+
1003+
class ThreadBarrier {
1004+
public:
1005+
explicit ThreadBarrier(std::size_t count) : count_(count), current_(count) {}
1006+
void Arrive() {
1007+
std::unique_lock<std::mutex> lock(lock_);
1008+
if (current_ == 0) {
1009+
throw std::logic_error("barrier is already completed");
1010+
}
1011+
current_--;
1012+
if (current_ == 0) {
1013+
cv_.notify_all();
1014+
}
1015+
}
1016+
void ArriveAndWait(bool reset = false) {
1017+
std::unique_lock<std::mutex> lock(lock_);
1018+
if (current_ == 0) {
1019+
throw std::logic_error("barrier is already completed");
1020+
}
1021+
current_--;
1022+
if (current_ == 0 || count_ == 0) {
1023+
if (reset)
1024+
current_ = count_;
1025+
cv_.notify_all();
1026+
} else {
1027+
cv_.wait(lock, [this] { return current_ == 0; });
1028+
}
1029+
}
1030+
void Reset(std::size_t count) {
1031+
std::lock_guard<std::mutex> lock(lock_);
1032+
count_ = count;
1033+
current_ = count;
1034+
}
1035+
1036+
private:
1037+
std::mutex lock_;
1038+
std::condition_variable cv_;
1039+
size_t count_;
1040+
size_t current_;
1041+
};
1042+
1043+
ThreadBarrier parse_barrier_{0};
1044+
ThreadBarrier alloc_output_barrier_{0};
1045+
ThreadBarrier create_images_barrier_{0};
9881046
};
9891047

9901048
} // namespace imgcodec

0 commit comments

Comments
 (0)