Skip to content

Commit f2f1f66

Browse files
committed
Optimize dGPU path
1 parent 0bf2ac9 commit f2f1f66

File tree

4 files changed

+85
-40
lines changed

4 files changed

+85
-40
lines changed

src/common/util/include/openvino/util/parallel_mem_streambuf.hpp

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
# ifndef WIN32_LEAN_AND_MEAN
2222
# define WIN32_LEAN_AND_MEAN
2323
# endif
24-
# include <windows.h>
2524
# include <psapi.h>
25+
# include <windows.h>
2626
#else
2727
# include <sys/mman.h>
2828
#endif
@@ -78,21 +78,18 @@ class ParallelMemStreamBuf : public std::streambuf {
7878
MEMORY_BASIC_INFORMATION mbi{};
7979
if (VirtualQuery(data, &mbi, sizeof(mbi)) && mbi.Type == MEM_MAPPED) {
8080
wchar_t dev_path[MAX_PATH] = {};
81-
if (GetMappedFileNameW(GetCurrentProcess(),
82-
const_cast<void*>(data),
83-
dev_path,
84-
MAX_PATH) > 0) {
81+
if (GetMappedFileNameW(GetCurrentProcess(), const_cast<void*>(data), dev_path, MAX_PATH) > 0) {
8582
// Convert device path (\Device\HarddiskVolume3\...) to Win32 path.
8683
wchar_t win32_path[MAX_PATH] = {};
8784
if (resolve_device_path(dev_path, win32_path, MAX_PATH)) {
8885
// Compute the file offset: the region base may be at a
8986
// different page boundary than the pointer we received.
9087
// AllocationBase is the start of the mapped view.
9188
const std::streamoff file_offset =
92-
reinterpret_cast<const char*>(data) -
93-
reinterpret_cast<const char*>(mbi.AllocationBase);
94-
m_file_buf = std::make_unique<ParallelReadStreamBuf>(
95-
std::filesystem::path(win32_path), file_offset, threshold);
89+
reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(mbi.AllocationBase);
90+
m_file_buf = std::make_unique<ParallelReadStreamBuf>(std::filesystem::path(win32_path),
91+
file_offset,
92+
threshold);
9693
}
9794
}
9895
}
@@ -298,9 +295,7 @@ class ParallelMemStreamBuf : public std::streambuf {
298295
// Parse /proc/self/maps to find the file backing an mmap address.
299296
// Returns true and fills out_path/out_offset if the address is inside
300297
// a named file mapping (i.e. not anonymous / [stack] / [heap]).
301-
static bool get_mmap_file_info(const void* addr,
302-
std::filesystem::path& out_path,
303-
std::streamoff& out_offset) {
298+
static bool get_mmap_file_info(const void* addr, std::filesystem::path& out_path, std::streamoff& out_offset) {
304299
std::ifstream maps_file("/proc/self/maps");
305300
if (!maps_file.is_open())
306301
return false;
@@ -316,21 +311,17 @@ class ParallelMemStreamBuf : public std::streambuf {
316311
const auto dash = addr_range.find('-');
317312
if (dash == std::string::npos)
318313
continue;
319-
const auto range_start =
320-
static_cast<uintptr_t>(std::stoull(addr_range.substr(0, dash), nullptr, 16));
321-
const auto range_end =
322-
static_cast<uintptr_t>(std::stoull(addr_range.substr(dash + 1), nullptr, 16));
314+
const auto range_start = static_cast<uintptr_t>(std::stoull(addr_range.substr(0, dash), nullptr, 16));
315+
const auto range_end = static_cast<uintptr_t>(std::stoull(addr_range.substr(dash + 1), nullptr, 16));
323316
if (addr_val < range_start || addr_val >= range_end)
324317
continue;
325318
// Read optional pathname
326319
std::string path;
327320
if (!(iss >> path) || path.empty() || path[0] != '/')
328321
return false; // anonymous or special region, no benefit
329322
out_path = path;
330-
const auto map_offset =
331-
static_cast<std::streamoff>(std::stoull(offset_str, nullptr, 16));
332-
out_offset = map_offset +
333-
static_cast<std::streamoff>(addr_val - range_start);
323+
const auto map_offset = static_cast<std::streamoff>(std::stoull(offset_str, nullptr, 16));
324+
out_offset = map_offset + static_cast<std::streamoff>(addr_val - range_start);
334325
return true;
335326
}
336327
return false;

src/common/util/include/openvino/util/parallel_read_streambuf.hpp

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <atomic>
1010
#include <cstring>
1111
#include <filesystem>
12+
#include <fstream>
1213
#include <future>
1314
#include <iostream>
1415
#include <stdexcept>
@@ -166,9 +167,8 @@ class ParallelReadStreamBuf : public std::streambuf {
166167
}
167168
// Read a batch of up to UNDERFLOW_BUF bytes so that character-by-character
168169
// consumers (std::getline, operator>>) don't issue one pread per char.
169-
const size_t to_read = static_cast<size_t>(
170-
std::min(static_cast<std::streamoff>(UNDERFLOW_BUF),
171-
m_file_size - m_file_offset));
170+
const size_t to_read =
171+
static_cast<size_t>(std::min(static_cast<std::streamoff>(UNDERFLOW_BUF), m_file_size - m_file_offset));
172172
if (!single_read(m_underflow_buf.data(), to_read, static_cast<size_t>(m_file_offset))) {
173173
return traits_type::eof();
174174
}
@@ -335,20 +335,22 @@ class ParallelReadStreamBuf : public std::streambuf {
335335
CloseHandle(t_handle);
336336
}));
337337
#else
338-
const int fd = m_fd;
339-
futures.emplace_back(std::async(std::launch::async, [fd, thread_file_offset, ptr, read_size, &success] {
340-
char* cur = ptr;
341-
size_t remaining = read_size;
342-
off_t cur_file_offset = static_cast<off_t>(thread_file_offset);
343-
while (remaining > 0 && success) {
344-
const ssize_t n = ::pread(fd, cur, remaining, cur_file_offset);
345-
if (n <= 0) {
346-
success = false;
347-
break;
348-
}
349-
cur += n;
350-
cur_file_offset += n;
351-
remaining -= static_cast<size_t>(n);
338+
// Each worker opens its own std::ifstream so that Linux's per-file-
339+
// description readahead state (file_ra_state / f_ra) is independent
340+
// per thread. Sharing a single fd causes concurrent pread() calls to
341+
// corrupt each other's sequential readahead prediction, collapsing
342+
// throughput from ~3.5 GB/s sequential to ~0.5 GB/s.
343+
const std::filesystem::path t_path = m_path;
344+
futures.emplace_back(std::async(std::launch::async, [t_path, thread_file_offset, ptr, read_size, &success] {
345+
std::ifstream t_ifs(t_path, std::ios::binary);
346+
if (!t_ifs.is_open()) {
347+
success = false;
348+
return;
349+
}
350+
t_ifs.seekg(static_cast<std::streamoff>(thread_file_offset), std::ios::beg);
351+
t_ifs.read(ptr, static_cast<std::streamsize>(read_size));
352+
if (!t_ifs.good()) {
353+
success = false;
352354
}
353355
}));
354356
#endif

src/inference/src/cache_manager.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818

1919
#include "openvino/runtime/icache_manager.hpp"
2020
#include "openvino/runtime/shared_buffer.hpp"
21-
#include "openvino/util/parallel_read_streambuf.hpp"
2221
#include "openvino/runtime/tensor.hpp"
2322
#include "openvino/util/file_util.hpp"
2423
#include "openvino/util/mmap_object.hpp"
24+
#include "openvino/util/parallel_read_streambuf.hpp"
2525

2626
namespace ov {
2727

src/plugins/intel_gpu/include/intel_gpu/primitives/data.hpp

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -422,13 +422,65 @@ struct data : public primitive_base<data> {
422422
if (is_alloc_host_accessible(_allocation_type)) {
423423
ib >> make_data(mem->buffer_ptr(), data_size);
424424
} else {
425-
const size_t DATA_BLOCK_SIZE = 2 * 1024 * 1024;
426-
auto& strm = ib.get_engine().get_service_stream();
425+
const size_t DATA_BLOCK_SIZE = 4 * 1024 * 1024;
426+
auto& eng = ib.get_engine();
427+
auto& strm = eng.get_service_stream();
427428
if (data_size < DATA_BLOCK_SIZE || output_layout.format.is_image_2d()) {
428429
std::vector<uint8_t> _buf(data_size);
429430
ib >> make_data(_buf.data(), data_size);
430431
mem->copy_from(strm, _buf.data());
432+
} else if (eng.supports_allocation(allocation_type::usm_host)) {
433+
// Use USM host staging buffers so the driver can DMA directly
434+
// from pinned host memory to device, avoiding the hidden
435+
// pageable-to-pinned bounce copy that std::vector staging incurs.
436+
// The pipeline is the same ping-pong as the fallback below:
437+
// CPU reads into host_buf1 while GPU copies from host_buf2 (and vice versa).
438+
// Safety: each staging buffer is reused every other iteration.
439+
// After filling the current buffer, the loop waits on the OTHER
440+
// buffer's copy event before submitting the current copy, so the
441+
// GPU has finished reading the other buffer before the next
442+
// iteration overwrites it; at most one copy is in flight at a
443+
// time, and the event still pending is waited on after the loop.
444+
const layout staging_layout{{static_cast<int64_t>(DATA_BLOCK_SIZE)},
445+
data_types::u8, format::bfyx};
446+
auto host_buf1 = eng.allocate_memory(staging_layout, allocation_type::usm_host, false);
447+
auto host_buf2 = eng.allocate_memory(staging_layout, allocation_type::usm_host, false);
448+
bool buf_flag = true;
449+
event::ptr ev1, ev2;
450+
ev1 = ev2 = nullptr;
451+
size_t dst_offset = 0;
452+
while (dst_offset < data_size) {
453+
const bool is_blocking = false;
454+
const size_t src_offset = 0;
455+
size_t copy_size =
456+
(data_size > (dst_offset + DATA_BLOCK_SIZE)) ? DATA_BLOCK_SIZE : (data_size - dst_offset);
457+
if (buf_flag) {
458+
ib >> make_data(static_cast<uint8_t*>(host_buf1->buffer_ptr()), copy_size);
459+
if (ev2 != nullptr) {
460+
ev2->wait();
461+
ev2 = nullptr;
462+
}
463+
ev1 = mem->copy_from(strm, *host_buf1, src_offset, dst_offset, copy_size, is_blocking);
464+
} else {
465+
ib >> make_data(static_cast<uint8_t*>(host_buf2->buffer_ptr()), copy_size);
466+
if (ev1 != nullptr) {
467+
ev1->wait();
468+
ev1 = nullptr;
469+
}
470+
ev2 = mem->copy_from(strm, *host_buf2, src_offset, dst_offset, copy_size, is_blocking);
471+
}
472+
dst_offset += DATA_BLOCK_SIZE;
473+
buf_flag = !buf_flag;
474+
}
475+
if (ev2 != nullptr) {
476+
ev2->wait();
477+
}
478+
if (ev1 != nullptr) {
479+
ev1->wait();
480+
}
431481
} else {
482+
// Fallback for devices that do not support USM host allocation:
483+
// use pageable std::vector staging buffers (original behaviour).
432484
std::vector<uint8_t> _buf1(DATA_BLOCK_SIZE);
433485
std::vector<uint8_t> _buf2(DATA_BLOCK_SIZE);
434486
bool buf_flag = true;

0 commit comments

Comments
 (0)