Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cf64a66
fix: eliminate data race on mNextSeq in pwrite path
Apr 13, 2026
b32ea3a
fix: make SPSC head pointer atomic to eliminate producer/consumer race
Apr 13, 2026
6316345
fix: make ReadPool counters atomic and SPSC produced/consumed atomic
Apr 14, 2026
fe5d994
build: switch to C++23 with GCC 15 (conda-forge toolchain)
KimBioInfoStudio Apr 16, 2026
b1ea866
refactor(5): replace cerr with std::println in PE/SE reporting blocks
KimBioInfoStudio Apr 16, 2026
f390419
refactor(4): replace mFinishedThreads atomic counter with std::latch
KimBioInfoStudio Apr 16, 2026
275bd43
refactor(3): replace raw thread* with std::jthread + std::optional, r…
KimBioInfoStudio Apr 16, 2026
caf7ed2
refactor(2): replace usleep(100) busy-wait with std::counting_semapho…
KimBioInfoStudio Apr 16, 2026
40329bb
refactor(1): replace mutex+CV backpressure polling with atomic::wait/…
KimBioInfoStudio Apr 16, 2026
536e911
fix(2): revert counting_semaphore, use yield() — semaphore release/ac…
KimBioInfoStudio Apr 16, 2026
5481356
perf: replace yield() with hwy::BlockUntilDifferent/WakeAll futex
KimBioInfoStudio Apr 17, 2026
2f33c85
fix: restore CXX ?= g++ in Makefile (remove hardcoded local toolchain…
KimBioInfoStudio Apr 17, 2026
04452e5
refactor: downgrade C++20/23 to C++11 for Apple Clang compatibility
KimBioInfoStudio Apr 17, 2026
11c0249
fix: writer thread deadlock with BlockUntilDifferent
KimBioInfoStudio Apr 17, 2026
f5f4bca
perf: replace pwrite ring sleep_for(1µs) spin with Highway futex
KimBioInfoStudio Apr 17, 2026
2f348b3
perf: replace bgzf mutex+condvar with Highway futex
KimBioInfoStudio Apr 17, 2026
5c0c444
fix: eliminate data races in writer thread
KimBioInfoStudio Apr 23, 2026
c093f88
fix: break reader/worker/writer circular deadlock and close lost-wake…
KimBioInfoStudio Apr 24, 2026
4a31bd7
Merge upstream 1.3.3 updates into threading refactor.
KimBioInfoStudio Apr 29, 2026
f01c530
fix: eliminate macOS build warnings
KimBioInfoStudio Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 68 additions & 52 deletions src/bgzf.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
#include <cstring>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <isa-l/igzip_lib.h>
#include <hwy/contrib/thread_pool/futex.h>

static const int BGZF_HEADER_SIZE = 18;
static const int BGZF_MAX_BLOCK_SIZE = 65536;
Expand All @@ -32,44 +31,46 @@ static inline uint32_t bgzfBlockSize(const unsigned char* h) {

// Parallel BGZF decompression with fixed thread pool.
// All decompress threads start upfront; ring buffer provides backpressure.
// Slot lifecycle: FREE → COMPRESSED → DECOMPRESSING → READY → FREE
// Slot lifecycle: FREE -> COMPRESSED -> DECOMPRESSING -> READY -> FREE
// Synchronization: Highway futex (BlockUntilDifferent/WakeAll) on per-slot
// atomic state, plus a global mSlotNotify counter for decompressor wakeup.
class BgzfMtReader {
enum SlotState { FREE, COMPRESSED, DECOMPRESSING, READY, DONE };
static const uint32_t STATE_FREE = 0;
static const uint32_t STATE_COMPRESSED = 1;
static const uint32_t STATE_DECOMPRESSING = 2;
static const uint32_t STATE_READY = 3;
static const uint32_t STATE_DONE = 4;

struct alignas(64) Slot {
unsigned char comp[BGZF_MAX_BLOCK_SIZE];
unsigned char decomp[BGZF_MAX_BLOCK_SIZE];
int compLen;
int decompLen;
std::atomic<SlotState> state{FREE};
std::atomic<uint32_t> state{STATE_FREE};
};

public:
// threadBudget: max decompress threads allowed for this reader.
// 0 = auto (use half of available CPU cores).
// Caller should compute: (hardware_concurrency - workers - readers - writers) / num_gz_inputs
BgzfMtReader(FILE* fp, int threadBudget = 0)
: mFp(fp), mConsumeIdx(0), mConsumeOffset(0),
mProduceIdx(0), mStop(false) {
mSlotNotify.store(0, std::memory_order_relaxed);
int cpus = std::thread::hardware_concurrency();
if (cpus < 2) cpus = 2;
mMaxPool = (threadBudget > 0) ? threadBudget : std::max(1, cpus / 2);
mRingSize = mMaxPool * 4;
if (mRingSize < 16) mRingSize = 16;
if (mRingSize > 64) mRingSize = 64; // cap at ~8MB per reader
if (mRingSize > 64) mRingSize = 64;
mSlots = new Slot[mRingSize];

// Start all decompress threads upfront — ring buffer handles backpressure
for (int i = 0; i < mMaxPool; i++)
mPool.push_back(new std::thread(&BgzfMtReader::decompWorker, this));
mReaderThread = new std::thread(&BgzfMtReader::readerLoop, this);
}

~BgzfMtReader() {
mStop = true;
mDecompCv.notify_all();
mProduceCv.notify_all();
mConsumeCv.notify_all();
mStop.store(true, std::memory_order_release);
// Wake all waiters so they see mStop
notifyAll();
if (mReaderThread) { mReaderThread->join(); delete mReaderThread; }
for (auto* t : mPool) { t->join(); delete t; }
delete[] mSlots;
Expand All @@ -79,14 +80,13 @@ class BgzfMtReader {
int filled = 0;
while (filled < outBufSize) {
Slot& s = mSlots[mConsumeIdx % mRingSize];
{
std::unique_lock<std::mutex> lk(mConsumeMtx);
mConsumeCv.wait(lk, [&]() {
SlotState v = s.state.load(std::memory_order_acquire);
return v == READY || v == DONE;
});
// Wait for slot to become READY or DONE
while (true) {
uint32_t v = s.state.load(std::memory_order_acquire);
if (v == STATE_READY || v == STATE_DONE) break;
hwy::BlockUntilDifferent(v, s.state);
}
if (s.state.load(std::memory_order_acquire) == DONE) break;
if (s.state.load(std::memory_order_acquire) == STATE_DONE) break;

int avail = s.decompLen - mConsumeOffset;
int tocopy = (outBufSize - filled) < avail ? (outBufSize - filled) : avail;
Expand All @@ -95,27 +95,39 @@ class BgzfMtReader {
mConsumeOffset += tocopy;

if (mConsumeOffset >= s.decompLen) {
s.state.store(FREE, std::memory_order_release);
s.state.store(STATE_FREE, std::memory_order_release);
hwy::WakeAll(s.state);
mConsumeOffset = 0;
mConsumeIdx++;
mProduceCv.notify_one();
// Notify producer that a slot is free
mSlotNotify.fetch_add(1, std::memory_order_release);
hwy::WakeAll(mSlotNotify);
}
}
return filled;
}

private:
void notifyAll() {
// Wake all threads blocked on slot states or mSlotNotify
for (int i = 0; i < mRingSize; i++) {
hwy::WakeAll(mSlots[i].state);
}
mSlotNotify.fetch_add(1, std::memory_order_release);
hwy::WakeAll(mSlotNotify);
}

void readerLoop() {
unsigned char header[BGZF_HEADER_SIZE];

while (!mStop) {
while (!mStop.load(std::memory_order_acquire)) {
Slot& s = mSlots[mProduceIdx % mRingSize];
{
std::unique_lock<std::mutex> lk(mProduceMtx);
mProduceCv.wait(lk, [&]() {
return s.state.load(std::memory_order_acquire) == FREE || mStop;
});
if (mStop) break;
// Wait for slot to become FREE
while (true) {
if (mStop.load(std::memory_order_acquire)) return;
uint32_t v = s.state.load(std::memory_order_acquire);
if (v == STATE_FREE) break;
hwy::BlockUntilDifferent(v, s.state);
}

size_t n = fread(header, 1, BGZF_HEADER_SIZE, mFp);
Expand All @@ -134,22 +146,24 @@ class BgzfMtReader {
}

s.compLen = bsize;
s.state.store(COMPRESSED, std::memory_order_release);
s.state.store(STATE_COMPRESSED, std::memory_order_release);
hwy::WakeAll(s.state);
mProduceIdx++;
mDecompCv.notify_all();
// Notify decompressors that a slot has data
mSlotNotify.fetch_add(1, std::memory_order_release);
hwy::WakeAll(mSlotNotify);
}
}

void decompWorker() {
while (!mStop) {
while (!mStop.load(std::memory_order_acquire)) {
Slot* target = nullptr;
{
std::unique_lock<std::mutex> lk(mDecompMtx);
mDecompCv.wait(lk, [&]() {
return mStop.load(std::memory_order_relaxed) || claimSlot(&target);
});
if (mStop && !target) return;
if (!target) continue;
if (!claimSlot(&target)) {
if (mStop.load(std::memory_order_acquire)) return;
// No slot available — block until state changes somewhere
uint32_t cur = mSlotNotify.load(std::memory_order_acquire);
hwy::BlockUntilDifferent(cur, mSlotNotify);
continue;
}

struct inflate_state ist;
Expand All @@ -162,8 +176,11 @@ class BgzfMtReader {
int ret = isal_inflate_stateless(&ist);
target->decompLen = (ret == ISAL_DECOMP_OK) ? (int)ist.total_out : 0;

target->state.store(READY, std::memory_order_release);
mConsumeCv.notify_one();
target->state.store(STATE_READY, std::memory_order_release);
hwy::WakeAll(target->state);
// Notify consumer that data is ready
mSlotNotify.fetch_add(1, std::memory_order_release);
hwy::WakeAll(mSlotNotify);
}
}

Expand All @@ -172,8 +189,8 @@ class BgzfMtReader {
int tail = mProduceIdx;
for (int i = head; i < tail; i++) {
Slot& s = mSlots[i % mRingSize];
SlotState expected = COMPRESSED;
if (s.state.compare_exchange_strong(expected, DECOMPRESSING,
uint32_t expected = STATE_COMPRESSED;
if (s.state.compare_exchange_strong(expected, STATE_DECOMPRESSING,
std::memory_order_acq_rel)) {
*out = &s;
return true;
Expand All @@ -183,25 +200,24 @@ class BgzfMtReader {
}

void markDone(Slot& s) {
s.state.store(DONE, std::memory_order_release);
mConsumeCv.notify_all();
mDecompCv.notify_all();
s.state.store(STATE_DONE, std::memory_order_release);
hwy::WakeAll(s.state);
mSlotNotify.fetch_add(1, std::memory_order_release);
hwy::WakeAll(mSlotNotify);
}

FILE* mFp; // not owned — caller must keep open for lifetime of BgzfMtReader
FILE* mFp;
Slot* mSlots;
int mRingSize;
std::atomic<int> mConsumeIdx;
int mConsumeOffset; // only accessed by consumer thread
int mConsumeOffset;
std::atomic<int> mProduceIdx;
int mMaxPool;
std::atomic<bool> mStop;
std::atomic<uint32_t> mSlotNotify; // monotonic counter for cross-thread wakeup

std::thread* mReaderThread;
std::vector<std::thread*> mPool;

std::mutex mConsumeMtx, mProduceMtx, mDecompMtx;
std::condition_variable mConsumeCv, mProduceCv, mDecompCv;
};

#endif
3 changes: 2 additions & 1 deletion src/htmlreporter.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "htmlreporter.h"
#include <chrono>
#include <cstdio>
#include <memory.h>
#include "knownadapters.h"

Expand Down Expand Up @@ -614,7 +615,7 @@ const string HtmlReporter::getCurrentSystemTime()
auto tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
struct tm* ptm = localtime(&tt);
char date[60] = {0};
sprintf(date, "%d-%02d-%02d %02d:%02d:%02d",
snprintf(date, sizeof(date), "%d-%02d-%02d %02d:%02d:%02d",
(int)ptm->tm_year + 1900,(int)ptm->tm_mon + 1,(int)ptm->tm_mday,
(int)ptm->tm_hour,(int)ptm->tm_min,(int)ptm->tm_sec);
return std::string(date);
Expand Down
9 changes: 5 additions & 4 deletions src/matcher.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "matcher.h"
#include <vector>

Matcher::Matcher(){
}
Expand All @@ -9,8 +10,8 @@ Matcher::~Matcher(){

bool Matcher::matchWithOneInsertion(const char* insData, const char* normalData, int cmplen, int diffLimit) {
// accumulated mismatches from left/right
int accMismatchFromLeft[cmplen];
int accMismatchFromRight[cmplen];
vector<int> accMismatchFromLeft(cmplen);
vector<int> accMismatchFromRight(cmplen);

// accMismatchFromLeft[0]: head vs. head
// accMismatchFromRight[cmplen-1]: tail vs. tail
Expand Down Expand Up @@ -55,8 +56,8 @@ bool Matcher::matchWithOneInsertion(const char* insData, const char* normalData,

int Matcher::diffWithOneInsertion(const char* insData, const char* normalData, int cmplen, int diffLimit) {
// accumulated mismatches from left/right
int accMismatchFromLeft[cmplen];
int accMismatchFromRight[cmplen];
vector<int> accMismatchFromLeft(cmplen);
vector<int> accMismatchFromRight(cmplen);

// accMismatchFromLeft[0]: head vs. head
// accMismatchFromRight[cmplen-1]: tail vs. tail
Expand Down
Loading
Loading