Skip to content

Commit a7cc6bd

Browse files
committed
Add test cases
1 parent bef41ef commit a7cc6bd

File tree

5 files changed

+1053
-79
lines changed

5 files changed

+1053
-79
lines changed

src/common/util/include/openvino/util/parallel_mem_streambuf.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (C) 2026 Intel Corporation
1+
// Copyright (C) 2018-2026 Intel Corporation
22
// SPDX-License-Identifier: Apache-2.0
33
//
44

src/common/util/include/openvino/util/parallel_read_streambuf.hpp

Lines changed: 83 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (C) 2026 Intel Corporation
1+
// Copyright (C) 2018-2026 Intel Corporation
22
// SPDX-License-Identifier: Apache-2.0
33
//
44

@@ -9,13 +9,11 @@
99
#include <atomic>
1010
#include <cstring>
1111
#include <filesystem>
12-
#include <fstream>
13-
#include <future>
1412
#include <iostream>
1513
#include <stdexcept>
1614
#include <streambuf>
17-
#include <thread>
18-
#include <vector>
15+
16+
#include "openvino/core/parallel.hpp"
1917

2018
#ifdef _WIN32
2119
# ifndef NOMINMAX
@@ -71,6 +69,7 @@ class ParallelReadStreamBuf : public std::streambuf {
7169
size_t threshold = DEFAULT_THRESHOLD)
7270
: m_path(path),
7371
m_file_offset(header_offset),
72+
m_header_offset(header_offset),
7473
m_threshold(threshold) {
7574
#ifdef _WIN32
7675
m_handle = CreateFileW(path.native().c_str(),
@@ -114,8 +113,7 @@ class ParallelReadStreamBuf : public std::streambuf {
114113
m_fd = -1;
115114
}
116115
#endif
117-
throw std::out_of_range("ParallelReadStreamBuf: header_offset is out of range for file: " +
118-
path.string());
116+
throw std::out_of_range("ParallelReadStreamBuf: header_offset is out of range for file: " + path.string());
119117
}
120118
}
121119

@@ -199,24 +197,30 @@ class ParallelReadStreamBuf : public std::streambuf {
199197
// Seek support
200198
// -----------------------------------------------------------------------
201199
pos_type seekoff(off_type off, std::ios_base::seekdir way, std::ios_base::openmode /* which */) override {
200+
// All internal positions (m_file_offset, m_file_size, m_header_offset) are
201+
// absolute byte offsets from the start of the file. The public-facing
202+
// stream positions are *logical* offsets: 0 == header_offset in the file.
202203
std::streamoff new_pos = 0;
203204
if (way == std::ios_base::beg) {
204-
new_pos = off;
205+
// off is a logical offset; translate to absolute file offset.
206+
new_pos = m_header_offset + off;
205207
} else if (way == std::ios_base::cur) {
206-
// Account for the buffered char from underflow() if it hasn't been consumed
208+
// Account for the buffered chars from underflow() not yet consumed.
207209
const std::streamsize ahead = static_cast<std::streamsize>(egptr() - gptr());
208-
new_pos = m_file_offset - ahead + off;
210+
new_pos = m_file_offset - ahead + off; // stays absolute
209211
} else {
210-
new_pos = m_file_size + off;
212+
new_pos = m_file_size + off; // stays absolute
211213
}
212214

213-
if (new_pos < 0 || new_pos > m_file_size) {
215+
// Reject seeks before the logical stream start or past the file end.
216+
if (new_pos < m_header_offset || new_pos > m_file_size) {
214217
return pos_type(off_type(-1));
215218
}
216219

217220
setg(nullptr, nullptr, nullptr); // invalidate get-area
218221
m_file_offset = new_pos;
219-
return pos_type(m_file_offset);
222+
// Return the logical position (0 == start of exposed stream).
223+
return pos_type(m_file_offset - m_header_offset);
220224
}
221225

222226
pos_type seekpos(pos_type pos, std::ios_base::openmode /* which */) override {
@@ -277,7 +281,7 @@ class ParallelReadStreamBuf : public std::streambuf {
277281
// Parallel positional read
278282
// -----------------------------------------------------------------------
279283
bool parallel_read(char* dst, size_t size, size_t file_offset) {
280-
const size_t hw_threads = static_cast<size_t>(std::thread::hardware_concurrency());
284+
const size_t hw_threads = static_cast<size_t>(parallel_get_max_threads());
281285
const size_t max_by_size = size / (1024 * 1024); // 1 thread per MB
282286
const size_t num_threads = std::max(size_t{1}, std::min(hw_threads, max_by_size));
283287

@@ -290,91 +294,91 @@ class ParallelReadStreamBuf : public std::streambuf {
290294
chunk_size = (chunk_size + 4095u) & ~size_t{4095u};
291295

292296
std::atomic<bool> success{true};
293-
std::vector<std::future<void>> futures;
294-
futures.reserve(num_threads);
295297

296298
#if ENABLE_BD_PROFILING_LOG
297299
const auto t0 = std::chrono::steady_clock::now();
298300
#endif
299301

300-
size_t cur_offset = 0;
301-
for (size_t i = 0; i < num_threads; ++i) {
302-
const size_t read_size = (i == num_threads - 1u) ? (size - cur_offset) : chunk_size;
303-
if (read_size == 0) {
304-
break;
302+
// Dispatch via OpenVINO's thread pool (TBB/OMP/SEQ) so threads are reused
303+
// across calls and there is no per-read create/destroy overhead.
304+
// Each worker opens its own file descriptor so that Linux's per-file-
305+
// description readahead state (file_ra_state / f_ra) is independent per
306+
// thread. Sharing a single fd causes concurrent pread() calls to corrupt
307+
// each other's sequential readahead prediction, collapsing throughput from
308+
// ~3.5 GB/s sequential to ~0.5 GB/s.
309+
ov::parallel_nt_static(static_cast<int>(num_threads), [&](int ithr, int nthr) {
310+
const size_t cur_offset = static_cast<size_t>(ithr) * chunk_size;
311+
if (cur_offset >= size) {
312+
return; // chunk rounding may leave trailing workers with nothing to do
305313
}
306-
307-
char* ptr = dst + cur_offset;
314+
const size_t read_size = (ithr == nthr - 1) ? (size - cur_offset) : chunk_size;
315+
char* const ptr = dst + cur_offset;
308316
const size_t thread_file_offset = file_offset + cur_offset;
309317

310318
#ifdef _WIN32
311319
const std::wstring wpath = m_path.native();
312-
futures.emplace_back(std::async(std::launch::async, [wpath, thread_file_offset, ptr, read_size, &success] {
313-
HANDLE t_handle = CreateFileW(wpath.c_str(),
314-
GENERIC_READ,
315-
FILE_SHARE_READ | FILE_SHARE_WRITE,
316-
nullptr,
317-
OPEN_EXISTING,
318-
FILE_ATTRIBUTE_NORMAL,
319-
nullptr);
320-
if (t_handle == INVALID_HANDLE_VALUE) {
321-
success = false;
322-
return;
323-
}
320+
HANDLE t_handle = CreateFileW(wpath.c_str(),
321+
GENERIC_READ,
322+
FILE_SHARE_READ | FILE_SHARE_WRITE,
323+
nullptr,
324+
OPEN_EXISTING,
325+
FILE_ATTRIBUTE_NORMAL,
326+
nullptr);
327+
if (t_handle == INVALID_HANDLE_VALUE) {
328+
success = false;
329+
return;
330+
}
324331

325-
char* cur = ptr;
326-
size_t remaining = read_size;
327-
size_t cur_file_offset = thread_file_offset;
328-
329-
while (remaining > 0 && success) {
330-
const DWORD to_read =
331-
static_cast<DWORD>(std::min(remaining, static_cast<size_t>(UINT_MAX - 1024u)));
332-
OVERLAPPED ov = {};
333-
ov.Offset = static_cast<DWORD>(cur_file_offset & 0xFFFFFFFFu);
334-
ov.OffsetHigh = static_cast<DWORD>((cur_file_offset >> 32) & 0xFFFFFFFFu);
335-
DWORD bytes_read = 0;
336-
if (!ReadFile(t_handle, cur, to_read, &bytes_read, &ov)) {
337-
if (GetLastError() != ERROR_IO_PENDING) {
338-
success = false;
339-
break;
340-
}
341-
}
342-
if (bytes_read == 0) {
332+
char* cur = ptr;
333+
size_t remaining = read_size;
334+
size_t cur_file_offset = thread_file_offset;
335+
336+
while (remaining > 0 && success) {
337+
const DWORD to_read = static_cast<DWORD>(std::min(remaining, static_cast<size_t>(UINT_MAX - 1024u)));
338+
OVERLAPPED ov = {};
339+
ov.Offset = static_cast<DWORD>(cur_file_offset & 0xFFFFFFFFu);
340+
ov.OffsetHigh = static_cast<DWORD>((cur_file_offset >> 32) & 0xFFFFFFFFu);
341+
DWORD bytes_read = 0;
342+
if (!ReadFile(t_handle, cur, to_read, &bytes_read, &ov)) {
343+
if (GetLastError() != ERROR_IO_PENDING) {
343344
success = false;
344345
break;
345346
}
346-
cur += bytes_read;
347-
cur_file_offset += bytes_read;
348-
remaining -= bytes_read;
349347
}
350-
CloseHandle(t_handle);
351-
}));
352-
#else
353-
// Each worker opens its own std::ifstream so that Linux's per-file-
354-
// description readahead state (file_ra_state / f_ra) is independent
355-
// per thread. Sharing a single fd causes concurrent pread() calls to
356-
// corrupt each other's sequential readahead prediction, collapsing
357-
// throughput from ~3.5 GB/s sequential to ~0.5 GB/s.
358-
const std::filesystem::path t_path = m_path;
359-
futures.emplace_back(std::async(std::launch::async, [t_path, thread_file_offset, ptr, read_size, &success] {
360-
std::ifstream t_ifs(t_path, std::ios::binary);
361-
if (!t_ifs.is_open()) {
348+
if (bytes_read == 0) {
362349
success = false;
363-
return;
350+
break;
364351
}
365-
t_ifs.seekg(static_cast<std::streamoff>(thread_file_offset), std::ios::beg);
366-
t_ifs.read(ptr, static_cast<std::streamsize>(read_size));
367-
if (!t_ifs.good()) {
352+
cur += bytes_read;
353+
cur_file_offset += bytes_read;
354+
remaining -= bytes_read;
355+
}
356+
CloseHandle(t_handle);
357+
#else
358+
const int t_fd = ::open(m_path.c_str(), O_RDONLY);
359+
if (t_fd == -1) {
360+
success = false;
361+
return;
362+
}
363+
364+
char* cur = ptr;
365+
size_t remaining = read_size;
366+
off_t cur_off = static_cast<off_t>(thread_file_offset);
367+
368+
while (remaining > 0 && success) {
369+
const ssize_t n = ::pread(t_fd, cur, remaining, cur_off);
370+
if (n <= 0) {
368371
success = false;
372+
break;
369373
}
370-
}));
374+
cur += n;
375+
cur_off += n;
376+
remaining -= static_cast<size_t>(n);
377+
}
378+
::close(t_fd);
371379
#endif
372-
cur_offset += read_size;
373-
}
380+
});
374381

375-
for (auto& f : futures) {
376-
f.get();
377-
}
378382
#if ENABLE_BD_PROFILING_LOG
379383
{
380384
const auto t1 = std::chrono::steady_clock::now();
@@ -397,6 +401,7 @@ class ParallelReadStreamBuf : public std::streambuf {
397401
int m_fd = -1;
398402
#endif
399403
std::streamoff m_file_offset;
404+
std::streamoff m_header_offset = 0; // absolute file offset of logical stream start
400405
std::streamoff m_file_size = 0;
401406
size_t m_threshold;
402407
std::array<char_type, UNDERFLOW_BUF> m_underflow_buf{}; // buffer for underflow()

0 commit comments

Comments
 (0)