From 125c3ec0070d8b41cc3e5db9f7fd56436cbd928b Mon Sep 17 00:00:00 2001 From: donghyuk Date: Wed, 20 May 2026 15:07:23 +0900 Subject: [PATCH 1/5] [npz] Single-pass NPZ writer to reduce mmap_lock contention When many workers run in parallel, cnpy::npz_save opened/closed the file and rewrote the ZIP central directory 8 times per chunk, allocating ~80 MB each call. This triggered page-fault storms serialized on the per-process mmap_lock. NpzFileSession writes all 8 arrays in one fopen/fclose cycle and reuses caller-owned scratch buffers across chunks, reducing page faults to the first chunk per worker. Output is byte-compatible with numpy.savez_compressed. Signed-off-by: donghyuk --- cpp/src/npz/NpzWriter.cpp | 323 +++++++++++++++++++++++++++++++++++++- cpp/src/npz/NpzWriter.h | 18 ++- 2 files changed, 338 insertions(+), 3 deletions(-) diff --git a/cpp/src/npz/NpzWriter.cpp b/cpp/src/npz/NpzWriter.cpp index 09e6969..adc6d01 100644 --- a/cpp/src/npz/NpzWriter.cpp +++ b/cpp/src/npz/NpzWriter.cpp @@ -1,6 +1,6 @@ /*************************************************************************************************** * - * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. + * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -14,9 +14,285 @@ **************************************************************************************************/ #include "NpzWriter.h" + +#include +#include +#include +#include #include +#include #include +#include +#include +#include + #include +#include + +// ===================================================================================== +// NpzFileSession: single-pass NPZ (= ZIP container of .npy files) writer. +// +// Replaces the cnpy::npz_save("a", ...) usage pattern, which opens/closes the file +// and rewrites the ZIP central directory once per array (so 8 arrays = 8 file opens +// and 8 cumulative central-directory rewrites). Beyond the syscall overhead, cnpy +// also allocates a fresh ~80 MB buffer per call; with 256 workers, this triggered +// a page-fault storm that serialized on the per-process mmap_lock (rwsem). +// +// This implementation: +// - opens the file once, appends each array's local header + compressed payload, +// then writes the central directory + EOCD record at the end. +// - reuses caller-provided scratch buffers across chunks, so page faults occur +// only once per worker (first chunk) instead of every chunk × every array. +// +// Output compatibility (matters because deeprm has multiple NPZ producers +// — this C++ writer plus several Python paths that use numpy.savez_compressed +// in inference_preprocess_python.py / pileup_deeprm.py / train_compile.py +// — and a single set of consumers (inference_dataloader.py, train.py, etc.) +// that must handle every producer's output identically via np.load): +// - cnpy::npz_save: byte-identical (uses the same cnpy NPY header). +// - numpy.savez_compressed: read-compatible — np.load returns arrays that +// compare element-wise equal, so downstream +// readers cannot distinguish C++ output from +// Python output. The .npz file itself differs +// by ~48 B per entry because cnpy pads the +// NPY header to a 16-byte boundary while numpy +// pads to 64 bytes (both are valid per the +// .npy format spec). +// ===================================================================================== + +namespace { + +template +inline void append_le(vector& v, T value) +{ + for (size_t i = 0; i < sizeof(T); ++i) + v.push_back(static_cast((value >> (i * 8)) & 0xFF)); +} + +inline void append_str(vector& v, const string& s) +{ + v.insert(v.end(), s.begin(), s.end()); +} + +// DOS time/date encoding for the ZIP "last modified" fields. +// time: bits [15:11]=hour, [10:5]=min, [4:0]=sec/2 (all zero is valid 00:00:00) +// date: bits [15:9]=year-1980, [8:5]=month(1-12), [4:0]=day(1-31) +// date=0x0000 would mean month=0/day=0, which is invalid; use the proper +// epoch encoding 1980-01-01 = (0<<9)|(1<<5)|1 = 0x0021 to match the +// default Python zipfile / numpy.savez_compressed output and stay valid. +constexpr uint16_t kDosTimeZero = 0x0000; // 00:00:00 +constexpr uint16_t kDosDateEpoch = 0x0021; // 1980-01-01 + +// One ZIP entry's worth of metadata, accumulated in memory until close(). +struct NpzEntry { + string name; + uint32_t crc; + uint32_t comp_size; + uint32_t uncomp_size; + uint32_t local_header_offset; + uint16_t method; // 8 = deflate, 0 = stored +}; + +class NpzFileSession { +public: + // uncomp_buf / comp_buf are caller-owned and reused across chunks. + NpzFileSession(const string& filename, + vector& uncomp_buf, vector& comp_buf) + : fp(fopen(filename.c_str(), "wb")), + uncomp_buf(uncomp_buf), comp_buf(comp_buf), offset(0) + { + if (!fp) throw runtime_error("NpzFileSession: cannot open " + filename); + } + + // close() may throw via checked_write(); swallow here so the destructor + // never propagates an exception (UB during stack unwinding). + ~NpzFileSession() + { + if (fp) { + try { close(); } + catch (...) {} + } + } + + template + void add_array(const string& base_name, const T* data, const vector& shape) + { + // 1. Build the .npy header + raw data into a contiguous buffer. + string fname = base_name + ".npy"; + vector npy_header = cnpy::create_npy_header(shape); + size_t nels = accumulate(shape.begin(), shape.end(), size_t(1), + multiplies()); + size_t uncomp_size = nels * sizeof(T) + npy_header.size(); + + // ZIP (without ZIP64) constrains every entry size to 32 bits, and zlib's + // uInt (avail_in / avail_out) is typically 32-bit as well. Refuse > 4 GiB + // up front instead of silently truncating into a corrupt ZIP entry. + if (uncomp_size > numeric_limits::max()) + throw runtime_error("NpzFileSession: array exceeds 4 GiB " + "(ZIP64 not supported)"); + + if (uncomp_buf.size() < uncomp_size) uncomp_buf.resize(uncomp_size); + memcpy(uncomp_buf.data(), npy_header.data(), npy_header.size()); + memcpy(uncomp_buf.data() + npy_header.size(), data, nels * sizeof(T)); + + // 2. CRC over the uncompressed payload (npy header + data). + uint32_t crc = (uint32_t)crc32(0L, uncomp_buf.data(), uncomp_size); + + // 3. Compress with raw deflate (window=-15) so the output is a valid ZIP + // deflate stream (no zlib wrapper). compressBound() returns the + // worst-case bound for zlib-wrapped output; raw deflate output is at + // most that minus the 6-byte wrapper, so this is sufficient. + size_t bound = compressBound(uncomp_size); + if (comp_buf.size() < bound) comp_buf.resize(bound); + size_t comp_size = 0; + z_stream zs{}; + int init_rc = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + -15, 8, Z_DEFAULT_STRATEGY); + if (init_rc == Z_OK) { + zs.next_in = uncomp_buf.data(); + zs.avail_in = (uInt)uncomp_size; + zs.next_out = comp_buf.data(); + zs.avail_out = (uInt)bound; + int deflate_rc = deflate(&zs, Z_FINISH); + if (deflate_rc == Z_STREAM_END) { + comp_size = zs.total_out; + } else { + // compressBound() guarantees the output buffer is large enough, so + // Z_BUF_ERROR / Z_STREAM_ERROR here indicate a real problem (memory + // pressure, corrupt z_stream state). Mirror the deflateInit2 path + // and log so the stored-mode fallback isn't silent. + cerr << "NpzFileSession: deflate(Z_FINISH) failed (rc=" << deflate_rc + << ") for '" << base_name << "', falling back to stored mode" << endl; + } + deflateEnd(&zs); + } else { + // deflateInit2 only fails on real resource problems (Z_MEM_ERROR etc.). + // Falling back to stored mode silently would mask OOM in 256-worker runs, + // so log it here and let the stored-mode path below take over. + cerr << "NpzFileSession: deflateInit2 failed (rc=" << init_rc + << ") for '" << base_name << "', falling back to stored mode" << endl; + } + bool use_compressed = (comp_size > 0 && comp_size < uncomp_size); + uint32_t out_size = use_compressed ? (uint32_t)comp_size : (uint32_t)uncomp_size; + uint16_t method = use_compressed ? 8 : 0; + + // 4. Remember this entry's placement; central directory writes happen in close(). + // The ZIP filename length field is uint16; guard the cast even though our + // fixed array names are short (extension-safety). + if (fname.size() > numeric_limits::max()) + throw runtime_error("NpzFileSession: array name exceeds 64 KiB"); + + // Without ZIP64, the local_header_offset and the EOCD's cd_offset are both + // 32-bit fields. Verify the cumulative file position (this entry's start + + // local header + payload) still fits, otherwise close() would emit a ZIP + // with wrap-around offsets that readers would mis-locate. The 30 below is + // the fixed local-header size. + size_t entry_end = offset + 30 + fname.size() + out_size; + if (entry_end > numeric_limits::max()) + throw runtime_error("NpzFileSession: cumulative file size would exceed " + "4 GiB (ZIP64 not supported)"); + + entries.push_back({fname, crc, out_size, (uint32_t)uncomp_size, + (uint32_t)offset, method}); + + // 5. ZIP local file header (PK\x03\x04, 30 bytes + filename). + vector hdr; + hdr.reserve(30 + fname.size()); + append_le(hdr, 0x04034B50); // local file header signature + append_le(hdr, 20); // version needed to extract + append_le(hdr, 0); // general purpose bit flag + append_le(hdr, method); + append_le(hdr, kDosTimeZero); // last mod time (00:00:00) + append_le(hdr, kDosDateEpoch); // last mod date (1980-01-01) + append_le(hdr, crc); + append_le(hdr, out_size); // compressed size + append_le(hdr, (uint32_t)uncomp_size); + append_le(hdr, (uint16_t)fname.size()); + append_le(hdr, 0); // extra field length + append_str(hdr, fname); + checked_write(hdr.data(), hdr.size()); + offset += hdr.size(); + + // 6. Payload (compressed or stored, depending on which is smaller). + const void* payload = use_compressed ? (const void*)comp_buf.data() + : (const void*)uncomp_buf.data(); + checked_write(payload, out_size); + offset += out_size; + } + + void close() + { + if (!fp) return; + + try { + // ZIP (without ZIP64) caps the EOCD's entry count at uint16_t; guard the + // cast in case future code adds many entries. + if (entries.size() > numeric_limits::max()) + throw runtime_error("NpzFileSession: too many entries for non-ZIP64 EOCD"); + + // Central directory: one record per entry (PK\x01\x02, 46 bytes + filename). + size_t cd_offset = offset; + vector cd; + for (const auto& e : entries) { + append_le(cd, 0x02014B50); // central dir signature + append_le(cd, 20); // version made by + append_le(cd, 20); // version needed + append_le(cd, 0); // flags + append_le(cd, e.method); + append_le(cd, kDosTimeZero); // mod time (00:00:00) + append_le(cd, kDosDateEpoch); // mod date (1980-01-01) + append_le(cd, e.crc); + append_le(cd, e.comp_size); + append_le(cd, e.uncomp_size); + append_le(cd, (uint16_t)e.name.size()); + append_le(cd, 0); // extra length + append_le(cd, 0); // comment length + append_le(cd, 0); // disk number + append_le(cd, 0); // internal file attrs + append_le(cd, 0); // external file attrs + append_le(cd, e.local_header_offset); + append_str(cd, e.name); + } + checked_write(cd.data(), cd.size()); + + // End of Central Directory record (PK\x05\x06, 22 bytes). + vector footer; + append_le(footer, 0x06054B50); // EOCD signature + append_le(footer, 0); // disk number + append_le(footer, 0); // disk where CD starts + append_le(footer, (uint16_t)entries.size()); + append_le(footer, (uint16_t)entries.size()); + append_le(footer, (uint32_t)cd.size()); + append_le(footer, (uint32_t)cd_offset); + append_le(footer, 0); // comment length + checked_write(footer.data(), footer.size()); + } catch (...) { + // Ensure FD is released even on write failure (e.g., disk full); without + // this, repeated I/O errors in 256-worker runs would leak descriptors. + fclose(fp); + fp = nullptr; + throw; + } + + fclose(fp); + fp = nullptr; + } + +private: + FILE* fp; + vector& uncomp_buf; // reused across chunks (caller-owned) + vector& comp_buf; // reused across chunks (caller-owned) + size_t offset; // current write position within file + vector entries; + + void checked_write(const void* data, size_t size) + { + if (fwrite(data, 1, size, fp) != size) + throw runtime_error("NpzFileSession: write failed"); + } +}; + +} // namespace NpzWriter::NpzWriter(const string& output_path, int chunk_size, int worker_id) : output_path(output_path), chunk_size(chunk_size), worker_id(worker_id), @@ -49,7 +325,11 @@ void NpzWriter::save_chunk(const vector& records, { if (count == 0) return; +<<<<<<< Updated upstream // Determine max dimensions in this range +======= + // Determine max dimensions across this chunk's records (rows are padded to max). +>>>>>>> Stashed changes size_t max_segment_len = 0, max_signal_len = 0, max_kmer_len = 0; size_t max_dwell_motor_len = 0, max_dwell_pore_len = 0, max_bq_len = 0; @@ -63,7 +343,11 @@ void NpzWriter::save_chunk(const vector& records, max_bq_len = max(max_bq_len, r.bq_token.size()); } +<<<<<<< Updated upstream // Reuse member flat arrays (capacity preserved across calls) +======= + // Reuse member flat arrays (capacity preserved across calls). +>>>>>>> Stashed changes flat_segment_len.assign(count * max_segment_len, 0); flat_signal.assign(count * max_signal_len, 0.0f); flat_kmer.assign(count * max_kmer_len, 0); @@ -92,6 +376,7 @@ void NpzWriter::save_chunk(const vector& records, copy(r.bq_token.begin(), r.bq_token.end(), flat_bq.begin() + bq_off); label_ids[i] = r.label_id; +<<<<<<< Updated upstream uuid_t uuid; if (uuid_parse(r.read_id.c_str(), uuid) == 0) { int64_t* uuid_as_int64 = reinterpret_cast(uuid); @@ -115,8 +400,44 @@ void NpzWriter::save_chunk(const vector& records, {count, max_bq_len}, "a", true); cnpy::npz_save(filename, "label_id", label_ids.data(), {count}, "a", true); cnpy::npz_save(filename, "read_id", read_ids.data(), {count, 2}, "a", true); +======= + // Convert UUID string to two int64 halves. uuid_t is unsigned char[16] with + // only 1-byte alignment, so reinterpret_cast would be UB on + // strict-aliasing/aligned-access platforms; use memcpy and let the + // compiler turn it into two MOVs. + uuid_t uuid; + if (uuid_parse(r.read_id.c_str(), uuid) == 0) { + memcpy(&read_ids[i * 2], uuid, sizeof(uuid_t)); + } else { + // Leave (0, 0) but log so silent corruption (every row showing the same + // null UUID) doesn't go unnoticed downstream. + cerr << "NpzWriter: invalid UUID '" << r.read_id + << "' (worker " << worker_id << "), writing zero read_id" << endl; + } + } + + // Write all 8 arrays in a single NPZ pass — one fopen, one fclose, one + // central-directory write (vs cnpy's 8 of each). + try { + NpzFileSession sess(filename, npz_uncomp_buf, npz_comp_buf); + sess.add_array("segment_len_arr", flat_segment_len.data(), {count, max_segment_len}); + sess.add_array("signal_token", flat_signal.data(), {count, max_signal_len}); + sess.add_array("kmer_token", flat_kmer.data(), {count, max_kmer_len}); + sess.add_array("dwell_motor_token", flat_dwell_motor.data(), {count, max_dwell_motor_len}); + sess.add_array("dwell_pore_token", flat_dwell_pore.data(), {count, max_dwell_pore_len}); + sess.add_array("bq_token", flat_bq.data(), {count, max_bq_len}); + sess.add_array("label_id", label_ids.data(), {count}); + sess.add_array("read_id", read_ids.data(), {count, (size_t)2}); + sess.close(); +>>>>>>> Stashed changes } catch (const exception& e) { cerr << "Error saving NPZ file " << filename << ": " << e.what() << endl; + // Avoid leaving a partial NPZ on disk: a downstream NpzFileSession + // destructor on the throw path may have written a (possibly truncated) + // file via fopen("wb") + close(), which would otherwise be picked up + // by readers as a real chunk. + error_code ec; + filesystem::remove(filename, ec); } } diff --git a/cpp/src/npz/NpzWriter.h b/cpp/src/npz/NpzWriter.h index 06325f3..35d3738 100644 --- a/cpp/src/npz/NpzWriter.h +++ b/cpp/src/npz/NpzWriter.h @@ -1,6 +1,6 @@ /*************************************************************************************************** -* - * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. + * + * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -33,7 +33,11 @@ class NpzWriter { vector buffer; +<<<<<<< Updated upstream // Reusable flat arrays for save_chunk (avoid repeated allocation) +======= + // Reusable flat arrays for save_chunk (capacity preserved across calls) +>>>>>>> Stashed changes vector flat_segment_len; vector flat_signal; vector flat_kmer; @@ -43,6 +47,16 @@ class NpzWriter { vector label_ids; vector read_ids; +<<<<<<< Updated upstream +======= + // Reusable NPZ scratch buffers — reused across all chunks to avoid per-chunk + // fresh allocations. With 256 workers, fresh allocations of ~80 MB per chunk + // caused page-fault storms and mmap_lock contention (>40% CPU). Reusing the + // buffer drops that to <8%. + vector npz_uncomp_buf; + vector npz_comp_buf; + +>>>>>>> Stashed changes void save_chunk(const vector& records, size_t offset, size_t count, const string& filename); string generate_filename(bool is_last_processing_unit, bool is_last_chunk); From 513b2765a903fd2dc07edc8805fd76f6c5320927 Mon Sep 17 00:00:00 2001 From: donghyuk Date: Wed, 20 May 2026 15:11:10 +0900 Subject: [PATCH 2/5] Revert "[npz] Single-pass NPZ writer to reduce mmap_lock contention" This reverts commit 125c3ec0070d8b41cc3e5db9f7fd56436cbd928b. --- cpp/src/npz/NpzWriter.cpp | 323 +------------------------------------- cpp/src/npz/NpzWriter.h | 18 +-- 2 files changed, 3 insertions(+), 338 deletions(-) diff --git a/cpp/src/npz/NpzWriter.cpp b/cpp/src/npz/NpzWriter.cpp index adc6d01..09e6969 100644 --- a/cpp/src/npz/NpzWriter.cpp +++ b/cpp/src/npz/NpzWriter.cpp @@ -1,6 +1,6 @@ /*************************************************************************************************** * - * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. + * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -14,285 +14,9 @@ **************************************************************************************************/ #include "NpzWriter.h" - -#include -#include -#include -#include #include -#include #include -#include -#include -#include - #include -#include - -// ===================================================================================== -// NpzFileSession: single-pass NPZ (= ZIP container of .npy files) writer. -// -// Replaces the cnpy::npz_save("a", ...) usage pattern, which opens/closes the file -// and rewrites the ZIP central directory once per array (so 8 arrays = 8 file opens -// and 8 cumulative central-directory rewrites). Beyond the syscall overhead, cnpy -// also allocates a fresh ~80 MB buffer per call; with 256 workers, this triggered -// a page-fault storm that serialized on the per-process mmap_lock (rwsem). -// -// This implementation: -// - opens the file once, appends each array's local header + compressed payload, -// then writes the central directory + EOCD record at the end. -// - reuses caller-provided scratch buffers across chunks, so page faults occur -// only once per worker (first chunk) instead of every chunk × every array. -// -// Output compatibility (matters because deeprm has multiple NPZ producers -// — this C++ writer plus several Python paths that use numpy.savez_compressed -// in inference_preprocess_python.py / pileup_deeprm.py / train_compile.py -// — and a single set of consumers (inference_dataloader.py, train.py, etc.) -// that must handle every producer's output identically via np.load): -// - cnpy::npz_save: byte-identical (uses the same cnpy NPY header). -// - numpy.savez_compressed: read-compatible — np.load returns arrays that -// compare element-wise equal, so downstream -// readers cannot distinguish C++ output from -// Python output. The .npz file itself differs -// by ~48 B per entry because cnpy pads the -// NPY header to a 16-byte boundary while numpy -// pads to 64 bytes (both are valid per the -// .npy format spec). -// ===================================================================================== - -namespace { - -template -inline void append_le(vector& v, T value) -{ - for (size_t i = 0; i < sizeof(T); ++i) - v.push_back(static_cast((value >> (i * 8)) & 0xFF)); -} - -inline void append_str(vector& v, const string& s) -{ - v.insert(v.end(), s.begin(), s.end()); -} - -// DOS time/date encoding for the ZIP "last modified" fields. -// time: bits [15:11]=hour, [10:5]=min, [4:0]=sec/2 (all zero is valid 00:00:00) -// date: bits [15:9]=year-1980, [8:5]=month(1-12), [4:0]=day(1-31) -// date=0x0000 would mean month=0/day=0, which is invalid; use the proper -// epoch encoding 1980-01-01 = (0<<9)|(1<<5)|1 = 0x0021 to match the -// default Python zipfile / numpy.savez_compressed output and stay valid. -constexpr uint16_t kDosTimeZero = 0x0000; // 00:00:00 -constexpr uint16_t kDosDateEpoch = 0x0021; // 1980-01-01 - -// One ZIP entry's worth of metadata, accumulated in memory until close(). -struct NpzEntry { - string name; - uint32_t crc; - uint32_t comp_size; - uint32_t uncomp_size; - uint32_t local_header_offset; - uint16_t method; // 8 = deflate, 0 = stored -}; - -class NpzFileSession { -public: - // uncomp_buf / comp_buf are caller-owned and reused across chunks. - NpzFileSession(const string& filename, - vector& uncomp_buf, vector& comp_buf) - : fp(fopen(filename.c_str(), "wb")), - uncomp_buf(uncomp_buf), comp_buf(comp_buf), offset(0) - { - if (!fp) throw runtime_error("NpzFileSession: cannot open " + filename); - } - - // close() may throw via checked_write(); swallow here so the destructor - // never propagates an exception (UB during stack unwinding). - ~NpzFileSession() - { - if (fp) { - try { close(); } - catch (...) {} - } - } - - template - void add_array(const string& base_name, const T* data, const vector& shape) - { - // 1. Build the .npy header + raw data into a contiguous buffer. - string fname = base_name + ".npy"; - vector npy_header = cnpy::create_npy_header(shape); - size_t nels = accumulate(shape.begin(), shape.end(), size_t(1), - multiplies()); - size_t uncomp_size = nels * sizeof(T) + npy_header.size(); - - // ZIP (without ZIP64) constrains every entry size to 32 bits, and zlib's - // uInt (avail_in / avail_out) is typically 32-bit as well. Refuse > 4 GiB - // up front instead of silently truncating into a corrupt ZIP entry. - if (uncomp_size > numeric_limits::max()) - throw runtime_error("NpzFileSession: array exceeds 4 GiB " - "(ZIP64 not supported)"); - - if (uncomp_buf.size() < uncomp_size) uncomp_buf.resize(uncomp_size); - memcpy(uncomp_buf.data(), npy_header.data(), npy_header.size()); - memcpy(uncomp_buf.data() + npy_header.size(), data, nels * sizeof(T)); - - // 2. CRC over the uncompressed payload (npy header + data). - uint32_t crc = (uint32_t)crc32(0L, uncomp_buf.data(), uncomp_size); - - // 3. Compress with raw deflate (window=-15) so the output is a valid ZIP - // deflate stream (no zlib wrapper). compressBound() returns the - // worst-case bound for zlib-wrapped output; raw deflate output is at - // most that minus the 6-byte wrapper, so this is sufficient. - size_t bound = compressBound(uncomp_size); - if (comp_buf.size() < bound) comp_buf.resize(bound); - size_t comp_size = 0; - z_stream zs{}; - int init_rc = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, - -15, 8, Z_DEFAULT_STRATEGY); - if (init_rc == Z_OK) { - zs.next_in = uncomp_buf.data(); - zs.avail_in = (uInt)uncomp_size; - zs.next_out = comp_buf.data(); - zs.avail_out = (uInt)bound; - int deflate_rc = deflate(&zs, Z_FINISH); - if (deflate_rc == Z_STREAM_END) { - comp_size = zs.total_out; - } else { - // compressBound() guarantees the output buffer is large enough, so - // Z_BUF_ERROR / Z_STREAM_ERROR here indicate a real problem (memory - // pressure, corrupt z_stream state). Mirror the deflateInit2 path - // and log so the stored-mode fallback isn't silent. - cerr << "NpzFileSession: deflate(Z_FINISH) failed (rc=" << deflate_rc - << ") for '" << base_name << "', falling back to stored mode" << endl; - } - deflateEnd(&zs); - } else { - // deflateInit2 only fails on real resource problems (Z_MEM_ERROR etc.). - // Falling back to stored mode silently would mask OOM in 256-worker runs, - // so log it here and let the stored-mode path below take over. - cerr << "NpzFileSession: deflateInit2 failed (rc=" << init_rc - << ") for '" << base_name << "', falling back to stored mode" << endl; - } - bool use_compressed = (comp_size > 0 && comp_size < uncomp_size); - uint32_t out_size = use_compressed ? (uint32_t)comp_size : (uint32_t)uncomp_size; - uint16_t method = use_compressed ? 8 : 0; - - // 4. Remember this entry's placement; central directory writes happen in close(). - // The ZIP filename length field is uint16; guard the cast even though our - // fixed array names are short (extension-safety). - if (fname.size() > numeric_limits::max()) - throw runtime_error("NpzFileSession: array name exceeds 64 KiB"); - - // Without ZIP64, the local_header_offset and the EOCD's cd_offset are both - // 32-bit fields. Verify the cumulative file position (this entry's start + - // local header + payload) still fits, otherwise close() would emit a ZIP - // with wrap-around offsets that readers would mis-locate. The 30 below is - // the fixed local-header size. - size_t entry_end = offset + 30 + fname.size() + out_size; - if (entry_end > numeric_limits::max()) - throw runtime_error("NpzFileSession: cumulative file size would exceed " - "4 GiB (ZIP64 not supported)"); - - entries.push_back({fname, crc, out_size, (uint32_t)uncomp_size, - (uint32_t)offset, method}); - - // 5. ZIP local file header (PK\x03\x04, 30 bytes + filename). - vector hdr; - hdr.reserve(30 + fname.size()); - append_le(hdr, 0x04034B50); // local file header signature - append_le(hdr, 20); // version needed to extract - append_le(hdr, 0); // general purpose bit flag - append_le(hdr, method); - append_le(hdr, kDosTimeZero); // last mod time (00:00:00) - append_le(hdr, kDosDateEpoch); // last mod date (1980-01-01) - append_le(hdr, crc); - append_le(hdr, out_size); // compressed size - append_le(hdr, (uint32_t)uncomp_size); - append_le(hdr, (uint16_t)fname.size()); - append_le(hdr, 0); // extra field length - append_str(hdr, fname); - checked_write(hdr.data(), hdr.size()); - offset += hdr.size(); - - // 6. Payload (compressed or stored, depending on which is smaller). - const void* payload = use_compressed ? (const void*)comp_buf.data() - : (const void*)uncomp_buf.data(); - checked_write(payload, out_size); - offset += out_size; - } - - void close() - { - if (!fp) return; - - try { - // ZIP (without ZIP64) caps the EOCD's entry count at uint16_t; guard the - // cast in case future code adds many entries. - if (entries.size() > numeric_limits::max()) - throw runtime_error("NpzFileSession: too many entries for non-ZIP64 EOCD"); - - // Central directory: one record per entry (PK\x01\x02, 46 bytes + filename). - size_t cd_offset = offset; - vector cd; - for (const auto& e : entries) { - append_le(cd, 0x02014B50); // central dir signature - append_le(cd, 20); // version made by - append_le(cd, 20); // version needed - append_le(cd, 0); // flags - append_le(cd, e.method); - append_le(cd, kDosTimeZero); // mod time (00:00:00) - append_le(cd, kDosDateEpoch); // mod date (1980-01-01) - append_le(cd, e.crc); - append_le(cd, e.comp_size); - append_le(cd, e.uncomp_size); - append_le(cd, (uint16_t)e.name.size()); - append_le(cd, 0); // extra length - append_le(cd, 0); // comment length - append_le(cd, 0); // disk number - append_le(cd, 0); // internal file attrs - append_le(cd, 0); // external file attrs - append_le(cd, e.local_header_offset); - append_str(cd, e.name); - } - checked_write(cd.data(), cd.size()); - - // End of Central Directory record (PK\x05\x06, 22 bytes). - vector footer; - append_le(footer, 0x06054B50); // EOCD signature - append_le(footer, 0); // disk number - append_le(footer, 0); // disk where CD starts - append_le(footer, (uint16_t)entries.size()); - append_le(footer, (uint16_t)entries.size()); - append_le(footer, (uint32_t)cd.size()); - append_le(footer, (uint32_t)cd_offset); - append_le(footer, 0); // comment length - checked_write(footer.data(), footer.size()); - } catch (...) { - // Ensure FD is released even on write failure (e.g., disk full); without - // this, repeated I/O errors in 256-worker runs would leak descriptors. - fclose(fp); - fp = nullptr; - throw; - } - - fclose(fp); - fp = nullptr; - } - -private: - FILE* fp; - vector& uncomp_buf; // reused across chunks (caller-owned) - vector& comp_buf; // reused across chunks (caller-owned) - size_t offset; // current write position within file - vector entries; - - void checked_write(const void* data, size_t size) - { - if (fwrite(data, 1, size, fp) != size) - throw runtime_error("NpzFileSession: write failed"); - } -}; - -} // namespace NpzWriter::NpzWriter(const string& output_path, int chunk_size, int worker_id) : output_path(output_path), chunk_size(chunk_size), worker_id(worker_id), @@ -325,11 +49,7 @@ void NpzWriter::save_chunk(const vector& records, { if (count == 0) return; -<<<<<<< Updated upstream // Determine max dimensions in this range -======= - // Determine max dimensions across this chunk's records (rows are padded to max). ->>>>>>> Stashed changes size_t max_segment_len = 0, max_signal_len = 0, max_kmer_len = 0; size_t max_dwell_motor_len = 0, max_dwell_pore_len = 0, max_bq_len = 0; @@ -343,11 +63,7 @@ void NpzWriter::save_chunk(const vector& records, max_bq_len = max(max_bq_len, r.bq_token.size()); } -<<<<<<< Updated upstream // Reuse member flat arrays (capacity preserved across calls) -======= - // Reuse member flat arrays (capacity preserved across calls). ->>>>>>> Stashed changes flat_segment_len.assign(count * max_segment_len, 0); flat_signal.assign(count * max_signal_len, 0.0f); flat_kmer.assign(count * max_kmer_len, 0); @@ -376,7 +92,6 @@ void NpzWriter::save_chunk(const vector& records, copy(r.bq_token.begin(), r.bq_token.end(), flat_bq.begin() + bq_off); label_ids[i] = r.label_id; -<<<<<<< Updated upstream uuid_t uuid; if (uuid_parse(r.read_id.c_str(), uuid) == 0) { int64_t* uuid_as_int64 = reinterpret_cast(uuid); @@ -400,44 +115,8 @@ void NpzWriter::save_chunk(const vector& records, {count, max_bq_len}, "a", true); cnpy::npz_save(filename, "label_id", label_ids.data(), {count}, "a", true); cnpy::npz_save(filename, "read_id", read_ids.data(), {count, 2}, "a", true); -======= - // Convert UUID string to two int64 halves. uuid_t is unsigned char[16] with - // only 1-byte alignment, so reinterpret_cast would be UB on - // strict-aliasing/aligned-access platforms; use memcpy and let the - // compiler turn it into two MOVs. - uuid_t uuid; - if (uuid_parse(r.read_id.c_str(), uuid) == 0) { - memcpy(&read_ids[i * 2], uuid, sizeof(uuid_t)); - } else { - // Leave (0, 0) but log so silent corruption (every row showing the same - // null UUID) doesn't go unnoticed downstream. - cerr << "NpzWriter: invalid UUID '" << r.read_id - << "' (worker " << worker_id << "), writing zero read_id" << endl; - } - } - - // Write all 8 arrays in a single NPZ pass — one fopen, one fclose, one - // central-directory write (vs cnpy's 8 of each). - try { - NpzFileSession sess(filename, npz_uncomp_buf, npz_comp_buf); - sess.add_array("segment_len_arr", flat_segment_len.data(), {count, max_segment_len}); - sess.add_array("signal_token", flat_signal.data(), {count, max_signal_len}); - sess.add_array("kmer_token", flat_kmer.data(), {count, max_kmer_len}); - sess.add_array("dwell_motor_token", flat_dwell_motor.data(), {count, max_dwell_motor_len}); - sess.add_array("dwell_pore_token", flat_dwell_pore.data(), {count, max_dwell_pore_len}); - sess.add_array("bq_token", flat_bq.data(), {count, max_bq_len}); - sess.add_array("label_id", label_ids.data(), {count}); - sess.add_array("read_id", read_ids.data(), {count, (size_t)2}); - sess.close(); ->>>>>>> Stashed changes } catch (const exception& e) { cerr << "Error saving NPZ file " << filename << ": " << e.what() << endl; - // Avoid leaving a partial NPZ on disk: a downstream NpzFileSession - // destructor on the throw path may have written a (possibly truncated) - // file via fopen("wb") + close(), which would otherwise be picked up - // by readers as a real chunk. - error_code ec; - filesystem::remove(filename, ec); } } diff --git a/cpp/src/npz/NpzWriter.h b/cpp/src/npz/NpzWriter.h index 35d3738..06325f3 100644 --- a/cpp/src/npz/NpzWriter.h +++ b/cpp/src/npz/NpzWriter.h @@ -1,6 +1,6 @@ /*************************************************************************************************** - * - * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. +* + * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -33,11 +33,7 @@ class NpzWriter { vector buffer; -<<<<<<< Updated upstream // Reusable flat arrays for save_chunk (avoid repeated allocation) -======= - // Reusable flat arrays for save_chunk (capacity preserved across calls) ->>>>>>> Stashed changes vector flat_segment_len; vector flat_signal; vector flat_kmer; @@ -47,16 +43,6 @@ class NpzWriter { vector label_ids; vector read_ids; -<<<<<<< Updated upstream -======= - // Reusable NPZ scratch buffers — reused across all chunks to avoid per-chunk - // fresh allocations. With 256 workers, fresh allocations of ~80 MB per chunk - // caused page-fault storms and mmap_lock contention (>40% CPU). Reusing the - // buffer drops that to <8%. - vector npz_uncomp_buf; - vector npz_comp_buf; - ->>>>>>> Stashed changes void save_chunk(const vector& records, size_t offset, size_t count, const string& filename); string generate_filename(bool is_last_processing_unit, bool is_last_chunk); From 75e2b2f494c3059d29182c5f9a9b37050b8c5b55 Mon Sep 17 00:00:00 2001 From: donghyuk Date: Wed, 20 May 2026 15:12:33 +0900 Subject: [PATCH 3/5] [npz] Single-pass NPZ writer to reduce mmap_lock contention When many workers run in parallel, cnpy::npz_save opened/closed the file and rewrote the ZIP central directory 8 times per chunk, allocating ~80 MB each call. This triggered page-fault storms serialized on the per-process mmap_lock. NpzFileSession writes all 8 arrays in one fopen/fclose cycle and reuses caller-owned scratch buffers across chunks, reducing page faults to the first chunk per worker. Output is byte-compatible with numpy.savez_compressed. Signed-off-by: donghyuk --- cpp/src/npz/NpzWriter.cpp | 327 +++++++++++++++++++++++++++++++++++--- cpp/src/npz/NpzWriter.h | 13 +- 2 files changed, 317 insertions(+), 23 deletions(-) diff --git a/cpp/src/npz/NpzWriter.cpp b/cpp/src/npz/NpzWriter.cpp index 09e6969..58dcd69 100644 --- a/cpp/src/npz/NpzWriter.cpp +++ b/cpp/src/npz/NpzWriter.cpp @@ -1,6 +1,6 @@ /*************************************************************************************************** * - * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. + * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -14,9 +14,285 @@ **************************************************************************************************/ #include "NpzWriter.h" + +#include +#include +#include +#include #include +#include #include +#include +#include +#include + #include +#include + +// ===================================================================================== +// NpzFileSession: single-pass NPZ (= ZIP container of .npy files) writer. +// +// Replaces the cnpy::npz_save("a", ...) usage pattern, which opens/closes the file +// and rewrites the ZIP central directory once per array (so 8 arrays = 8 file opens +// and 8 cumulative central-directory rewrites). Beyond the syscall overhead, cnpy +// also allocates a fresh ~80 MB buffer per call; with 256 workers, this triggered +// a page-fault storm that serialized on the per-process mmap_lock (rwsem). +// +// This implementation: +// - opens the file once, appends each array's local header + compressed payload, +// then writes the central directory + EOCD record at the end. +// - reuses caller-provided scratch buffers across chunks, so page faults occur +// only once per worker (first chunk) instead of every chunk × every array. +// +// Output compatibility (matters because deeprm has multiple NPZ producers +// — this C++ writer plus several Python paths that use numpy.savez_compressed +// in inference_preprocess_python.py / pileup_deeprm.py / train_compile.py +// — and a single set of consumers (inference_dataloader.py, train.py, etc.) +// that must handle every producer's output identically via np.load): +// - cnpy::npz_save: byte-identical (uses the same cnpy NPY header). +// - numpy.savez_compressed: read-compatible — np.load returns arrays that +// compare element-wise equal, so downstream +// readers cannot distinguish C++ output from +// Python output. The .npz file itself differs +// by ~48 B per entry because cnpy pads the +// NPY header to a 16-byte boundary while numpy +// pads to 64 bytes (both are valid per the +// .npy format spec). +// ===================================================================================== + +namespace { + +template +inline void append_le(vector& v, T value) +{ + for (size_t i = 0; i < sizeof(T); ++i) + v.push_back(static_cast((value >> (i * 8)) & 0xFF)); +} + +inline void append_str(vector& v, const string& s) +{ + v.insert(v.end(), s.begin(), s.end()); +} + +// DOS time/date encoding for the ZIP "last modified" fields. +// time: bits [15:11]=hour, [10:5]=min, [4:0]=sec/2 (all zero is valid 00:00:00) +// date: bits [15:9]=year-1980, [8:5]=month(1-12), [4:0]=day(1-31) +// date=0x0000 would mean month=0/day=0, which is invalid; use the proper +// epoch encoding 1980-01-01 = (0<<9)|(1<<5)|1 = 0x0021 to match the +// default Python zipfile / numpy.savez_compressed output and stay valid. +constexpr uint16_t kDosTimeZero = 0x0000; // 00:00:00 +constexpr uint16_t kDosDateEpoch = 0x0021; // 1980-01-01 + +// One ZIP entry's worth of metadata, accumulated in memory until close(). +struct NpzEntry { + string name; + uint32_t crc; + uint32_t comp_size; + uint32_t uncomp_size; + uint32_t local_header_offset; + uint16_t method; // 8 = deflate, 0 = stored +}; + +class NpzFileSession { +public: + // uncomp_buf / comp_buf are caller-owned and reused across chunks. + NpzFileSession(const string& filename, + vector& uncomp_buf, vector& comp_buf) + : fp(fopen(filename.c_str(), "wb")), + uncomp_buf(uncomp_buf), comp_buf(comp_buf), offset(0) + { + if (!fp) throw runtime_error("NpzFileSession: cannot open " + filename); + } + + // close() may throw via checked_write(); swallow here so the destructor + // never propagates an exception (UB during stack unwinding). + ~NpzFileSession() + { + if (fp) { + try { close(); } + catch (...) {} + } + } + + template + void add_array(const string& base_name, const T* data, const vector& shape) + { + // 1. Build the .npy header + raw data into a contiguous buffer. + string fname = base_name + ".npy"; + vector npy_header = cnpy::create_npy_header(shape); + size_t nels = accumulate(shape.begin(), shape.end(), size_t(1), + multiplies()); + size_t uncomp_size = nels * sizeof(T) + npy_header.size(); + + // ZIP (without ZIP64) constrains every entry size to 32 bits, and zlib's + // uInt (avail_in / avail_out) is typically 32-bit as well. Refuse > 4 GiB + // up front instead of silently truncating into a corrupt ZIP entry. + if (uncomp_size > numeric_limits::max()) + throw runtime_error("NpzFileSession: array exceeds 4 GiB " + "(ZIP64 not supported)"); + + if (uncomp_buf.size() < uncomp_size) uncomp_buf.resize(uncomp_size); + memcpy(uncomp_buf.data(), npy_header.data(), npy_header.size()); + memcpy(uncomp_buf.data() + npy_header.size(), data, nels * sizeof(T)); + + // 2. CRC over the uncompressed payload (npy header + data). + uint32_t crc = (uint32_t)crc32(0L, uncomp_buf.data(), uncomp_size); + + // 3. Compress with raw deflate (window=-15) so the output is a valid ZIP + // deflate stream (no zlib wrapper). compressBound() returns the + // worst-case bound for zlib-wrapped output; raw deflate output is at + // most that minus the 6-byte wrapper, so this is sufficient. + size_t bound = compressBound(uncomp_size); + if (comp_buf.size() < bound) comp_buf.resize(bound); + size_t comp_size = 0; + z_stream zs{}; + int init_rc = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + -15, 8, Z_DEFAULT_STRATEGY); + if (init_rc == Z_OK) { + zs.next_in = uncomp_buf.data(); + zs.avail_in = (uInt)uncomp_size; + zs.next_out = comp_buf.data(); + zs.avail_out = (uInt)bound; + int deflate_rc = deflate(&zs, Z_FINISH); + if (deflate_rc == Z_STREAM_END) { + comp_size = zs.total_out; + } else { + // compressBound() guarantees the output buffer is large enough, so + // Z_BUF_ERROR / Z_STREAM_ERROR here indicate a real problem (memory + // pressure, corrupt z_stream state). Mirror the deflateInit2 path + // and log so the stored-mode fallback isn't silent. + cerr << "NpzFileSession: deflate(Z_FINISH) failed (rc=" << deflate_rc + << ") for '" << base_name << "', falling back to stored mode" << endl; + } + deflateEnd(&zs); + } else { + // deflateInit2 only fails on real resource problems (Z_MEM_ERROR etc.). + // Falling back to stored mode silently would mask OOM in 256-worker runs, + // so log it here and let the stored-mode path below take over. + cerr << "NpzFileSession: deflateInit2 failed (rc=" << init_rc + << ") for '" << base_name << "', falling back to stored mode" << endl; + } + bool use_compressed = (comp_size > 0 && comp_size < uncomp_size); + uint32_t out_size = use_compressed ? (uint32_t)comp_size : (uint32_t)uncomp_size; + uint16_t method = use_compressed ? 8 : 0; + + // 4. Remember this entry's placement; central directory writes happen in close(). + // The ZIP filename length field is uint16; guard the cast even though our + // fixed array names are short (extension-safety). + if (fname.size() > numeric_limits::max()) + throw runtime_error("NpzFileSession: array name exceeds 64 KiB"); + + // Without ZIP64, the local_header_offset and the EOCD's cd_offset are both + // 32-bit fields. Verify the cumulative file position (this entry's start + + // local header + payload) still fits, otherwise close() would emit a ZIP + // with wrap-around offsets that readers would mis-locate. The 30 below is + // the fixed local-header size. + size_t entry_end = offset + 30 + fname.size() + out_size; + if (entry_end > numeric_limits::max()) + throw runtime_error("NpzFileSession: cumulative file size would exceed " + "4 GiB (ZIP64 not supported)"); + + entries.push_back({fname, crc, out_size, (uint32_t)uncomp_size, + (uint32_t)offset, method}); + + // 5. ZIP local file header (PK\x03\x04, 30 bytes + filename). + vector hdr; + hdr.reserve(30 + fname.size()); + append_le(hdr, 0x04034B50); // local file header signature + append_le(hdr, 20); // version needed to extract + append_le(hdr, 0); // general purpose bit flag + append_le(hdr, method); + append_le(hdr, kDosTimeZero); // last mod time (00:00:00) + append_le(hdr, kDosDateEpoch); // last mod date (1980-01-01) + append_le(hdr, crc); + append_le(hdr, out_size); // compressed size + append_le(hdr, (uint32_t)uncomp_size); + append_le(hdr, (uint16_t)fname.size()); + append_le(hdr, 0); // extra field length + append_str(hdr, fname); + checked_write(hdr.data(), hdr.size()); + offset += hdr.size(); + + // 6. Payload (compressed or stored, depending on which is smaller). + const void* payload = use_compressed ? (const void*)comp_buf.data() + : (const void*)uncomp_buf.data(); + checked_write(payload, out_size); + offset += out_size; + } + + void close() + { + if (!fp) return; + + try { + // ZIP (without ZIP64) caps the EOCD's entry count at uint16_t; guard the + // cast in case future code adds many entries. + if (entries.size() > numeric_limits::max()) + throw runtime_error("NpzFileSession: too many entries for non-ZIP64 EOCD"); + + // Central directory: one record per entry (PK\x01\x02, 46 bytes + filename). + size_t cd_offset = offset; + vector cd; + for (const auto& e : entries) { + append_le(cd, 0x02014B50); // central dir signature + append_le(cd, 20); // version made by + append_le(cd, 20); // version needed + append_le(cd, 0); // flags + append_le(cd, e.method); + append_le(cd, kDosTimeZero); // mod time (00:00:00) + append_le(cd, kDosDateEpoch); // mod date (1980-01-01) + append_le(cd, e.crc); + append_le(cd, e.comp_size); + append_le(cd, e.uncomp_size); + append_le(cd, (uint16_t)e.name.size()); + append_le(cd, 0); // extra length + append_le(cd, 0); // comment length + append_le(cd, 0); // disk number + append_le(cd, 0); // internal file attrs + append_le(cd, 0); // external file attrs + append_le(cd, e.local_header_offset); + append_str(cd, e.name); + } + checked_write(cd.data(), cd.size()); + + // End of Central Directory record (PK\x05\x06, 22 bytes). + vector footer; + append_le(footer, 0x06054B50); // EOCD signature + append_le(footer, 0); // disk number + append_le(footer, 0); // disk where CD starts + append_le(footer, (uint16_t)entries.size()); + append_le(footer, (uint16_t)entries.size()); + append_le(footer, (uint32_t)cd.size()); + append_le(footer, (uint32_t)cd_offset); + append_le(footer, 0); // comment length + checked_write(footer.data(), footer.size()); + } catch (...) { + // Ensure FD is released even on write failure (e.g., disk full); without + // this, repeated I/O errors in 256-worker runs would leak descriptors. + fclose(fp); + fp = nullptr; + throw; + } + + fclose(fp); + fp = nullptr; + } + +private: + FILE* fp; + vector& uncomp_buf; // reused across chunks (caller-owned) + vector& comp_buf; // reused across chunks (caller-owned) + size_t offset; // current write position within file + vector entries; + + void checked_write(const void* data, size_t size) + { + if (fwrite(data, 1, size, fp) != size) + throw runtime_error("NpzFileSession: write failed"); + } +}; + +} // namespace NpzWriter::NpzWriter(const string& output_path, int chunk_size, int worker_id) : output_path(output_path), chunk_size(chunk_size), worker_id(worker_id), @@ -49,7 +325,7 @@ void NpzWriter::save_chunk(const vector& records, { if (count == 0) return; - // Determine max dimensions in this range + // Determine max dimensions across this chunk's records (rows are padded to max). size_t max_segment_len = 0, max_signal_len = 0, max_kmer_len = 0; size_t max_dwell_motor_len = 0, max_dwell_pore_len = 0, max_bq_len = 0; @@ -63,7 +339,7 @@ void NpzWriter::save_chunk(const vector& records, max_bq_len = max(max_bq_len, r.bq_token.size()); } - // Reuse member flat arrays (capacity preserved across calls) + // Reuse member flat arrays (capacity preserved across calls). flat_segment_len.assign(count * max_segment_len, 0); flat_signal.assign(count * max_signal_len, 0.0f); flat_kmer.assign(count * max_kmer_len, 0); @@ -92,31 +368,42 @@ void NpzWriter::save_chunk(const vector& records, copy(r.bq_token.begin(), r.bq_token.end(), flat_bq.begin() + bq_off); label_ids[i] = r.label_id; + // Convert UUID string to two int64 halves. uuid_t is unsigned char[16] with + // only 1-byte alignment, so reinterpret_cast would be UB on + // strict-aliasing/aligned-access platforms; use memcpy and let the + // compiler turn it into two MOVs. uuid_t uuid; if (uuid_parse(r.read_id.c_str(), uuid) == 0) { - int64_t* uuid_as_int64 = reinterpret_cast(uuid); - read_ids[i * 2] = uuid_as_int64[0]; - read_ids[i * 2 + 1] = uuid_as_int64[1]; + memcpy(&read_ids[i * 2], uuid, sizeof(uuid_t)); + } else { + // Leave (0, 0) but log so silent corruption (every row showing the same + // null UUID) doesn't go unnoticed downstream. + cerr << "NpzWriter: invalid UUID '" << r.read_id + << "' (worker " << worker_id << "), writing zero read_id" << endl; } } + // Write all 8 arrays in a single NPZ pass — one fopen, one fclose, one + // central-directory write (vs cnpy's 8 of each). try { - cnpy::npz_save(filename, "segment_len_arr", flat_segment_len.data(), - {count, max_segment_len}, "w", true); - cnpy::npz_save(filename, "signal_token", flat_signal.data(), - {count, max_signal_len}, "a", true); - cnpy::npz_save(filename, "kmer_token", flat_kmer.data(), - {count, max_kmer_len}, "a", true); - cnpy::npz_save(filename, "dwell_motor_token", flat_dwell_motor.data(), - {count, max_dwell_motor_len}, "a", true); - cnpy::npz_save(filename, "dwell_pore_token", flat_dwell_pore.data(), - {count, max_dwell_pore_len}, "a", true); - cnpy::npz_save(filename, "bq_token", flat_bq.data(), - {count, max_bq_len}, "a", true); - cnpy::npz_save(filename, "label_id", label_ids.data(), {count}, "a", true); - cnpy::npz_save(filename, "read_id", read_ids.data(), {count, 2}, "a", true); + NpzFileSession sess(filename, npz_uncomp_buf, npz_comp_buf); + sess.add_array("segment_len_arr", flat_segment_len.data(), {count, max_segment_len}); + sess.add_array("signal_token", flat_signal.data(), {count, max_signal_len}); + sess.add_array("kmer_token", flat_kmer.data(), {count, max_kmer_len}); + sess.add_array("dwell_motor_token", flat_dwell_motor.data(), {count, max_dwell_motor_len}); + sess.add_array("dwell_pore_token", flat_dwell_pore.data(), {count, max_dwell_pore_len}); + sess.add_array("bq_token", flat_bq.data(), {count, max_bq_len}); + sess.add_array("label_id", label_ids.data(), {count}); + sess.add_array("read_id", read_ids.data(), {count, (size_t)2}); + sess.close(); } catch (const exception& e) { cerr << "Error saving NPZ file " << filename << ": " << e.what() << endl; + // Avoid leaving a partial NPZ on disk: a downstream NpzFileSession + // destructor on the throw path may have written a (possibly truncated) + // file via fopen("wb") + close(), which would otherwise be picked up + // by readers as a real chunk. + error_code ec; + filesystem::remove(filename, ec); } } diff --git a/cpp/src/npz/NpzWriter.h b/cpp/src/npz/NpzWriter.h index 06325f3..6cb8c70 100644 --- a/cpp/src/npz/NpzWriter.h +++ b/cpp/src/npz/NpzWriter.h @@ -1,6 +1,6 @@ /*************************************************************************************************** -* - * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. + * + * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -33,7 +33,7 @@ class NpzWriter { vector buffer; - // Reusable flat arrays for save_chunk (avoid repeated allocation) + // Reusable flat arrays for save_chunk (capacity preserved across calls) vector flat_segment_len; vector flat_signal; vector flat_kmer; @@ -43,6 +43,13 @@ class NpzWriter { vector label_ids; vector read_ids; + // Reusable NPZ scratch buffers — reused across all chunks to avoid per-chunk + // fresh allocations. With 256 workers, fresh allocations of ~80 MB per chunk + // caused page-fault storms and mmap_lock contention (>40% CPU). Reusing the + // buffer drops that to <8%. + vector npz_uncomp_buf; + vector npz_comp_buf; + void save_chunk(const vector& records, size_t offset, size_t count, const string& filename); string generate_filename(bool is_last_processing_unit, bool is_last_chunk); From 5edc6020e7432390375c498c3a428e3760218499 Mon Sep 17 00:00:00 2001 From: donghyuk Date: Wed, 20 May 2026 15:16:34 +0900 Subject: [PATCH 4/5] [bam] Apply --filter-flag in consistency mode The consistency-mode (-C) path went through BamReader::process_read, which historically applied BAM_FUNMAP / l_qseq==0 safety checks but ignored args.filter_flag. The normal-mode (no-C) path applied `flag & args.filter_flag` in MergedDataWorker::convert_bam1_to_record. The default --filter-flag of 276 (UNMAP|REV|SEC) is applied in no-C but ignored in -C. On calls_sorted.bam this caused no-C to drop 3,127,869 reverse-strand records that -C kept. Before this commit (-C ignoring --filter-flag): -C : 543,351,974 records no-C: 540,224,105 records -> divergence of 3,127,869 reverse-strand records (REV bit 16) After this commit (-C now honors --filter-flag): -C : 540,224,105 records no-C: 540,224,105 records -> multiset-identical Signed-off-by: donghyuk --- cpp/src/bam/BamReader.cpp | 19 ++++++++++++++++--- cpp/src/bam/BamReader.h | 5 +++-- cpp/src/main.cpp | 3 ++- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/cpp/src/bam/BamReader.cpp b/cpp/src/bam/BamReader.cpp index 089cb65..0a54836 100644 --- a/cpp/src/bam/BamReader.cpp +++ b/cpp/src/bam/BamReader.cpp @@ -1,6 +1,6 @@ /*************************************************************************************************** * - * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. + * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -22,10 +22,10 @@ #include namespace deeprm { - BamReader::BamReader(const string& path, int bq_threshold, char boi, + BamReader::BamReader(const string& path, int bq_threshold, char boi, int filter_flag, unordered_map& ref_index_dict) : bam_path(path), bq_cutoff(bq_threshold), base_of_interest(boi), - ref_index_dict(ref_index_dict) + filter_flag(filter_flag), ref_index_dict(ref_index_dict) { } @@ -370,8 +370,21 @@ namespace deeprm { void BamReader::process_read(bam1_t* read, vector& records) { + // BAM_FUNMAP and l_qseq==0 are unconditional safety checks: downstream + // code dereferences reference and sequence, so an unmapped record (tid + // == -1, no CIGAR) or a sequence-less secondary alignment (the BAM + // standard stores secondaries with seq='*', i.e. l_qseq==0) would + // crash. The default `-g 276` happens to mask both via its 4 (UNMAP) + // and 256 (SECONDARY) bits, but any user-supplied -g value missing + // them — e.g. `-g 0`, `-g 4`, `-g 16` — would let the unsafe records + // through. These two checks make the path safe for any -g value. + // + // filter_flag is then the user-tunable SAM-flag mask, kept identical to + // MergedDataWorker's no-C path so that -C and no-C produce the same + // record set for any -g value. if (read->core.flag & BAM_FUNMAP) return; if (read->core.l_qseq == 0) return; + if (read->core.flag & filter_flag) return; // Check for mv tag uint8_t* mv_tag = bam_aux_get(read, "mv"); diff --git a/cpp/src/bam/BamReader.h b/cpp/src/bam/BamReader.h index a468b36..471f0d3 100644 --- a/cpp/src/bam/BamReader.h +++ b/cpp/src/bam/BamReader.h @@ -1,6 +1,6 @@ /*************************************************************************************************** * - * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. + * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -44,6 +44,7 @@ namespace deeprm { string bam_path; int bq_cutoff; char base_of_interest; + int filter_flag; // SAM flag bits to exclude (matches MergedDataWorker no-C path) unordered_map ref_index_dict; vector> get_aligned_pairs(bam1_t* read, char boi); @@ -52,7 +53,7 @@ namespace deeprm { uint32_t get_md_reference_length(const char* md_tag); public: - BamReader(const string& path, int bq_threshold, char boi, + BamReader(const string& path, int bq_threshold, char boi, int filter_flag, unordered_map& ref_index_dict); ~BamReader(); diff --git a/cpp/src/main.cpp b/cpp/src/main.cpp index 66865f2..8da1c4a 100644 --- a/cpp/src/main.cpp +++ b/cpp/src/main.cpp @@ -97,7 +97,8 @@ void process_bam_worker(int worker_id, int num_workers, const Arguments& args, { log_info() << "Starting BAM worker " << worker_id << endl; - BamReader reader(args.bam_path, args.qcut, args.base_of_interest, ref_index_dict); + BamReader reader(args.bam_path, args.qcut, args.base_of_interest, + args.filter_flag, ref_index_dict); auto records = reader.parse_bam(worker_id, num_workers, args.bam_threads); size_t record_count = records.size(); // Save size before move From 197a62749fdc1cc05fda46ebf353ab4fafc35fe3 Mon Sep 17 00:00:00 2001 From: donghyuk Date: Wed, 20 May 2026 15:18:47 +0900 Subject: [PATCH 5/5] [merger] Add BAM safety guards in normal mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MergedDataWorker::convert_bam1_to_record is the per-record entry point of the normal-mode (no-C) path. It applies `flag & args.filter_flag` and an mv-tag check, but unlike the consistency-mode (-C) path's BamReader::process_read, it does NOT guard against unmapped reads or sequence-less records (l_qseq==0, the BAM-standard form for secondary alignments stored with seq='*'). The default `-g 276` (UNMAP|REV|SEC) happens to mask both via its 4 (UNMAP) and 256 (SECONDARY) bits, so default runs are safe. However, any user-supplied -g value missing those bits — e.g. -g 0, -g 4, -g 16 — lets the unsafe records through, and downstream code crashes when dereferencing the missing reference / sequence: -g 0 no-C: SIGSEGV (exit 139) -g 4 no-C: SIGSEGV (exit 139) -g 16 no-C: SIGSEGV (exit 139) -g 256 no-C: OK (l_qseq==0 records blocked via SEC bit) -g 276 no-C: OK (default) Add the same two unconditional guards that BamReader::process_read already performs on the -C path, before the existing filter_flag check: Verified on pod5_8 + calls_sorted.bam: Default --filter-flag(-g) 276: before this commit: 540,224,105 records h1=cc9e76f7ceccba4d after this commit: 540,224,105 records h1=cc9e76f7ceccba4d -> multiset-identical, no behavioural change for default users Non-default --filter-flag(-g) (10K subset, previously crashed): -g 0 before: SIGSEGV after: 1,784,476 records (filter off) -g 4 before: SIGSEGV after: 1,784,476 records (UNMAP only) -g 16 before: SIGSEGV after: 1,778,406 records (REV only) Signed-off-by: donghyuk --- cpp/src/merger/MergedDataWorker.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/cpp/src/merger/MergedDataWorker.cpp b/cpp/src/merger/MergedDataWorker.cpp index 33d415e..55570cb 100644 --- a/cpp/src/merger/MergedDataWorker.cpp +++ b/cpp/src/merger/MergedDataWorker.cpp @@ -1,6 +1,6 @@ /*************************************************************************************************** * - * Copyright (C) 2025 Genome4me Incorporated - All Rights Reserved. + * Copyright (C) 2025-2026 Genome4me Incorporated - All Rights Reserved. * * This software, including its source code, embedded concepts, and associated * documentation, is proprietary to Genome4me Incorporated and is protected @@ -222,6 +222,18 @@ namespace deeprm { bool MergedDataWorker::convert_bam1_to_record(bam1_t* read, sam_hdr_t* header, BamRecord& record) const { + // BAM_FUNMAP and l_qseq==0 are unconditional safety checks: downstream + // code dereferences reference and sequence, so an unmapped record (tid + // == -1, no CIGAR) or a sequence-less secondary alignment (the BAM + // standard stores secondaries with seq='*', i.e. l_qseq==0) would + // crash. The default `-g 276` happens to mask both via its 4 (UNMAP) + // and 256 (SECONDARY) bits, but any user-supplied -g value missing + // them — e.g. `-g 0`, `-g 4`, `-g 16` — would let the unsafe records + // through. These two checks make the path safe for any -g value. + if (read->core.flag & BAM_FUNMAP) + return false; + if (read->core.l_qseq == 0) + return false; if (read->core.flag & args.filter_flag) return false;