Skip to content

Commit 43d43a2

Browse files
committed
Add support for CommandQueuePool
Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>
1 parent 86633b7 commit 43d43a2

File tree

14 files changed

+277
-147
lines changed

14 files changed

+277
-147
lines changed

src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ class IPipeline {
4343
protected:
4444
void enable_profiling();
4545

46+
// Bundles a command queue descriptor with the descriptor version observed together with it.
// This is the return type of get_command_queue_state_snapshot().
struct CommandQueueStateSnapshot {
47+
// Copy of the descriptor obtained from the graph.
CommandQueueDesc desc;
48+
// Descriptor version associated with `desc`.
uint64_t version;
49+
};
50+
51+
CommandQueueStateSnapshot get_command_queue_state_snapshot();
52+
4653
std::shared_ptr<ZeroInitStructsHolder> _init_structs;
4754
std::shared_ptr<IGraph> _graph;
4855
const Config _config;
@@ -60,6 +67,8 @@ class IPipeline {
6067
*/
6168
size_t _batch_size;
6269

70+
std::shared_ptr<CommandQueue> _command_queue = nullptr;
71+
uint64_t _command_queue_version = 0;
6372
std::vector<std::unique_ptr<Fence>> _fences;
6473
std::shared_ptr<EventPool> _event_pool;
6574
std::vector<std::shared_ptr<Event>> _events;

src/plugins/intel_npu/src/backend/src/zero_dynamic_pipeline.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,8 @@ DynamicPipeline::DynamicPipeline(const std::shared_ptr<ZeroInitStructsHolder>& i
7777

7878
if (_sync_output_with_fences) {
7979
_fences.reserve(_batch_size);
80-
8180
for (size_t i = 0; i < _batch_size; i++) {
82-
_fences.emplace_back(std::make_unique<Fence>(_graph->get_command_queue()));
81+
_fences.emplace_back(std::make_unique<Fence>(_command_queue));
8382
}
8483
}
8584

@@ -181,7 +180,23 @@ void DynamicPipeline::push() {
181180
auto* dynamicGraph = dynamic_cast<IDynamicGraph*>(_graph.get());
182181
OPENVINO_ASSERT(dynamicGraph != nullptr, "Failed to cast graph to IDynamicGraph");
183182

184-
auto commandQueueHandle = _graph->get_command_queue()->handle();
183+
const auto command_queue_version = _graph->get_command_queue_desc_version();
184+
const bool command_queue_changed = (command_queue_version != _command_queue_version);
185+
if (command_queue_changed) {
186+
const auto command_queue_state = get_command_queue_state_snapshot();
187+
if (command_queue_state.version != _command_queue_version) {
188+
_command_queue = CommandQueuePool::getInstance().getCommandQueue(_init_structs, command_queue_state.desc);
189+
_command_queue_version = command_queue_state.version;
190+
191+
if (_sync_output_with_fences) {
192+
for (size_t i = 0; i < _fences.size(); i++) {
193+
_fences[i] = std::make_unique<Fence>(_command_queue);
194+
}
195+
}
196+
}
197+
}
198+
199+
auto commandQueueHandle = _command_queue->handle();
185200
for (size_t i = 0; i < _command_lists.size(); ++i) {
186201
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PUSH, itt::domains::LevelZeroBackend, "Pipeline", "push");
187202

src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,17 @@ IPipeline::IPipeline(const std::shared_ptr<ZeroInitStructsHolder>& init_structs,
5151
_graph(graph),
5252
_config(config),
5353
_batch_size(batch_size),
54+
_command_queue(nullptr),
55+
_command_queue_version(0),
5456
_extension_version(init_structs->getCommandQueueDdiTable().version()),
5557
_run_inferences_sequentially(_extension_version < ZE_MAKE_VERSION(1, 1) &&
5658
_config.get<RUN_INFERENCES_SEQUENTIALLY>()),
5759
_pipeline_unique_id_per_graph(get_graph_unique_id_or_throw(graph)),
5860
_logger(logName, _config.get<LOG_LEVEL>()) {
61+
const auto command_queue_state = get_command_queue_state_snapshot();
62+
_command_queue = CommandQueuePool::getInstance().getCommandQueue(_init_structs, command_queue_state.desc);
63+
_command_queue_version = command_queue_state.version;
64+
5965
bool perf_count_enabled = _config.has<PERF_COUNT>() && _config.get<PERF_COUNT>();
6066
std::optional<bool> compiled_with_profiling = _graph->is_profiling_blob();
6167

@@ -95,6 +101,18 @@ IPipeline::IPipeline(const std::shared_ptr<ZeroInitStructsHolder>& init_structs,
95101
}
96102
};
97103

104+
// Returns a descriptor/version pair read from _graph.
// The version is read once before and once after copying the descriptor; the pair is returned only
// when both version reads are equal, otherwise the whole read sequence is retried.
// NOTE(review): the retry loop has no upper bound and does not yield between attempts - presumably
// descriptor updates are rare enough for this not to matter; confirm.
IPipeline::CommandQueueStateSnapshot IPipeline::get_command_queue_state_snapshot() {
105+
while (true) {
106+
// Version observed before the descriptor copy.
const auto version_before = _graph->get_command_queue_desc_version();
107+
auto desc = _graph->get_command_queue_desc();
108+
// Version observed after the descriptor copy.
const auto version_after = _graph->get_command_queue_desc_version();
109+
110+
// Equal versions: accept this descriptor copy and report it with that version.
if (version_before == version_after) {
111+
return {desc, version_after};
112+
}
113+
}
114+
}
115+
98116
std::vector<ov::ProfilingInfo> IPipeline::get_profiling_info() const {
99117
_logger.debug("get_profiling_info - started");
100118
if (!_config.has<PERF_COUNT>() || !_config.get<PERF_COUNT>()) {
@@ -159,19 +177,18 @@ Pipeline::Pipeline(const std::shared_ptr<ZeroInitStructsHolder>& init_structs,
159177
}
160178
}
161179

162-
_command_lists.reserve(_batch_size);
163-
for (size_t i = 0; i < _batch_size; i++) {
164-
_command_lists.emplace_back(std::make_unique<CommandList>(_init_structs));
165-
}
166-
167180
if (_sync_output_with_fences) {
168181
_fences.reserve(_batch_size);
169-
170182
for (size_t i = 0; i < _batch_size; i++) {
171-
_fences.emplace_back(std::make_unique<Fence>(_graph->get_command_queue()));
183+
_fences.emplace_back(std::make_unique<Fence>(_command_queue));
172184
}
173185
}
174186

187+
_command_lists.reserve(_batch_size);
188+
for (size_t i = 0; i < _batch_size; i++) {
189+
_command_lists.emplace_back(std::make_unique<CommandList>(_init_structs));
190+
}
191+
175192
for (size_t i = 0; i < _batch_size; i++) {
176193
_logger.debug("Pipeline - set args for command list number: %zu", i);
177194
size_t io_index = 0;
@@ -283,6 +300,22 @@ void Pipeline::push() {
283300
_graph->set_last_submitted_id(_pipeline_unique_id_per_graph);
284301
}
285302

303+
const auto command_queue_version = _graph->get_command_queue_desc_version();
304+
const bool command_queue_changed = (command_queue_version != _command_queue_version);
305+
if (command_queue_changed) {
306+
const auto command_queue_state = get_command_queue_state_snapshot();
307+
if (command_queue_state.version != _command_queue_version) {
308+
_command_queue = CommandQueuePool::getInstance().getCommandQueue(_init_structs, command_queue_state.desc);
309+
_command_queue_version = command_queue_state.version;
310+
311+
if (_sync_output_with_fences) {
312+
for (size_t i = 0; i < _fences.size(); i++) {
313+
_fences[i] = std::make_unique<Fence>(_command_queue);
314+
}
315+
}
316+
}
317+
}
318+
286319
for (size_t i = 0; i < _command_lists.size(); ++i) {
287320
_command_lists.at(i)->close();
288321
// Emit a marker for pipeline::push() with the command list handle as the metadata
@@ -292,9 +325,9 @@ void Pipeline::push() {
292325
(uintptr_t)_command_lists.at(i)->handle());
293326
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PUSH, itt::domains::LevelZeroBackend, "Pipeline", "push");
294327
if (_sync_output_with_fences) {
295-
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i), *_fences.at(i));
328+
_command_queue->executeCommandList(*_command_lists.at(i), *_fences.at(i));
296329
} else {
297-
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i));
330+
_command_queue->executeCommandList(*_command_lists.at(i));
298331
}
299332
}
300333

src/plugins/intel_npu/src/common/include/intel_npu/common/igraph.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
4848

4949
virtual void update_network_name(std::string_view name);
5050

51-
virtual const std::shared_ptr<CommandQueue>& get_command_queue() const;
52-
53-
virtual void set_workload_type(const ov::WorkloadType workloadType) const;
51+
virtual CommandQueueDesc get_command_queue_desc() const;
52+
virtual uint64_t get_command_queue_desc_version() const;
53+
virtual void set_workload_type(const ov::WorkloadType workloadType);
5454

5555
std::mutex& get_mutex() {
5656
return _initialize_mutex;

src/plugins/intel_npu/src/common/src/igraph.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,15 @@ void IGraph::update_network_name(std::string_view) {
5050
OPENVINO_THROW("update_network_name not implemented");
5151
}
5252

53-
const std::shared_ptr<CommandQueue>& IGraph::get_command_queue() const {
54-
OPENVINO_THROW("get_command_queue not implemented");
53+
// Base-class implementation: always throws via OPENVINO_THROW.
// Graph types that expose a command queue descriptor override this method.
CommandQueueDesc IGraph::get_command_queue_desc() const {
54+
OPENVINO_THROW("get_command_queue_desc not implemented");
5555
}
5656

57-
void IGraph::set_workload_type(const ov::WorkloadType) const {
57+
// Base-class implementation: always throws via OPENVINO_THROW.
// Graph types that track a command queue descriptor version override this method.
uint64_t IGraph::get_command_queue_desc_version() const {
58+
OPENVINO_THROW("get_command_queue_desc_version not implemented");
59+
}
60+
61+
// Base-class implementation: ignores its argument and always throws via OPENVINO_THROW.
// Graph types that support changing the workload type override this method.
void IGraph::set_workload_type(const ov::WorkloadType) {
5862
OPENVINO_THROW("set_workload_type not implemented");
5963
}
6064

src/plugins/intel_npu/src/compiler_adapter/include/dynamic_graph.hpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
#pragma once
66

7+
#include <atomic>
8+
#include <mutex>
9+
710
#include <ze_graph_ext.h>
811

912
#include "intel_npu/common/idynamic_graph.hpp"
@@ -125,9 +128,9 @@ class DynamicGraph final : public IDynamicGraph {
125128

126129
void update_network_name(std::string_view name) override;
127130

128-
const std::shared_ptr<CommandQueue>& get_command_queue() const override;
129-
130-
void set_workload_type(const ov::WorkloadType workloadType) const override;
131+
CommandQueueDesc get_command_queue_desc() const override;
132+
uint64_t get_command_queue_desc_version() const override;
133+
void set_workload_type(const ov::WorkloadType workloadType) override;
131134

132135
void set_batch_size(std::size_t batch) override;
133136

@@ -170,8 +173,9 @@ class DynamicGraph final : public IDynamicGraph {
170173
*/
171174
uint64_t _num_of_subgraphs = 1;
172175

173-
mutable std::mutex _commandQueueMutex;
174-
std::shared_ptr<CommandQueue> _commandQueue;
176+
mutable std::mutex _commandQueueDescMutex;
177+
std::atomic<uint64_t> _commandQueueVersion{0};
178+
CommandQueueDesc _commandQueueDesc;
175179
std::vector<std::shared_ptr<Event>> _lastSubmittedEvent;
176180

177181
std::optional<ov::Tensor> _blob;

src/plugins/intel_npu/src/compiler_adapter/include/graph.hpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66

77
#pragma once
88

9+
#include <atomic>
10+
#include <mutex>
11+
912
#include <ze_graph_ext.h>
1013

1114
#include "intel_npu/common/igraph.hpp"
@@ -40,9 +43,9 @@ class Graph : public IGraph {
4043

4144
void update_network_name(std::string_view name) override;
4245

43-
const std::shared_ptr<CommandQueue>& get_command_queue() const override;
44-
45-
void set_workload_type(const ov::WorkloadType workloadType) const override;
46+
CommandQueueDesc get_command_queue_desc() const override;
47+
uint64_t get_command_queue_desc_version() const override;
48+
void set_workload_type(const ov::WorkloadType workloadType) override;
4649

4750
void set_last_submitted_event(const std::shared_ptr<Event>& event, size_t indexOfCommandList) override;
4851
const std::shared_ptr<Event>& get_last_submitted_event(size_t indexOfCommandList) const override;
@@ -72,8 +75,9 @@ class Graph : public IGraph {
7275
GraphDescriptor _graphDesc;
7376
NetworkMetadata _metadata;
7477

75-
mutable std::mutex _commandQueueMutex;
76-
std::shared_ptr<CommandQueue> _commandQueue;
78+
mutable std::mutex _commandQueueDescMutex;
79+
std::atomic<uint64_t> _commandQueueVersion{0};
80+
CommandQueueDesc _commandQueueDesc;
7781
std::vector<std::shared_ptr<Event>> _lastSubmittedEvent;
7882

7983
std::optional<ov::Tensor> _blob;

src/plugins/intel_npu/src/compiler_adapter/src/dynamic_graph.cpp

Lines changed: 37 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -489,29 +489,19 @@ void DynamicGraph::update_network_name(std::string_view name) {
489489
_metadata.name = name;
490490
}
491491

492-
const std::shared_ptr<CommandQueue>& DynamicGraph::get_command_queue() const {
493-
return _commandQueue;
492+
// Returns a copy of the stored command queue descriptor.
// The copy is made while holding _commandQueueDescMutex.
CommandQueueDesc DynamicGraph::get_command_queue_desc() const {
493+
std::lock_guard<std::mutex> lock(_commandQueueDescMutex);
494+
return _commandQueueDesc;
494495
}
495496

496-
void DynamicGraph::set_workload_type(const ov::WorkloadType workloadType) const {
497-
std::lock_guard<std::mutex> lock(_commandQueueMutex);
498-
if (_commandQueue == nullptr) {
499-
return;
500-
}
501-
502-
ze_command_queue_workload_type_t zeWorkloadType;
503-
switch (workloadType) {
504-
case ov::WorkloadType::DEFAULT:
505-
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_DEFAULT;
506-
break;
507-
case ov::WorkloadType::EFFICIENT:
508-
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_BACKGROUND;
509-
break;
510-
default:
511-
OPENVINO_THROW("Unknown value for WorkloadType!");
512-
}
497+
// Returns the current value of the descriptor version counter.
// The counter is read with an acquire load and without taking _commandQueueDescMutex.
uint64_t DynamicGraph::get_command_queue_desc_version() const {
498+
return _commandQueueVersion.load(std::memory_order_acquire);
499+
}
513500

514-
_commandQueue->setWorkloadType(zeWorkloadType);
501+
// Records a new workload type in the stored command queue descriptor.
// While holding _commandQueueDescMutex, the converted workload type is written into
// _commandQueueDesc and the version counter is then incremented by one, so the version changes
// on every call.
void DynamicGraph::set_workload_type(const ov::WorkloadType workloadType) {
502+
std::lock_guard<std::mutex> lock(_commandQueueDescMutex);
503+
_commandQueueDesc.workload = zeroUtils::toZeQueueWorkloadType(workloadType);
504+
// Incremented after the descriptor write, with release ordering.
_commandQueueVersion.fetch_add(1, std::memory_order_release);
515505
}
516506

517507
void DynamicGraph::set_argument_value(uint32_t argi, const void* argv) const {
@@ -554,34 +544,37 @@ void DynamicGraph::initialize_impl(const FilteredConfig& config) {
554544
return;
555545
}
556546

557-
if (_commandQueue == nullptr) {
558-
_logger.debug("Graph initialize without graph handle");
547+
_logger.debug("Graph initialize without graph handle");
559548

560-
uint32_t commandQueueOptions = 0;
561-
if (config.has<TURBO>() && config.get<TURBO>()) {
562-
OPENVINO_ASSERT(_zeroInitStruct->getCommandQueueDdiTable().version() >= ZE_MAKE_VERSION(1, 0),
563-
"Turbo is not supported by the current driver");
564-
commandQueueOptions = commandQueueOptions | ZE_NPU_COMMAND_QUEUE_OPTION_TURBO;
565-
}
566-
OPENVINO_ASSERT(!(_zeroInitStruct->getCommandQueueDdiTable().version() < ZE_MAKE_VERSION(1, 1) &&
567-
config.has<RUN_INFERENCES_SEQUENTIALLY>() && config.get<RUN_INFERENCES_SEQUENTIALLY>()),
568-
"Running inferences sequentially is not supported by the current driver");
569-
570-
{
571-
std::lock_guard<std::mutex> lock(_commandQueueMutex);
572-
_commandQueue = std::make_shared<CommandQueue>(_zeroInitStruct,
573-
zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>()),
574-
commandQueueOptions);
575-
}
549+
uint32_t commandQueueOptions = 0;
550+
if (config.has<TURBO>() && config.get<TURBO>()) {
551+
OPENVINO_ASSERT(_zeroInitStruct->getCommandQueueDdiTable().version() >= ZE_MAKE_VERSION(1, 0),
552+
"Turbo is not supported by the current driver");
553+
commandQueueOptions = commandQueueOptions | ZE_NPU_COMMAND_QUEUE_OPTION_TURBO;
554+
}
555+
OPENVINO_ASSERT(!(_zeroInitStruct->getCommandQueueDdiTable().version() < ZE_MAKE_VERSION(1, 1) &&
556+
config.has<RUN_INFERENCES_SEQUENTIALLY>() && config.get<RUN_INFERENCES_SEQUENTIALLY>()),
557+
"Running inferences sequentially is not supported by the current driver");
576558

577-
if (config.has<WORKLOAD_TYPE>()) {
578-
set_workload_type(config.get<WORKLOAD_TYPE>());
579-
}
559+
{
560+
std::lock_guard<std::mutex> lock(_commandQueueDescMutex);
561+
_commandQueueDesc = CommandQueueDesc{
562+
zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>()),
563+
zeroUtils::toZeQueueWorkloadType(config.has<WORKLOAD_TYPE>()
564+
? std::optional<ov::WorkloadType>{config.get<WORKLOAD_TYPE>()}
565+
: std::nullopt),
566+
commandQueueOptions,
567+
this};
568+
_commandQueueVersion.fetch_add(1, std::memory_order_release);
569+
}
580570

581-
_logger.debug("Graph initialize finish");
571+
_logger.debug("Graph initialize finish");
582572

583-
_batchSize = determine_batch_size();
584-
}
573+
_batchSize = determine_batch_size();
574+
575+
OPENVINO_ASSERT(_zeroInitStruct->getCommandQueueDdiTable().version() >= ZE_MAKE_VERSION(1, 1) ||
576+
!config.get<RUN_INFERENCES_SEQUENTIALLY>(),
577+
"Running inferences sequentially is not supported by the current driver");
585578

586579
// To ensure that the initialization of the graph does not exit prematurely due to nullptrs
587580
_init_completed.store(true, std::memory_order_release);
@@ -668,10 +661,6 @@ DynamicGraph::~DynamicGraph() {
668661
if (!_lastSubmittedEvent.empty()) {
669662
_lastSubmittedEvent.clear();
670663
}
671-
672-
if (_commandQueue != nullptr) {
673-
_commandQueue.reset();
674-
}
675664
}
676665

677666
void DynamicGraph::execute(const std::shared_ptr<ZeroInitStructsHolder>& zeroInitStruct,

0 commit comments

Comments
 (0)