diff --git a/src/bgzf.h b/src/bgzf.h index 9b02852..8cbf43e 100644 --- a/src/bgzf.h +++ b/src/bgzf.h @@ -6,10 +6,9 @@ #include #include #include -#include -#include #include #include +#include static const int BGZF_HEADER_SIZE = 18; static const int BGZF_MAX_BLOCK_SIZE = 65536; @@ -32,44 +31,46 @@ static inline uint32_t bgzfBlockSize(const unsigned char* h) { // Parallel BGZF decompression with fixed thread pool. // All decompress threads start upfront; ring buffer provides backpressure. -// Slot lifecycle: FREE → COMPRESSED → DECOMPRESSING → READY → FREE +// Slot lifecycle: FREE -> COMPRESSED -> DECOMPRESSING -> READY -> FREE +// Synchronization: Highway futex (BlockUntilDifferent/WakeAll) on per-slot +// atomic state, plus a global mSlotNotify counter for decompressor wakeup. class BgzfMtReader { - enum SlotState { FREE, COMPRESSED, DECOMPRESSING, READY, DONE }; + static const uint32_t STATE_FREE = 0; + static const uint32_t STATE_COMPRESSED = 1; + static const uint32_t STATE_DECOMPRESSING = 2; + static const uint32_t STATE_READY = 3; + static const uint32_t STATE_DONE = 4; struct alignas(64) Slot { unsigned char comp[BGZF_MAX_BLOCK_SIZE]; unsigned char decomp[BGZF_MAX_BLOCK_SIZE]; int compLen; int decompLen; - std::atomic state{FREE}; + std::atomic state{STATE_FREE}; }; public: - // threadBudget: max decompress threads allowed for this reader. - // 0 = auto (use half of available CPU cores). - // Caller should compute: (hardware_concurrency - workers - readers - writers) / num_gz_inputs BgzfMtReader(FILE* fp, int threadBudget = 0) : mFp(fp), mConsumeIdx(0), mConsumeOffset(0), mProduceIdx(0), mStop(false) { + mSlotNotify.store(0, std::memory_order_relaxed); int cpus = std::thread::hardware_concurrency(); if (cpus < 2) cpus = 2; mMaxPool = (threadBudget > 0) ? 
threadBudget : std::max(1, cpus / 2); mRingSize = mMaxPool * 4; if (mRingSize < 16) mRingSize = 16; - if (mRingSize > 64) mRingSize = 64; // cap at ~8MB per reader + if (mRingSize > 64) mRingSize = 64; mSlots = new Slot[mRingSize]; - // Start all decompress threads upfront — ring buffer handles backpressure for (int i = 0; i < mMaxPool; i++) mPool.push_back(new std::thread(&BgzfMtReader::decompWorker, this)); mReaderThread = new std::thread(&BgzfMtReader::readerLoop, this); } ~BgzfMtReader() { - mStop = true; - mDecompCv.notify_all(); - mProduceCv.notify_all(); - mConsumeCv.notify_all(); + mStop.store(true, std::memory_order_release); + // Wake all waiters so they see mStop + notifyAll(); if (mReaderThread) { mReaderThread->join(); delete mReaderThread; } for (auto* t : mPool) { t->join(); delete t; } delete[] mSlots; @@ -79,14 +80,13 @@ class BgzfMtReader { int filled = 0; while (filled < outBufSize) { Slot& s = mSlots[mConsumeIdx % mRingSize]; - { - std::unique_lock lk(mConsumeMtx); - mConsumeCv.wait(lk, [&]() { - SlotState v = s.state.load(std::memory_order_acquire); - return v == READY || v == DONE; - }); + // Wait for slot to become READY or DONE + while (true) { + uint32_t v = s.state.load(std::memory_order_acquire); + if (v == STATE_READY || v == STATE_DONE) break; + hwy::BlockUntilDifferent(v, s.state); } - if (s.state.load(std::memory_order_acquire) == DONE) break; + if (s.state.load(std::memory_order_acquire) == STATE_DONE) break; int avail = s.decompLen - mConsumeOffset; int tocopy = (outBufSize - filled) < avail ? 
(outBufSize - filled) : avail; @@ -95,27 +95,39 @@ class BgzfMtReader { mConsumeOffset += tocopy; if (mConsumeOffset >= s.decompLen) { - s.state.store(FREE, std::memory_order_release); + s.state.store(STATE_FREE, std::memory_order_release); + hwy::WakeAll(s.state); mConsumeOffset = 0; mConsumeIdx++; - mProduceCv.notify_one(); + // Notify producer that a slot is free + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); } } return filled; } private: + void notifyAll() { + // Wake all threads blocked on slot states or mSlotNotify + for (int i = 0; i < mRingSize; i++) { + hwy::WakeAll(mSlots[i].state); + } + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); + } + void readerLoop() { unsigned char header[BGZF_HEADER_SIZE]; - while (!mStop) { + while (!mStop.load(std::memory_order_acquire)) { Slot& s = mSlots[mProduceIdx % mRingSize]; - { - std::unique_lock lk(mProduceMtx); - mProduceCv.wait(lk, [&]() { - return s.state.load(std::memory_order_acquire) == FREE || mStop; - }); - if (mStop) break; + // Wait for slot to become FREE + while (true) { + if (mStop.load(std::memory_order_acquire)) return; + uint32_t v = s.state.load(std::memory_order_acquire); + if (v == STATE_FREE) break; + hwy::BlockUntilDifferent(v, s.state); } size_t n = fread(header, 1, BGZF_HEADER_SIZE, mFp); @@ -134,22 +146,24 @@ class BgzfMtReader { } s.compLen = bsize; - s.state.store(COMPRESSED, std::memory_order_release); + s.state.store(STATE_COMPRESSED, std::memory_order_release); + hwy::WakeAll(s.state); mProduceIdx++; - mDecompCv.notify_all(); + // Notify decompressors that a slot has data + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); } } void decompWorker() { - while (!mStop) { + while (!mStop.load(std::memory_order_acquire)) { Slot* target = nullptr; - { - std::unique_lock lk(mDecompMtx); - mDecompCv.wait(lk, [&]() { - return mStop.load(std::memory_order_relaxed) || claimSlot(&target); - }); - if 
(mStop && !target) return; - if (!target) continue; + if (!claimSlot(&target)) { + if (mStop.load(std::memory_order_acquire)) return; + // No slot available — block until state changes somewhere + uint32_t cur = mSlotNotify.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mSlotNotify); + continue; } struct inflate_state ist; @@ -162,8 +176,11 @@ class BgzfMtReader { int ret = isal_inflate_stateless(&ist); target->decompLen = (ret == ISAL_DECOMP_OK) ? (int)ist.total_out : 0; - target->state.store(READY, std::memory_order_release); - mConsumeCv.notify_one(); + target->state.store(STATE_READY, std::memory_order_release); + hwy::WakeAll(target->state); + // Notify consumer that data is ready + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); } } @@ -172,8 +189,8 @@ class BgzfMtReader { int tail = mProduceIdx; for (int i = head; i < tail; i++) { Slot& s = mSlots[i % mRingSize]; - SlotState expected = COMPRESSED; - if (s.state.compare_exchange_strong(expected, DECOMPRESSING, + uint32_t expected = STATE_COMPRESSED; + if (s.state.compare_exchange_strong(expected, STATE_DECOMPRESSING, std::memory_order_acq_rel)) { *out = &s; return true; @@ -183,25 +200,24 @@ class BgzfMtReader { } void markDone(Slot& s) { - s.state.store(DONE, std::memory_order_release); - mConsumeCv.notify_all(); - mDecompCv.notify_all(); + s.state.store(STATE_DONE, std::memory_order_release); + hwy::WakeAll(s.state); + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); } - FILE* mFp; // not owned — caller must keep open for lifetime of BgzfMtReader + FILE* mFp; Slot* mSlots; int mRingSize; std::atomic mConsumeIdx; - int mConsumeOffset; // only accessed by consumer thread + int mConsumeOffset; std::atomic mProduceIdx; int mMaxPool; std::atomic mStop; + std::atomic mSlotNotify; // monotonic counter for cross-thread wakeup std::thread* mReaderThread; std::vector mPool; - - std::mutex mConsumeMtx, mProduceMtx, mDecompMtx; - 
std::condition_variable mConsumeCv, mProduceCv, mDecompCv; }; #endif diff --git a/src/htmlreporter.cpp b/src/htmlreporter.cpp index 697f232..f6c0485 100644 --- a/src/htmlreporter.cpp +++ b/src/htmlreporter.cpp @@ -1,5 +1,6 @@ #include "htmlreporter.h" #include +#include #include #include "knownadapters.h" @@ -614,7 +615,7 @@ const string HtmlReporter::getCurrentSystemTime() auto tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); struct tm* ptm = localtime(&tt); char date[60] = {0}; - sprintf(date, "%d-%02d-%02d %02d:%02d:%02d", + snprintf(date, sizeof(date), "%d-%02d-%02d %02d:%02d:%02d", (int)ptm->tm_year + 1900,(int)ptm->tm_mon + 1,(int)ptm->tm_mday, (int)ptm->tm_hour,(int)ptm->tm_min,(int)ptm->tm_sec); return std::string(date); diff --git a/src/matcher.cpp b/src/matcher.cpp index c438ca6..701660e 100644 --- a/src/matcher.cpp +++ b/src/matcher.cpp @@ -1,4 +1,5 @@ #include "matcher.h" +#include Matcher::Matcher(){ } @@ -9,8 +10,8 @@ Matcher::~Matcher(){ bool Matcher::matchWithOneInsertion(const char* insData, const char* normalData, int cmplen, int diffLimit) { // accumlated mismatches from left/right - int accMismatchFromLeft[cmplen]; - int accMismatchFromRight[cmplen]; + vector accMismatchFromLeft(cmplen); + vector accMismatchFromRight(cmplen); // accMismatchFromLeft[0]: head vs. head // accMismatchFromRight[cmplen-1]: tail vs. tail @@ -55,8 +56,8 @@ bool Matcher::matchWithOneInsertion(const char* insData, const char* normalData, int Matcher::diffWithOneInsertion(const char* insData, const char* normalData, int cmplen, int diffLimit) { // accumlated mismatches from left/right - int accMismatchFromLeft[cmplen]; - int accMismatchFromRight[cmplen]; + vector accMismatchFromLeft(cmplen); + vector accMismatchFromRight(cmplen); // accMismatchFromLeft[0]: head vs. head // accMismatchFromRight[cmplen-1]: tail vs. 
tail diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 04accbb..f886062 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -1,4 +1,5 @@ #include "peprocessor.h" +#include "hwy/contrib/thread_pool/futex.h" #include "fastqreader.h" #include #include @@ -14,10 +15,10 @@ #include "polyx.h" PairEndProcessor::PairEndProcessor(Options* opt){ + mWorkersLatch.store(opt->thread); mOptions = opt; mLeftReaderFinished = false; mRightReaderFinished = false; - mFinishedThreads = 0; mFilter = new Filter(opt); mUmiProcessor = new UmiProcessor(opt); @@ -41,6 +42,7 @@ PairEndProcessor::PairEndProcessor(Options* opt){ mLeftPackReadCounter = 0; mRightPackReadCounter = 0; mPackProcessedCounter = 0; + mPackProducedCounter = 0; mLeftReadPool = new ReadPool(mOptions); mRightReadPool = new ReadPool(mOptions); @@ -190,6 +192,7 @@ bool PairEndProcessor::process(){ readerLeft->join(); readerRight->join(); } + for(int t=0; tthread; t++){ threads[t]->join(); } @@ -233,25 +236,25 @@ bool PairEndProcessor::process(){ Stats* finalPostStats2 = Stats::merge(postStats2); FilterResult* finalFilterResult = FilterResult::merge(filterResults); - cerr << "Read1 before filtering:"<print(); cerr << endl; - cerr << "Read2 before filtering:"<print(); cerr << endl; if(!mOptions->merge.enabled) { - cerr << "Read1 after filtering:"<print(); cerr << endl; - cerr << "Read2 after filtering:"<print(); } else { - cerr << "Merged and filtered:"<print(); } cerr << endl; - cerr << "Filtering result:"<print(); double dupRate = 0.0; @@ -279,6 +282,7 @@ bool PairEndProcessor::process(){ } // make JSON report + JsonReporter jr(mOptions); jr.setDup(dupRate); jr.setInsertHist(mInsertSizeHist, peakInsertSize); @@ -292,43 +296,18 @@ bool PairEndProcessor::process(){ // clean up for(int t=0; tthread; t++){ - delete threads[t]; - threads[t] = NULL; delete configs[t]; configs[t] = NULL; } - if(readerInterveleaved) { - delete readerInterveleaved; - } else { - delete readerLeft; - delete readerRight; - } - 
delete finalPreStats1; delete finalPostStats1; delete finalPreStats2; delete finalPostStats2; delete finalFilterResult; - delete[] threads; delete[] configs; - if(leftWriterThread) - delete leftWriterThread; - if(rightWriterThread) - delete rightWriterThread; - if(unpairedLeftWriterThread) - delete unpairedLeftWriterThread; - if(unpairedRightWriterThread) - delete unpairedRightWriterThread; - if(mergedWriterThread) - delete mergedWriterThread; - if(failedWriterThread) - delete failedWriterThread; - if(overlappedWriterThread) - delete overlappedWriterThread; - if(!mOptions->split.enabled) closeOutput(); @@ -697,7 +676,7 @@ bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, T delete rightPack; mPackProcessedCounter.fetch_add(1, std::memory_order_release); - mBackpressureCV.notify_all(); + hwy::WakeAll(mPackProcessedCounter); return true; } @@ -769,7 +748,8 @@ void PairEndProcessor::readerTask(bool isLeft) mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack); mRightPackReadCounter++; } - mBackpressureCV.notify_all(); + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); data = NULL; if(read) { delete read; @@ -806,36 +786,35 @@ void PairEndProcessor::readerTask(bool isLeft) mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack); mRightPackReadCounter++; } - mBackpressureCV.notify_all(); + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); //re-initialize data for next pack data = new Read*[PACK_SIZE]; memset(data, 0, sizeof(Read*)*PACK_SIZE); - // if the processor is far behind this reader, sleep and wait to limit memory usage - { - std::unique_lock lk(mBackpressureMtx); - if(isLeft) { - while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } - } else { - while(mRightPackReadCounter 
- mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } + // if the processor is far behind this reader, wait to limit memory usage + if(isLeft) { + while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ + uint32_t cur = mPackProcessedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProcessedCounter); + slept++; } - } - readNum += count; - // if the writer threads are far behind this producer, sleep and wait - // check this only when necessary - if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - std::unique_lock lk(mBackpressureMtx); - while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){ + } else { + while(mRightPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ + uint32_t cur = mPackProcessedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProcessedCounter); slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); } } + readNum += count; + // NOTE: mLeftWriter->waitForBufferBelow() used to be gated here. It + // caused a mid-flight deadlock under -w>=23 + plain fq + adapter_fasta: + // writer consumes per-worker lists in strict round-robin, so one slow + // worker leaves its slot empty while others pile up, mBufferLength + // stays above the limit, the reader halts here, the slow worker never + // receives new input, its slot stays empty, and every thread blocks. + // Pack-level backpressure (mLeftPackReadCounter - mPackProcessedCounter + // above) already bounds in-flight memory without the cycle. 
// reset count to 0 count = 0; // re-evaluate split size @@ -861,7 +840,11 @@ void PairEndProcessor::readerTask(bool isLeft) else mRightInputLists[t]->setProducerFinished(); } - mBackpressureCV.notify_all(); + // Wake workers that may have latched a mPackProducedCounter snapshot just + // before setProducerFinished() ran; setProducerFinished does not bump the + // counter so without this they would miss the completion signal. + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); if(mOptions->verbose) { if(isLeft) { @@ -916,7 +899,9 @@ void PairEndProcessor::interleavedReaderTask() mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight); mRightPackReadCounter++; - mBackpressureCV.notify_all(); + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); + dataLeft = NULL; dataRight = NULL; break; @@ -947,31 +932,25 @@ void PairEndProcessor::interleavedReaderTask() mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight); mRightPackReadCounter++; - mBackpressureCV.notify_all(); + + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); //re-initialize data for next pack dataLeft = new Read*[PACK_SIZE]; dataRight = new Read*[PACK_SIZE]; memset(dataLeft, 0, sizeof(Read*)*PACK_SIZE); memset(dataRight, 0, sizeof(Read*)*PACK_SIZE); - // if the consumer is far behind this producer, sleep and wait to limit memory usage - { - std::unique_lock lk(mBackpressureMtx); - while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } + // if the consumer is far behind this producer, wait to limit memory usage + while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ + uint32_t cur = mPackProcessedCounter.load(std::memory_order_acquire); + 
hwy::BlockUntilDifferent(cur, mPackProcessedCounter); + slept++; } readNum += count; - // if the writer threads are far behind this producer, sleep and wait - // check this only when necessary - if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - std::unique_lock lk(mBackpressureMtx); - while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } - } + // NOTE: see readerTask() for why we no longer gate the reader on + // mLeftWriter->bufferLength here — round-robin writer + tight buffer + // limit caused a mid-flight deadlock. // reset count to 0 count = 0; // re-evaluate split size @@ -997,7 +976,10 @@ void PairEndProcessor::interleavedReaderTask() mLeftInputLists[t]->setProducerFinished(); mRightInputLists[t]->setProducerFinished(); } - mBackpressureCV.notify_all(); + // Wake any worker that snapshot mPackProducedCounter just before the + // setProducerFinished() above — without a counter bump they would miss it. 
+ mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); if(mOptions->verbose) { loginfo("interleaved: loading completed with " + to_string(mLeftPackReadCounter) + " packs"); @@ -1031,20 +1013,14 @@ void PairEndProcessor::processorTask(ThreadConfig* config) } else if(inputRight->isProducerFinished() && !inputRight->canBeConsumed()) { break; } else { - std::unique_lock lk(mBackpressureMtx); - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); + uint32_t cur = mPackProducedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProducedCounter); } } inputLeft->setConsumerFinished(); inputRight->setConsumerFinished(); - int finishedCount = mFinishedThreads.fetch_add(1, std::memory_order_release) + 1; - if(mOptions->verbose) { - string msg = "thread " + to_string(config->getThreadId() + 1) + " data processing completed"; - loginfo(msg); - } - - if(finishedCount == mOptions->thread) { + if(mWorkersLatch.fetch_sub(1, std::memory_order_release) - 1 == 0) { if(mLeftWriter) mLeftWriter->setInputCompleted(); if(mRightWriter) @@ -1060,6 +1036,10 @@ void PairEndProcessor::processorTask(ThreadConfig* config) if(mOverlappedWriter) mOverlappedWriter->setInputCompleted(); } + if(mOptions->verbose) { + string msg = "thread " + to_string(config->getThreadId() + 1) + " data processing completed"; + loginfo(msg); + } if(mOptions->verbose) { string msg = "thread " + to_string(config->getThreadId() + 1) + " finished"; diff --git a/src/peprocessor.h b/src/peprocessor.h index 707e41c..4149d82 100644 --- a/src/peprocessor.h +++ b/src/peprocessor.h @@ -6,8 +6,6 @@ #include #include "read.h" #include -#include -#include #include #include "options.h" #include "threadconfig.h" @@ -17,6 +15,7 @@ #include "writerthread.h" #include "duplicate.h" #include "readpool.h" +#include using namespace std; @@ -46,7 +45,7 @@ class PairEndProcessor{ private: atomic_bool mLeftReaderFinished; atomic_bool mRightReaderFinished; - 
alignas(128) atomic_int mFinishedThreads; + std::atomic mWorkersLatch; Options* mOptions; Filter* mFilter; UmiProcessor* mUmiProcessor; @@ -63,12 +62,11 @@ class PairEndProcessor{ SingleProducerSingleConsumerList** mRightInputLists; size_t mLeftPackReadCounter; size_t mRightPackReadCounter; - alignas(128) atomic_long mPackProcessedCounter; + alignas(128) std::atomic mPackProcessedCounter; + alignas(128) std::atomic mPackProducedCounter; ReadPool* mLeftReadPool; ReadPool* mRightReadPool; atomic_bool shouldStopReading; - std::mutex mBackpressureMtx; - std::condition_variable mBackpressureCV; }; diff --git a/src/readpool.h b/src/readpool.h index 2b1bf73..616637e 100644 --- a/src/readpool.h +++ b/src/readpool.h @@ -7,6 +7,7 @@ #include #include #include +#include #include "read.h" #include "options.h" #include "singleproducersingleconsumerlist.h" @@ -29,10 +30,10 @@ class ReadPool{ private: Options* mOptions; SingleProducerSingleConsumerList** mBufferLists; - size_t mProduced; + std::atomic mProduced; size_t mConsumed; unsigned long mLimit; - bool mIsFull; + std::atomic mIsFull; }; #endif \ No newline at end of file diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 76663b0..e68dd66 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -1,4 +1,5 @@ #include "seprocessor.h" +#include "hwy/contrib/thread_pool/futex.h" #include "fastqreader.h" #include #include @@ -13,9 +14,9 @@ #include "polyx.h" SingleEndProcessor::SingleEndProcessor(Options* opt){ + mWorkersLatch.store(opt->thread); mOptions = opt; mReaderFinished = false; - mFinishedThreads = 0; mFilter = new Filter(opt); mUmiProcessor = new UmiProcessor(opt); mLeftWriter = NULL; @@ -28,6 +29,7 @@ SingleEndProcessor::SingleEndProcessor(Options* opt){ mPackReadCounter = 0; mPackProcessedCounter = 0; + mPackProducedCounter = 0; mReadPool = new ReadPool(mOptions); } @@ -135,14 +137,14 @@ bool SingleEndProcessor::process(){ postStats.push_back(configs[t]->getPostStats1()); } - cerr << "Read1 before 
filtering:"<print(); cerr << endl; - cerr << "Read1 after filtering:"<print(); cerr << endl; - cerr << "Filtering result:"<print(); double dupRate = 0.0; @@ -152,7 +154,7 @@ bool SingleEndProcessor::process(){ cerr << "Duplication rate (may be overestimated since this is SE data): " << dupRate * 100.0 << "%" << endl; } - // make JSON report + // make JSON report JsonReporter jr(mOptions); jr.setDup(dupRate); jr.report(finalFilterResult, finalPreStats, finalPostStats); @@ -319,7 +321,7 @@ bool SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config){ delete pack; mPackProcessedCounter.fetch_add(1, std::memory_order_release); - mBackpressureCV.notify_all(); + hwy::WakeAll(mPackProcessedCounter); return true; } @@ -349,7 +351,8 @@ void SingleEndProcessor::readerTask() pack->count = count; mInputLists[mPackReadCounter % mOptions->thread]->produce(pack); mPackReadCounter++; - mBackpressureCV.notify_all(); + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); data = NULL; if(read) { delete read; @@ -375,28 +378,23 @@ void SingleEndProcessor::readerTask() pack->count = count; mInputLists[mPackReadCounter % mOptions->thread]->produce(pack); mPackReadCounter++; - mBackpressureCV.notify_all(); + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); //re-initialize data for next pack data = new Read*[PACK_SIZE]; memset(data, 0, sizeof(Read*)*PACK_SIZE); - // if the processor is far behind this reader, sleep and wait to limit memory usage - { - std::unique_lock lk(mBackpressureMtx); - while( mPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } + // if the processor is far behind this reader, wait to limit memory usage + while(mPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ + uint32_t cur = 
mPackProcessedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProcessedCounter); + slept++; } readNum += count; - // if the writer threads are far behind this reader, sleep and wait - // check this only when necessary - if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - std::unique_lock lk(mBackpressureMtx); - while(mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } - } + // NOTE: writer-buffer backpressure (waitForBufferBelow) removed — + // it formed a circular wait with round-robin writer consumption and + // deadlocked under high thread counts. Pack-level backpressure + // (mPackReadCounter - mPackProcessedCounter above) still bounds + // in-flight memory. // reset count to 0 count = 0; // re-evaluate split size @@ -418,6 +416,10 @@ void SingleEndProcessor::readerTask() for(int t=0; tthread; t++) mInputLists[t]->setProducerFinished(); + // Wake any worker that snapshot mPackProducedCounter just before the + // setProducerFinished() above — without a counter bump they would miss it. 
+ mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); //std::unique_lock lock(mRepo.readCounterMtx); mReaderFinished.store(true, std::memory_order_release); @@ -451,13 +453,13 @@ void SingleEndProcessor::processorTask(ThreadConfig* config) break; } } else { - std::unique_lock lk(mBackpressureMtx); - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); + uint32_t cur = mPackProducedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProducedCounter); } } input->setConsumerFinished(); - if(mFinishedThreads.fetch_add(1, std::memory_order_release) + 1 == mOptions->thread) { + if(mWorkersLatch.fetch_sub(1, std::memory_order_release) - 1 == 0) { if(mLeftWriter) mLeftWriter->setInputCompleted(); if(mFailedWriter) diff --git a/src/seprocessor.h b/src/seprocessor.h index b3c7152..7379483 100644 --- a/src/seprocessor.h +++ b/src/seprocessor.h @@ -6,8 +6,6 @@ #include #include "read.h" #include -#include -#include #include #include "options.h" #include "threadconfig.h" @@ -17,6 +15,7 @@ #include "duplicate.h" #include "singleproducersingleconsumerlist.h" #include "readpool.h" +#include using namespace std; @@ -41,7 +40,7 @@ class SingleEndProcessor{ private: Options* mOptions; atomic_bool mReaderFinished; - alignas(128) atomic_int mFinishedThreads; + std::atomic mWorkersLatch; Filter* mFilter; UmiProcessor* mUmiProcessor; WriterThread* mLeftWriter; @@ -49,10 +48,9 @@ class SingleEndProcessor{ Duplicate* mDuplicate; SingleProducerSingleConsumerList** mInputLists; size_t mPackReadCounter; - alignas(128) atomic_long mPackProcessedCounter; + alignas(128) std::atomic mPackProcessedCounter; + alignas(128) std::atomic mPackProducedCounter; ReadPool* mReadPool; - std::mutex mBackpressureMtx; - std::condition_variable mBackpressureCV; }; diff --git a/src/singleproducersingleconsumerlist.h b/src/singleproducersingleconsumerlist.h index 5bb16fe..5f19852 100644 --- a/src/singleproducersingleconsumerlist.h 
+++ b/src/singleproducersingleconsumerlist.h @@ -63,7 +63,7 @@ template class SingleProducerSingleConsumerList { public: inline SingleProducerSingleConsumerList() { - head = NULL; + head.store(NULL, std::memory_order_relaxed); tail = NULL; producerFinished = false; consumerFinished = false; @@ -87,39 +87,47 @@ class SingleProducerSingleConsumerList { blocks = NULL; } inline size_t size() { - return produced - consumed; + return produced.load(std::memory_order_relaxed) - consumed.load(std::memory_order_relaxed); } inline bool isEmpty() const { return head == NULL; } inline bool canBeConsumed() { - if(head == NULL) + LockFreeListItem* h = head.load(std::memory_order_acquire); + if(h == NULL) return false; // `nextItemReady` is a publication barrier for `nextItem`. // The last node has no successor, so `nextItemReady` may remain false; // it must still be consumable to avoid writer stalls when many queues exist. - return head->nextItemReady.load(std::memory_order_acquire) || (head == tail) || producerFinished.load(std::memory_order_acquire); + return h->nextItemReady.load(std::memory_order_acquire) + || (h == tail) + || producerFinished.load(std::memory_order_acquire); } inline void produce(T val) { LockFreeListItem* item = makeItem(val); - if(head==NULL) { - head = item; + if(head.load(std::memory_order_relaxed) == NULL) { tail = item; + // Release store: publishing head to consumer thread. + // All writes to *item are ordered before this store. + head.store(item, std::memory_order_release); // Signal the first item is consumable (no predecessor to set this) - head->nextItemReady.store(true, std::memory_order_release); + item->nextItemReady.store(true, std::memory_order_release); } else { tail->nextItem = item; + // Release store: ensures nextItem write visible before nextItemReady flag. 
tail->nextItemReady.store(true, std::memory_order_release); tail = item; } - produced++; + produced.fetch_add(1, std::memory_order_relaxed); } inline T consume() { - assert(head != NULL); - T val = head->value; - head = head->nextItem; - consumed++; - if((consumed & 0xFFF) == 0) + LockFreeListItem* h = head.load(std::memory_order_acquire); + assert(h != NULL); + T val = h->value; + // Advance head; release so next canBeConsumed() acquire sees updated state. + head.store(h->nextItem, std::memory_order_release); + unsigned long _c = consumed.fetch_add(1, std::memory_order_relaxed) + 1; + if((_c & 0xFFF) == 0) recycle(); return val; } @@ -138,8 +146,9 @@ class SingleProducerSingleConsumerList { private: // blockized list inline LockFreeListItem* makeItem(T val) { - unsigned long blk = produced >> 12; - unsigned long idx = produced & 0xFFF; + unsigned long _p = produced.load(std::memory_order_relaxed); + unsigned long blk = _p >> 12; + unsigned long idx = _p & 0xFFF; size_t size = 0x01<<12; if(blocksNum <= blk) { LockFreeListItem* buffer = new LockFreeListItem[size]; @@ -153,7 +162,7 @@ class SingleProducerSingleConsumerList { } inline void recycle() { - unsigned long blk = consumed >> 12; + unsigned long blk = consumed.load(std::memory_order_relaxed) >> 12; while((recycled+1) < blk) { delete[] blocks[recycled & blocksRingBufferSizeMask]; blocks[recycled & blocksRingBufferSizeMask] = NULL; @@ -162,13 +171,13 @@ class SingleProducerSingleConsumerList { } private: - LockFreeListItem* head; - LockFreeListItem* tail; + std::atomic*> head; + LockFreeListItem* tail; // tail is producer-private, no atomic needed LockFreeListItem** blocks; std::atomic_bool producerFinished; std::atomic_bool consumerFinished; - unsigned long produced; - unsigned long consumed; + std::atomic produced; + std::atomic consumed; unsigned long recycled; unsigned long blocksRingBufferSize; unsigned long blocksRingBufferSizeMask; diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 
5d21091..2394247 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -6,12 +6,11 @@ #include #include #include -#include WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mOptions = opt; mWriter1 = NULL; - mInputCompleted = false; + mInputCompleted.store(false, std::memory_order_relaxed); mFilename = filename; mPwriteMode = !isSTDOUT && ends_with(filename, ".gz") && mOptions->thread > 1; @@ -28,9 +27,9 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ if (mFd < 0) error_exit("Failed to open for pwrite: " + mFilename); mOffsetRing = new OffsetSlot[OFFSET_RING_SIZE]; - mNextSeq = new size_t[mOptions->thread]; + mNextSeq = new std::atomic[mOptions->thread]; for (int t = 0; t < mOptions->thread; t++) - mNextSeq[t] = t; + mNextSeq[t].store(t, std::memory_order_relaxed); mCompressors = new libdeflate_compressor*[mOptions->thread]; for (int t = 0; t < mOptions->thread; t++) mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); @@ -43,11 +42,13 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ } mWorkingBufferList = 0; mBufferLength = 0; + mWriterNotify = 0; } else { initWriter(filename, isSTDOUT); initBufferLists(); mWorkingBufferList = 0; mBufferLength = 0; + mWriterNotify = 0; } } @@ -58,29 +59,35 @@ WriterThread::~WriterThread() { bool WriterThread::isCompleted() { if (mPwriteMode) return true; // no writer thread needed - return mInputCompleted && (mBufferLength==0); + return mInputCompleted.load(std::memory_order_acquire) && (mBufferLength==0); } bool WriterThread::setInputCompleted() { if (mPwriteMode) { setInputCompletedPwrite(); - mInputCompleted = true; + mInputCompleted.store(true, std::memory_order_release); return true; } - mInputCompleted = true; + mInputCompleted.store(true, std::memory_order_release); for(int t=0; tthread; t++) { mBufferLists[t]->setProducerFinished(); } + // Wake the writer thread blocked in output() so it re-checks isCompleted() + 
mWriterNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mWriterNotify); return true; } void WriterThread::setInputCompletedPwrite() { + // Acquire fence: synchronize with the release stores in inputPwrite() + // so that all mNextSeq[t] writes from worker threads are visible here. + std::atomic_thread_fence(std::memory_order_acquire); int W = mOptions->thread; size_t lastSeq = 0; bool anyProcessed = false; for (int t = 0; t < W; t++) { - if (mNextSeq[t] != (size_t)t) { - size_t workerLastSeq = mNextSeq[t] - W; + if (mNextSeq[t].load(std::memory_order_relaxed) != (size_t)t) { + size_t workerLastSeq = mNextSeq[t].load(std::memory_order_relaxed) - W; if (!anyProcessed || workerLastSeq > lastSeq) { lastSeq = workerLastSeq; anyProcessed = true; @@ -96,14 +103,24 @@ void WriterThread::output(){ if (mPwriteMode) return; // no-op SingleProducerSingleConsumerList<string*>* list = mBufferLists[mWorkingBufferList]; if(!list->canBeConsumed()) { - usleep(100); - } else { - string* str = list->consume(); - mWriter1->write(str->data(), str->length()); - delete str; - mBufferLength--; - mWorkingBufferList = (mWorkingBufferList+1)%mOptions->thread; + // Snapshot mWriterNotify BEFORE checking mInputCompleted to avoid a + // lost-wakeup race: if setInputCompleted() runs between the state check + // and BlockUntilDifferent, it bumps mWriterNotify once and no further + // bumps arrive. Capturing cur first guarantees the bump is observable + // (cur != current value), so BlockUntilDifferent returns immediately.
+ uint32_t cur = mWriterNotify.load(std::memory_order_acquire); + if(list->canBeConsumed()) return; // producer raced in; let outer loop consume + if(mInputCompleted.load(std::memory_order_acquire)) return; + hwy::BlockUntilDifferent(cur, mWriterNotify); + // After wake, return to writerTask loop which re-checks isCompleted() + return; } + string* str = list->consume(); + mWriter1->write(str->data(), str->length()); + delete str; + mBufferLength.fetch_sub(1, std::memory_order_release); + hwy::WakeAll(mBufferLength); + mWorkingBufferList = (mWorkingBufferList+1)%mOptions->thread; } void WriterThread::input(int tid, string* data) { @@ -112,7 +129,9 @@ void WriterThread::input(int tid, string* data) { return; } mBufferLists[tid]->produce(data); - mBufferLength++; + mBufferLength.fetch_add(1, std::memory_order_release); + mWriterNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mWriterNotify); } void WriterThread::inputPwrite(int tid, string* data) { @@ -131,15 +150,17 @@ void WriterThread::inputPwrite(int tid, string* data) { const char* writeData = mCompBufs[tid]; size_t wsize = outsize; - size_t seq = mNextSeq[tid]; + size_t seq = mNextSeq[tid].load(std::memory_order_relaxed); - // Wait for previous batch's cumulative offset. - // Sleep yields CPU to prevent livelock under contention. + // Wait for previous batch's cumulative offset using futex. 
size_t offset = 0; if (seq > 0) { size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1); - while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) { - std::this_thread::sleep_for(std::chrono::microseconds(1)); + uint32_t needGen = static_cast<uint32_t>((seq - 1) / OFFSET_RING_SIZE + 1); + while (mOffsetRing[prevSlot].generation.load(std::memory_order_acquire) < needGen) { + uint32_t cur = mOffsetRing[prevSlot].generation.load(std::memory_order_acquire); + if (cur < needGen) + hwy::BlockUntilDifferent(cur, mOffsetRing[prevSlot].generation); } offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed); } @@ -147,7 +168,8 @@ void WriterThread::inputPwrite(int tid, string* data) { // Publish offset BEFORE pwrite — next worker starts immediately size_t mySlot = seq & (OFFSET_RING_SIZE - 1); mOffsetRing[mySlot].cumulative_offset.store(offset + wsize, std::memory_order_relaxed); - mOffsetRing[mySlot].published_seq.store(seq, std::memory_order_release); + mOffsetRing[mySlot].generation.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mOffsetRing[mySlot].generation); // pwrite (concurrent with other workers on non-overlapping regions) if (wsize > 0) { @@ -164,7 +186,11 @@ } } - mNextSeq[tid] += mOptions->thread; + // Release store: ensures the pwrite and cumulative_offset publication + // happen-before the acquire fence in setInputCompletedPwrite().
+ mNextSeq[tid].store( + mNextSeq[tid].load(std::memory_order_relaxed) + mOptions->thread, + std::memory_order_release); } void WriterThread::cleanup() { diff --git a/src/writerthread.h b/src/writerthread.h index 053d27f..ba4d4e2 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -8,7 +8,7 @@ #include "writer.h" #include "options.h" #include -#include +#include "hwy/contrib/thread_pool/futex.h" #include #include "singleproducersingleconsumerlist.h" @@ -18,7 +18,7 @@ static constexpr int OFFSET_RING_SIZE = 512; struct alignas(64) OffsetSlot { std::atomic<size_t> cumulative_offset{0}; - std::atomic<size_t> published_seq{SIZE_MAX}; + std::atomic<uint32_t> generation{0}; // bumped each time slot is published }; class WriterThread{ @@ -36,7 +36,7 @@ class WriterThread{ void input(int tid, string* data); bool setInputCompleted(); - long bufferLength() {return mBufferLength;}; + uint32_t bufferLength() {return mBufferLength;}; + void waitForBufferBelow(uint32_t limit) { + for(;;) { + uint32_t cur = mBufferLength.load(std::memory_order_acquire); + if(cur <= limit) break; + hwy::BlockUntilDifferent(cur, mBufferLength); + } + } string getFilename() {return mFilename;} bool isPwriteMode() {return mPwriteMode;} @@ -50,8 +57,9 @@ class WriterThread{ Options* mOptions; string mFilename; - bool mInputCompleted; - atomic_long mBufferLength; + std::atomic<bool> mInputCompleted; + std::atomic<uint32_t> mBufferLength; + std::atomic<uint32_t> mWriterNotify; // incremented to wake writer thread SingleProducerSingleConsumerList<string*>** mBufferLists; int mWorkingBufferList; @@ -59,7 +67,7 @@ bool mPwriteMode; int mFd; OffsetSlot* mOffsetRing; - size_t* mNextSeq; + std::atomic<size_t>* mNextSeq; libdeflate_compressor** mCompressors; char** mCompBufs; // per-worker pre-allocated compress output buffers size_t* mCompBufSizes; // per-worker buffer sizes