Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 120 additions & 54 deletions dali/operators/imgcodec/image_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "dali/pipeline/operator/common.h"
#include "dali/pipeline/operator/operator.h"


#if not(WITH_DYNAMIC_NVIMGCODEC_ENABLED)
nvimgcodecStatus_t get_libjpeg_turbo_extension_desc(nvimgcodecExtensionDesc_t *ext_desc);
nvimgcodecStatus_t get_libtiff_extension_desc(nvimgcodecExtensionDesc_t *ext_desc);
Expand Down Expand Up @@ -674,10 +675,10 @@ class ImageDecoder : public StatelessOperator<Backend> {
TensorListShape<> out_shape(nsamples, 3);

const bool use_cache = cache_ && cache_->IsCacheEnabled() && dtype_ == DALI_UINT8;
auto setup_block = [&](int block_idx, int nblocks, int tid) {
auto setup_block = [&](int block_idx, int nblocks) {
int i_start = nsamples * block_idx / nblocks;
int i_end = nsamples * (block_idx + 1) / nblocks;
DomainTimeRange tr("Setup #" + std::to_string(block_idx) + "/" + std::to_string(nblocks),
DomainTimeRange tr("Parse #" + std::to_string(block_idx) + "/" + std::to_string(nblocks),
DomainTimeRange::kOrange);
for (int i = i_start; i < i_end; i++) {
auto *st = state_[i].get();
Expand Down Expand Up @@ -729,42 +730,10 @@ class ImageDecoder : public StatelessOperator<Backend> {
}
};

int nsamples_per_block = 16;
int nblocks = std::max(1, nsamples / nsamples_per_block);
int ntasks = std::min<int>(nblocks, std::min<int>(8, tp_->NumThreads() + 1));

if (ntasks < 2) {
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
setup_block(0, 1, -1); // run all in current thread
} else {
int block_idx = 0;
atomic_idx_.store(0);
auto setup_task = [&, nblocks](int tid) {
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
int block_idx;
while ((block_idx = atomic_idx_.fetch_add(1)) < nblocks) {
setup_block(block_idx, nblocks, tid);
}
};

for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
tp_->AddWork(setup_task, -task_idx);
}
assert(ntasks >= 2);
tp_->RunAll(false); // start work but not wait
setup_task(-1); // last task in current thread
tp_->WaitForWork(); // wait for the other threads
}

// Allocate the memory for the outputs...
{
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
output.Resize(out_shape);
}
// ... and create image descriptors.

// The image descriptors are created in parallel, in block-wise fashion.
auto init_desc_task = [&](int start_sample, int end_sample) {
auto init_desc_task = [&](int block_idx, int nblocks) {
int start_sample = nsamples * block_idx / nblocks;
int end_sample = nsamples * (block_idx + 1) / nblocks;
for (int orig_idx = start_sample; orig_idx < end_sample; orig_idx++) {
auto &st = *state_[orig_idx];
if (use_cache && st.load_from_cache) {
Expand Down Expand Up @@ -792,26 +761,79 @@ class ImageDecoder : public StatelessOperator<Backend> {
}
};

// Just one task? Run it in this thread!
int nsamples_per_block = 16;
int nblocks = std::max(1, nsamples / nsamples_per_block);
int ntasks = std::min<int>(nblocks, std::min<int>(8, tp_->NumThreads() + 1));

if (ntasks < 2) {
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
init_desc_task(0, nsamples);
// run all in current thread
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
{
DomainTimeRange tr("Parse", DomainTimeRange::kOrange);
for (int block_idx = 0; block_idx < nblocks; block_idx++) {
setup_block(block_idx, nblocks);
}
}
{
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
output.Resize(out_shape);
}
{
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
for (int block_idx = 0; block_idx < nblocks; block_idx++) {
init_desc_task(block_idx, nblocks);
}
}
} else {
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
// Many tasks? Run in thread pool.
// The first span of tasks is processed in the main operator thread.
for (int i = 1; i < ntasks; i++) {
int start = i * nsamples / ntasks;
int end = (i + 1) * nsamples / ntasks;
tp_->AddWork([&, start, end](int) {
init_desc_task(start, end);
});
// run in parallel
int block_idx = 0;
// relaxed, only need atomicity, not ordering
atomic_idx_.store(0, std::memory_order_relaxed);
parse_barrier_.Reset(ntasks);
alloc_output_barrier_.Reset(ntasks);
create_images_barrier_.Reset(ntasks);
auto setup_task = [&](int tid) {
int sample_idx;
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
{
DomainTimeRange tr("Parse", DomainTimeRange::kOrange);
while ((block_idx = atomic_idx_.fetch_add(1, std::memory_order_relaxed)) < nblocks) {
setup_block(block_idx, nblocks);
}
}
parse_barrier_.ArriveAndWait(); // wait until parsing is done

if (tid == -1) {
DomainTimeRange tr("Alloc output", DomainTimeRange::kOrange);
output.Resize(out_shape);
atomic_idx_.store(0, std::memory_order_relaxed);
alloc_output_barrier_.Arrive(); // No need to wait here, we are in the main thread
} else {
alloc_output_barrier_.ArriveAndWait(); // wait until allocation is done
}

// Create image descriptors
{
DomainTimeRange tr("Create images", DomainTimeRange::kOrange);
while ((block_idx = atomic_idx_.fetch_add(1, std::memory_order_relaxed)) < nblocks) {
init_desc_task(block_idx, nblocks);
}
}
// the main thread needs to wait until creating images is done
if (tid == -1) {
create_images_barrier_.ArriveAndWait();
} else {
create_images_barrier_.Arrive();
}
};

for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
tp_->AddWork(setup_task, -task_idx);
}
// Start processing of subsequent segments...
tp_->RunAll(false);
// ...and process the 1st segment in this thread.
init_desc_task(0, nsamples / ntasks);
tp_->WaitForWork();
assert(ntasks >= 2);
tp_->RunAll(false); // start work but not wait
setup_task(-1); // last task in current thread
// tp_->WaitForWork(); // wait for the other threads
}

bool any_need_processing = false;
Expand Down Expand Up @@ -977,6 +999,50 @@ class ImageDecoder : public StatelessOperator<Backend> {
std::vector<nvimgcodecExtension_t> extensions_;

std::vector<std::function<void(int)>> nvimgcodec_scheduled_tasks_;

class ThreadBarrier {
public:
explicit ThreadBarrier(std::size_t count) : count_(count), current_(count) {}
void Arrive() {
std::unique_lock<std::mutex> lock(lock_);
if (current_ == 0) {
throw std::logic_error("barrier is already completed");
}
current_--;
if (current_ == 0) {
cv_.notify_all();
}
}
void ArriveAndWait(bool reset = false) {
std::unique_lock<std::mutex> lock(lock_);
if (current_ == 0) {
throw std::logic_error("barrier is already completed");
}
current_--;
if (current_ == 0 || count_ == 0) {
if (reset)
current_ = count_;
cv_.notify_all();
} else {
cv_.wait(lock, [this] { return current_ == 0; });
}
}
void Reset(std::size_t count) {
std::lock_guard<std::mutex> lock(lock_);
count_ = count;
current_ = count;
}

private:
std::mutex lock_;
std::condition_variable cv_;
size_t count_;
size_t current_;
};

ThreadBarrier parse_barrier_{0};
ThreadBarrier alloc_output_barrier_{0};
ThreadBarrier create_images_barrier_{0};
};

} // namespace imgcodec
Expand Down
10 changes: 6 additions & 4 deletions dali/pipeline/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ ThreadPool::~ThreadPool() {

void ThreadPool::AddWork(Work work, int64_t priority, bool start_immediately) {
bool started_before = started_;
outstanding_work_.fetch_add(1);
outstanding_work_.fetch_add(1, std::memory_order_relaxed);
if (started_before) {
std::lock_guard lock(queue_lock_);
work_queue_.push({priority, std::move(work)});
Expand All @@ -81,10 +81,12 @@ void ThreadPool::AddWork(Work work, int64_t priority, bool start_immediately) {

// Blocks until all work issued to the thread pool is complete
void ThreadPool::WaitForWork(bool checkForErrors) {
if (outstanding_work_.load()) {
// Wait for outstanding work to complete
if (outstanding_work_.load(std::memory_order_relaxed) > 0) {
// If still not complete, use condition variable
std::unique_lock lock(completed_mutex_);
completed_.wait(lock, [&, this] {
return this->outstanding_work_ == 0;
return this->outstanding_work_.load(std::memory_order_relaxed) == 0;
});
}
started_ = false;
Expand Down Expand Up @@ -184,7 +186,7 @@ void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity,
// If it reaches zero, we must safely notify the potential threads waiting for the work
// to complete.
// NOTE: We don't have to acquire the mutex until the number of waiting threads reaches 0.
if (--outstanding_work_ == 0) {
if (outstanding_work_.fetch_sub(1, std::memory_order_relaxed) == 1) {
// We don't need to guard the modification of the atomic value with a mutex -
// however, we need to lock it briefly to make sure we don't have this scenario:
//
Expand Down
Loading