Skip to content

Commit 82f1667

Browse files
committed
Add support for CommandQueuePool
Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>
1 parent 86633b7 commit 82f1667

File tree

16 files changed

+319
-151
lines changed

16 files changed

+319
-151
lines changed

src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ class IPipeline {
4343
protected:
4444
void enable_profiling();
4545

46+
// Value pair produced by get_command_queue_state_snapshot(): a command queue
// descriptor together with the descriptor version it was read at.
struct CommandQueueStateSnapshot {
47+
// Descriptor copied from the graph (see IGraph::get_command_queue_desc()).
CommandQueueDesc desc;
48+
// Value of IGraph::get_command_queue_desc_version() matching `desc`.
uint64_t version;
49+
};
50+
51+
CommandQueueStateSnapshot get_command_queue_state_snapshot();
52+
4653
std::shared_ptr<ZeroInitStructsHolder> _init_structs;
4754
std::shared_ptr<IGraph> _graph;
4855
const Config _config;
@@ -60,6 +67,8 @@ class IPipeline {
6067
*/
6168
size_t _batch_size;
6269

70+
std::shared_ptr<CommandQueue> _command_queue = nullptr;
71+
uint64_t _command_queue_version = 0;
6372
std::vector<std::unique_ptr<Fence>> _fences;
6473
std::shared_ptr<EventPool> _event_pool;
6574
std::vector<std::shared_ptr<Event>> _events;

src/plugins/intel_npu/src/backend/src/zero_dynamic_pipeline.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,8 @@ DynamicPipeline::DynamicPipeline(const std::shared_ptr<ZeroInitStructsHolder>& i
7777

7878
if (_sync_output_with_fences) {
7979
_fences.reserve(_batch_size);
80-
8180
for (size_t i = 0; i < _batch_size; i++) {
82-
_fences.emplace_back(std::make_unique<Fence>(_graph->get_command_queue()));
81+
_fences.emplace_back(std::make_unique<Fence>(_command_queue));
8382
}
8483
}
8584

@@ -181,7 +180,23 @@ void DynamicPipeline::push() {
181180
auto* dynamicGraph = dynamic_cast<IDynamicGraph*>(_graph.get());
182181
OPENVINO_ASSERT(dynamicGraph != nullptr, "Failed to cast graph to IDynamicGraph");
183182

184-
auto commandQueueHandle = _graph->get_command_queue()->handle();
183+
const auto command_queue_version = _graph->get_command_queue_desc_version();
184+
const bool command_queue_changed = (command_queue_version != _command_queue_version);
185+
if (command_queue_changed) {
186+
const auto command_queue_state = get_command_queue_state_snapshot();
187+
if (command_queue_state.version != _command_queue_version) {
188+
_command_queue = CommandQueuePool::getInstance().getCommandQueue(_init_structs, command_queue_state.desc);
189+
_command_queue_version = command_queue_state.version;
190+
191+
if (_sync_output_with_fences) {
192+
for (size_t i = 0; i < _fences.size(); i++) {
193+
_fences[i] = std::make_unique<Fence>(_command_queue);
194+
}
195+
}
196+
}
197+
}
198+
199+
auto commandQueueHandle = _command_queue->handle();
185200
for (size_t i = 0; i < _command_lists.size(); ++i) {
186201
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PUSH, itt::domains::LevelZeroBackend, "Pipeline", "push");
187202

src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ IPipeline::IPipeline(const std::shared_ptr<ZeroInitStructsHolder>& init_structs,
5656
_config.get<RUN_INFERENCES_SEQUENTIALLY>()),
5757
_pipeline_unique_id_per_graph(get_graph_unique_id_or_throw(graph)),
5858
_logger(logName, _config.get<LOG_LEVEL>()) {
59+
const auto command_queue_state = get_command_queue_state_snapshot();
60+
_command_queue = CommandQueuePool::getInstance().getCommandQueue(_init_structs, command_queue_state.desc);
61+
_command_queue_version = command_queue_state.version;
62+
5963
bool perf_count_enabled = _config.has<PERF_COUNT>() && _config.get<PERF_COUNT>();
6064
std::optional<bool> compiled_with_profiling = _graph->is_profiling_blob();
6165

@@ -95,6 +99,18 @@ IPipeline::IPipeline(const std::shared_ptr<ZeroInitStructsHolder>& init_structs,
9599
}
96100
};
97101

102+
// Reads the graph's command queue descriptor together with its version.
//
// The version is sampled once before and once after the descriptor is read;
// the {descriptor, version} pair is returned only when both samples are equal,
// otherwise the whole read is retried.
//
// NOTE(review): presumably this guards against the descriptor being updated
// between the two reads (e.g. by set_workload_type on another thread) - confirm.
// The loop has no retry bound and no back-off.
IPipeline::CommandQueueStateSnapshot IPipeline::get_command_queue_state_snapshot() {
103+
while (true) {
104+
const auto version_before = _graph->get_command_queue_desc_version();
105+
auto desc = _graph->get_command_queue_desc();
106+
const auto version_after = _graph->get_command_queue_desc_version();
107+
108+
// Equal versions: accept `desc` as belonging to `version_after`.
if (version_before == version_after) {
109+
return {desc, version_after};
110+
}
111+
}
112+
}
113+
98114
std::vector<ov::ProfilingInfo> IPipeline::get_profiling_info() const {
99115
_logger.debug("get_profiling_info - started");
100116
if (!_config.has<PERF_COUNT>() || !_config.get<PERF_COUNT>()) {
@@ -159,19 +175,18 @@ Pipeline::Pipeline(const std::shared_ptr<ZeroInitStructsHolder>& init_structs,
159175
}
160176
}
161177

162-
_command_lists.reserve(_batch_size);
163-
for (size_t i = 0; i < _batch_size; i++) {
164-
_command_lists.emplace_back(std::make_unique<CommandList>(_init_structs));
165-
}
166-
167178
if (_sync_output_with_fences) {
168179
_fences.reserve(_batch_size);
169-
170180
for (size_t i = 0; i < _batch_size; i++) {
171-
_fences.emplace_back(std::make_unique<Fence>(_graph->get_command_queue()));
181+
_fences.emplace_back(std::make_unique<Fence>(_command_queue));
172182
}
173183
}
174184

185+
_command_lists.reserve(_batch_size);
186+
for (size_t i = 0; i < _batch_size; i++) {
187+
_command_lists.emplace_back(std::make_unique<CommandList>(_init_structs));
188+
}
189+
175190
for (size_t i = 0; i < _batch_size; i++) {
176191
_logger.debug("Pipeline - set args for command list number: %zu", i);
177192
size_t io_index = 0;
@@ -283,6 +298,22 @@ void Pipeline::push() {
283298
_graph->set_last_submitted_id(_pipeline_unique_id_per_graph);
284299
}
285300

301+
const auto command_queue_version = _graph->get_command_queue_desc_version();
302+
const bool command_queue_changed = (command_queue_version != _command_queue_version);
303+
if (command_queue_changed) {
304+
const auto command_queue_state = get_command_queue_state_snapshot();
305+
if (command_queue_state.version != _command_queue_version) {
306+
_command_queue = CommandQueuePool::getInstance().getCommandQueue(_init_structs, command_queue_state.desc);
307+
_command_queue_version = command_queue_state.version;
308+
309+
if (_sync_output_with_fences) {
310+
for (size_t i = 0; i < _fences.size(); i++) {
311+
_fences[i] = std::make_unique<Fence>(_command_queue);
312+
}
313+
}
314+
}
315+
}
316+
286317
for (size_t i = 0; i < _command_lists.size(); ++i) {
287318
_command_lists.at(i)->close();
288319
// Emit a marker for pipeline::push() with the command list handle as the metadata
@@ -292,9 +323,9 @@ void Pipeline::push() {
292323
(uintptr_t)_command_lists.at(i)->handle());
293324
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PUSH, itt::domains::LevelZeroBackend, "Pipeline", "push");
294325
if (_sync_output_with_fences) {
295-
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i), *_fences.at(i));
326+
_command_queue->executeCommandList(*_command_lists.at(i), *_fences.at(i));
296327
} else {
297-
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i));
328+
_command_queue->executeCommandList(*_command_lists.at(i));
298329
}
299330
}
300331

src/plugins/intel_npu/src/common/include/intel_npu/common/igraph.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
4848

4949
virtual void update_network_name(std::string_view name);
5050

51-
virtual const std::shared_ptr<CommandQueue>& get_command_queue() const;
52-
53-
virtual void set_workload_type(const ov::WorkloadType workloadType) const;
51+
virtual CommandQueueDesc get_command_queue_desc() const;
52+
virtual uint64_t get_command_queue_desc_version() const;
53+
virtual void set_workload_type(const ov::WorkloadType workloadType);
5454

5555
std::mutex& get_mutex() {
5656
return _initialize_mutex;

src/plugins/intel_npu/src/common/src/igraph.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,15 @@ void IGraph::update_network_name(std::string_view) {
5050
OPENVINO_THROW("update_network_name not implemented");
5151
}
5252

53-
const std::shared_ptr<CommandQueue>& IGraph::get_command_queue() const {
54-
OPENVINO_THROW("get_command_queue not implemented");
53+
// Base-class default for the virtual get_command_queue_desc(): always throws
// "not implemented". Graph and DynamicGraph declare overrides.
CommandQueueDesc IGraph::get_command_queue_desc() const {
54+
OPENVINO_THROW("get_command_queue_desc not implemented");
5555
}
5656

57-
void IGraph::set_workload_type(const ov::WorkloadType) const {
57+
// Base-class default for the virtual get_command_queue_desc_version(): always
// throws "not implemented". Graph and DynamicGraph declare overrides.
uint64_t IGraph::get_command_queue_desc_version() const {
58+
OPENVINO_THROW("get_command_queue_desc_version not implemented");
59+
}
60+
61+
// Base-class default for the virtual set_workload_type(): the argument is
// ignored and the call always throws "not implemented".
void IGraph::set_workload_type(const ov::WorkloadType) {
5862
OPENVINO_THROW("set_workload_type not implemented");
5963
}
6064

src/plugins/intel_npu/src/compiler_adapter/include/dynamic_graph.hpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
#pragma once
66

7+
#include <atomic>
8+
#include <mutex>
9+
710
#include <ze_graph_ext.h>
811

912
#include "intel_npu/common/idynamic_graph.hpp"
@@ -125,9 +128,9 @@ class DynamicGraph final : public IDynamicGraph {
125128

126129
void update_network_name(std::string_view name) override;
127130

128-
const std::shared_ptr<CommandQueue>& get_command_queue() const override;
129-
130-
void set_workload_type(const ov::WorkloadType workloadType) const override;
131+
CommandQueueDesc get_command_queue_desc() const override;
132+
uint64_t get_command_queue_desc_version() const override;
133+
void set_workload_type(const ov::WorkloadType workloadType) override;
131134

132135
void set_batch_size(std::size_t batch) override;
133136

@@ -170,8 +173,9 @@ class DynamicGraph final : public IDynamicGraph {
170173
*/
171174
uint64_t _num_of_subgraphs = 1;
172175

173-
mutable std::mutex _commandQueueMutex;
174-
std::shared_ptr<CommandQueue> _commandQueue;
176+
mutable std::mutex _commandQueueDescMutex;
177+
std::atomic<uint64_t> _commandQueueDescVersion{0};
178+
CommandQueueDesc _commandQueueDesc;
175179
std::vector<std::shared_ptr<Event>> _lastSubmittedEvent;
176180

177181
std::optional<ov::Tensor> _blob;

src/plugins/intel_npu/src/compiler_adapter/include/graph.hpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66

77
#pragma once
88

9+
#include <atomic>
10+
#include <mutex>
11+
912
#include <ze_graph_ext.h>
1013

1114
#include "intel_npu/common/igraph.hpp"
@@ -40,9 +43,9 @@ class Graph : public IGraph {
4043

4144
void update_network_name(std::string_view name) override;
4245

43-
const std::shared_ptr<CommandQueue>& get_command_queue() const override;
44-
45-
void set_workload_type(const ov::WorkloadType workloadType) const override;
46+
CommandQueueDesc get_command_queue_desc() const override;
47+
uint64_t get_command_queue_desc_version() const override;
48+
void set_workload_type(const ov::WorkloadType workloadType) override;
4649

4750
void set_last_submitted_event(const std::shared_ptr<Event>& event, size_t indexOfCommandList) override;
4851
const std::shared_ptr<Event>& get_last_submitted_event(size_t indexOfCommandList) const override;
@@ -72,8 +75,9 @@ class Graph : public IGraph {
7275
GraphDescriptor _graphDesc;
7376
NetworkMetadata _metadata;
7477

75-
mutable std::mutex _commandQueueMutex;
76-
std::shared_ptr<CommandQueue> _commandQueue;
78+
mutable std::mutex _commandQueueDescMutex;
79+
std::atomic<uint64_t> _commandQueueDescVersion{0};
80+
CommandQueueDesc _commandQueueDesc;
7781
std::vector<std::shared_ptr<Event>> _lastSubmittedEvent;
7882

7983
std::optional<ov::Tensor> _blob;

src/plugins/intel_npu/src/compiler_adapter/src/dynamic_graph.cpp

Lines changed: 36 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -489,29 +489,19 @@ void DynamicGraph::update_network_name(std::string_view name) {
489489
_metadata.name = name;
490490
}
491491

492-
const std::shared_ptr<CommandQueue>& DynamicGraph::get_command_queue() const {
493-
return _commandQueue;
492+
// Returns a copy of the stored command queue descriptor. The copy is made
// while holding _commandQueueDescMutex, the same mutex set_workload_type()
// takes before modifying the descriptor.
CommandQueueDesc DynamicGraph::get_command_queue_desc() const {
493+
std::lock_guard<std::mutex> lock(_commandQueueDescMutex);
494+
return _commandQueueDesc;
494495
}
495496

496-
void DynamicGraph::set_workload_type(const ov::WorkloadType workloadType) const {
497-
std::lock_guard<std::mutex> lock(_commandQueueMutex);
498-
if (_commandQueue == nullptr) {
499-
return;
500-
}
501-
502-
ze_command_queue_workload_type_t zeWorkloadType;
503-
switch (workloadType) {
504-
case ov::WorkloadType::DEFAULT:
505-
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_DEFAULT;
506-
break;
507-
case ov::WorkloadType::EFFICIENT:
508-
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_BACKGROUND;
509-
break;
510-
default:
511-
OPENVINO_THROW("Unknown value for WorkloadType!");
512-
}
497+
// Returns the current descriptor version counter. This is a plain atomic load
// with acquire ordering; the descriptor mutex is not taken here.
uint64_t DynamicGraph::get_command_queue_desc_version() const {
498+
return _commandQueueDescVersion.load(std::memory_order_acquire);
499+
}
513500

514-
_commandQueue->setWorkloadType(zeWorkloadType);
501+
// Records a new workload type in the stored command queue descriptor.
//
// Under _commandQueueDescMutex, the requested ov::WorkloadType is converted
// with zeroUtils::toZeQueueWorkloadType and written to _commandQueueDesc.workload,
// then the version counter is incremented (release ordering).
//
// NOTE(review): no command queue is touched here; presumably pipelines notice
// the version change on their next push() and fetch a matching queue - confirm.
void DynamicGraph::set_workload_type(const ov::WorkloadType workloadType) {
502+
std::lock_guard<std::mutex> lock(_commandQueueDescMutex);
503+
_commandQueueDesc.workload = zeroUtils::toZeQueueWorkloadType(std::optional<ov::WorkloadType>{workloadType});
504+
// Bump the version after the descriptor write so version readers see a change.
_commandQueueDescVersion.fetch_add(1, std::memory_order_release);
515505
}
516506

517507
void DynamicGraph::set_argument_value(uint32_t argi, const void* argv) const {
@@ -554,34 +544,37 @@ void DynamicGraph::initialize_impl(const FilteredConfig& config) {
554544
return;
555545
}
556546

557-
if (_commandQueue == nullptr) {
558-
_logger.debug("Graph initialize without graph handle");
547+
_logger.debug("Graph initialize without graph handle");
559548

560-
uint32_t commandQueueOptions = 0;
561-
if (config.has<TURBO>() && config.get<TURBO>()) {
562-
OPENVINO_ASSERT(_zeroInitStruct->getCommandQueueDdiTable().version() >= ZE_MAKE_VERSION(1, 0),
563-
"Turbo is not supported by the current driver");
564-
commandQueueOptions = commandQueueOptions | ZE_NPU_COMMAND_QUEUE_OPTION_TURBO;
565-
}
566-
OPENVINO_ASSERT(!(_zeroInitStruct->getCommandQueueDdiTable().version() < ZE_MAKE_VERSION(1, 1) &&
567-
config.has<RUN_INFERENCES_SEQUENTIALLY>() && config.get<RUN_INFERENCES_SEQUENTIALLY>()),
549+
uint32_t commandQueueOptions = 0;
550+
if (config.has<TURBO>() && config.get<TURBO>()) {
551+
OPENVINO_ASSERT(_zeroInitStruct->getCommandQueueDdiTable().version() >= ZE_MAKE_VERSION(1, 0),
552+
"Turbo is not supported by the current driver");
553+
_logger.debug("Set ZE_NPU_COMMAND_QUEUE_OPTION_TURBO in command queue options");
554+
commandQueueOptions = commandQueueOptions | ZE_NPU_COMMAND_QUEUE_OPTION_TURBO;
555+
}
556+
if (config.has<RUN_INFERENCES_SEQUENTIALLY>() && config.get<RUN_INFERENCES_SEQUENTIALLY>()) {
557+
OPENVINO_ASSERT(_zeroInitStruct->getCommandQueueDdiTable().version() >= ZE_MAKE_VERSION(1, 1),
568558
"Running inferences sequentially is not supported by the current driver");
559+
_logger.debug("Set ZE_NPU_COMMAND_QUEUE_OPTION_DEVICE_SYNC in command queue options");
560+
commandQueueOptions = commandQueueOptions | ZE_NPU_COMMAND_QUEUE_OPTION_DEVICE_SYNC;
561+
}
569562

570-
{
571-
std::lock_guard<std::mutex> lock(_commandQueueMutex);
572-
_commandQueue = std::make_shared<CommandQueue>(_zeroInitStruct,
573-
zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>()),
574-
commandQueueOptions);
575-
}
576-
577-
if (config.has<WORKLOAD_TYPE>()) {
578-
set_workload_type(config.get<WORKLOAD_TYPE>());
579-
}
563+
{
564+
std::lock_guard<std::mutex> lock(_commandQueueDescMutex);
565+
_commandQueueDesc = CommandQueueDesc{
566+
zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>()),
567+
zeroUtils::toZeQueueWorkloadType(config.has<WORKLOAD_TYPE>()
568+
? std::optional<ov::WorkloadType>{config.get<WORKLOAD_TYPE>()}
569+
: std::nullopt),
570+
commandQueueOptions,
571+
this};
572+
_commandQueueDescVersion.fetch_add(1, std::memory_order_release);
573+
}
580574

581-
_logger.debug("Graph initialize finish");
575+
_logger.debug("Graph initialize finish");
582576

583-
_batchSize = determine_batch_size();
584-
}
577+
_batchSize = determine_batch_size();
585578

586579
// To ensure that the initialization of the graph does not exit prematurely due to nullptrs
587580
_init_completed.store(true, std::memory_order_release);
@@ -668,10 +661,6 @@ DynamicGraph::~DynamicGraph() {
668661
if (!_lastSubmittedEvent.empty()) {
669662
_lastSubmittedEvent.clear();
670663
}
671-
672-
if (_commandQueue != nullptr) {
673-
_commandQueue.reset();
674-
}
675664
}
676665

677666
void DynamicGraph::execute(const std::shared_ptr<ZeroInitStructsHolder>& zeroInitStruct,

0 commit comments

Comments
 (0)