Skip to content

Commit c2e8d82

Browse files
committed
gtests: graph: api: test concurrent execution
1 parent f8843fe commit c2e8d82

File tree

2 files changed

+226
-0
lines changed

2 files changed

+226
-0
lines changed

tests/gtests/graph/api/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ FILE(GLOB API_TEST_ENGINE_DEPENDENT_SOURCES
7070
${CMAKE_CURRENT_SOURCE_DIR}/test_c_api_compile.cpp
7171
${CMAKE_CURRENT_SOURCE_DIR}/test_cpp_api_compile.cpp
7272
${CMAKE_CURRENT_SOURCE_DIR}/test_cpp_api_partition.cpp
73+
${CMAKE_CURRENT_SOURCE_DIR}/test_cpp_api_concurrent_execution.cpp
7374
)
7475

7576
foreach(TEST_FILE ${API_TEST_ENGINE_INDEPENDENT_SOURCES})
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*******************************************************************************
2+
* Copyright 2026 Intel Corporation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*******************************************************************************/
16+
17+
#include <atomic>
18+
#include <iostream>
19+
#include <memory>
20+
#include <thread>
21+
#include <vector>
22+
23+
#include "oneapi/dnnl/dnnl_graph.hpp"
24+
#include "test_api_common.hpp"
25+
#include "gtest/gtest.h"
26+
27+
using namespace dnnl::graph;
28+
29+
struct sdpa_dims_t {
30+
logical_tensor::dim mb;
31+
logical_tensor::dim seq_len;
32+
logical_tensor::dim head_num;
33+
logical_tensor::dim head_size;
34+
logical_tensor::dim query_num;
35+
};
36+
37+
// Number of worker threads that execute the compiled partition concurrently.
// constexpr (not just const): these are compile-time constants used to size
// the workload and compute the expected execution totals.
constexpr int num_threads = 4;
// Execution times for each thread to run the compiled partition.
constexpr int num_executions = 500;
40+
41+
// Helper function to create SDPA graph
42+
std::pair<dnnl::graph::graph, std::vector<logical_tensor>> create_sdpa_graph(
43+
dnnl::engine::kind engine_kind, logical_tensor::data_type dt,
44+
const sdpa_dims_t &p) {
45+
46+
// Prepare input and output shapes
47+
const dims_t qv_sz = {p.mb, p.head_num, p.query_num, p.head_size};
48+
const dims_t k_sz = {p.mb, p.head_num, p.seq_len, p.head_size};
49+
const dims_t score_sz = {p.mb, p.head_num, p.query_num, p.seq_len};
50+
const dims_t scale_sz = {1};
51+
const dims_t mask_sz = {p.mb, 1, p.query_num, p.seq_len};
52+
53+
// Incremental IDs for logical tensors and operations
54+
size_t id = 0;
55+
56+
// Intermediate data type
57+
const logical_tensor::data_type dt_inter = logical_tensor::data_type::f32;
58+
59+
// Create logical tensors
60+
auto query = logical_tensor(
61+
id++, dt, qv_sz, logical_tensor::layout_type::strided);
62+
auto key = logical_tensor(
63+
id++, dt, k_sz, logical_tensor::layout_type::strided);
64+
auto score = logical_tensor(
65+
id++, dt_inter, score_sz, logical_tensor::layout_type::strided);
66+
auto bmm1 = op(id++, op::kind::MatMul, "bmm1");
67+
bmm1.set_attr<bool>(op::attr::transpose_b, true);
68+
bmm1.add_inputs({query, key});
69+
bmm1.add_outputs({score});
70+
71+
// Scale operation
72+
auto scale = logical_tensor(
73+
id++, dt, scale_sz, logical_tensor::layout_type::strided);
74+
auto scaled_score = logical_tensor(
75+
id++, dt_inter, score_sz, logical_tensor::layout_type::strided);
76+
auto scale_div = op(id++, op::kind::Divide, "scale_div");
77+
scale_div.add_inputs({score, scale});
78+
scale_div.add_outputs({scaled_score});
79+
80+
// Mask operation
81+
auto mask = logical_tensor(
82+
id++, dt, mask_sz, logical_tensor::layout_type::strided);
83+
auto masked_score = logical_tensor(
84+
id++, dt_inter, score_sz, logical_tensor::layout_type::strided);
85+
auto mask_add = op(id++, op::kind::Add, "mask_add");
86+
mask_add.add_inputs({scaled_score, mask});
87+
mask_add.add_outputs({masked_score});
88+
89+
// Softmax
90+
auto probs = logical_tensor(
91+
id++, dt, score_sz, logical_tensor::layout_type::strided);
92+
auto softmax = op(id++, op::kind::SoftMax, "softmax");
93+
softmax.set_attr<int64_t>(op::attr::axis, -1);
94+
softmax.add_inputs({masked_score});
95+
softmax.add_outputs({probs});
96+
97+
// Final matmul
98+
auto value = logical_tensor(
99+
id++, dt, k_sz, logical_tensor::layout_type::strided);
100+
auto output = logical_tensor(
101+
id++, dt, qv_sz, logical_tensor::layout_type::strided);
102+
auto bmm2 = op(id++, op::kind::MatMul, "bmm2");
103+
bmm2.add_inputs({probs, value});
104+
bmm2.add_outputs({output});
105+
106+
// Construct graph
107+
dnnl::graph::graph sdpa_graph(engine_kind);
108+
sdpa_graph.add_op(bmm1);
109+
sdpa_graph.add_op(scale_div);
110+
sdpa_graph.add_op(mask_add);
111+
sdpa_graph.add_op(softmax);
112+
sdpa_graph.add_op(bmm2);
113+
sdpa_graph.finalize();
114+
115+
// Return graph and input/output tensors
116+
std::vector<logical_tensor> tensors;
117+
tensors.push_back(query);
118+
tensors.push_back(key);
119+
tensors.push_back(scale);
120+
tensors.push_back(mask);
121+
tensors.push_back(value);
122+
tensors.push_back(output);
123+
return std::make_pair(std::move(sdpa_graph), std::move(tensors));
124+
}
125+
126+
// Thread worker function for concurrent execution
127+
void execute_partition_worker(int thread_id, const compiled_partition &cp,
128+
std::vector<logical_tensor> input_tensors, logical_tensor output_tensor,
129+
const dnnl::engine &eng, std::atomic<int> &success_count,
130+
std::atomic<int> &error_count) {
131+
std::cout << "Thread " << thread_id << " starting execution" << std::endl;
132+
try {
133+
// Create stream for this thread
134+
dnnl::stream strm(eng);
135+
136+
// each thread creates its own tensors to avoid data races.
137+
auto ts_query = tensor(input_tensors[0], eng);
138+
auto ts_key = tensor(input_tensors[1], eng);
139+
auto ts_scale = tensor(input_tensors[2], eng);
140+
auto ts_mask = tensor(input_tensors[3], eng);
141+
auto ts_value = tensor(input_tensors[4], eng);
142+
auto ts_output = tensor(output_tensor, eng);
143+
144+
std::vector<tensor> input_tensors
145+
= {ts_query, ts_key, ts_scale, ts_mask, ts_value};
146+
std::vector<tensor> output_tensors = {ts_output};
147+
for (int i = 0; i < num_executions; ++i) {
148+
cp.execute(strm, input_tensors, output_tensors);
149+
strm.wait();
150+
}
151+
152+
success_count.fetch_add(num_executions);
153+
} catch (const std::exception &e) {
154+
std::cerr << "Thread " << thread_id << " error: " << e.what()
155+
<< std::endl;
156+
error_count.fetch_add(num_executions); // Mark all executions as failed
157+
}
158+
159+
std::cout << "Thread " << thread_id << " finished execution" << std::endl;
160+
}
161+
162+
// Verifies that one compiled SDPA partition can be executed concurrently
// from several threads, each using its own stream and tensors, and that
// every execution completes without error.
TEST(APIConcurrentExecution, SDPAConcurrentTest) {
    using namespace dnnl::graph;

    const auto engine_kind
            = static_cast<dnnl::engine::kind>(api_test_engine_kind);
    dnnl::engine eng = cpp_api_test_dnnl_engine_create(engine_kind);

    // Problem size: {mb, seq_len, head_num, head_size, query_num}.
    const sdpa_dims_t dims = {2, 128, 8, 64, 128};

    const logical_tensor::data_type dt = logical_tensor::data_type::f32;

    // Build the SDPA graph and collect its boundary logical tensors.
    auto [sdpa_graph, tensors] = create_sdpa_graph(engine_kind, dt, dims);

    // The backend is expected to fuse the whole SDPA into one partition.
    std::vector<partition> partitions = sdpa_graph.get_partitions();
    ASSERT_EQ(partitions.size(), 1U) << "Should be only one partition";

    // Compile once; the compiled partition is shared by every worker.
    const auto &part = partitions[0];
    std::vector<logical_tensor> inputs(tensors.begin(), tensors.end() - 1);
    std::vector<logical_tensor> outputs = {tensors.back()};
    compiled_partition cp = part.compile(inputs, outputs, eng);

    // Atomic counters aggregated by the workers.
    std::atomic<int> success_count {0};
    std::atomic<int> error_count {0};

    // Launch the concurrent workers.
    std::vector<std::thread> workers;
    workers.reserve(num_threads);
    for (int tid = 0; tid < num_threads; ++tid) {
        std::vector<logical_tensor> thread_inputs(
                tensors.begin(), tensors.end() - 1);
        logical_tensor thread_output = tensors.back();

        workers.emplace_back(execute_partition_worker, tid, cp, thread_inputs,
                thread_output, eng, std::ref(success_count),
                std::ref(error_count));
    }

    // Wait for every worker to finish.
    for (auto &worker : workers)
        worker.join();

    // Every execution on every thread must have succeeded.
    const int expected_total = num_threads * num_executions;

    EXPECT_EQ(error_count.load(), 0)
            << "Encountered " << error_count.load() << " execution errors";
    EXPECT_EQ(success_count.load(), expected_total)
            << "Expected " << expected_total << " successful executions, got "
            << success_count.load();
}

0 commit comments

Comments
 (0)