Skip to content

Commit a85e388

Browse files
Carlosespicurmergify[bot]
authored andcommitted
Set different content filter signatures for each requester (#5972)
* Refs #23568: Set different content filter signatures for each requester Signed-off-by: Carlosespicur <[email protected]> * Refs #23568: Add test Signed-off-by: Carlosespicur <[email protected]> --------- Signed-off-by: Carlosespicur <[email protected]> (cherry picked from commit 5e01f49)
1 parent 29bc46a commit a85e388

File tree

6 files changed

+68
-3
lines changed

6 files changed

+68
-3
lines changed

src/cpp/fastdds/rpc/RequesterImpl.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <fastdds/dds/domain/qos/RequesterQos.hpp>
2727
#include <fastdds/dds/log/Log.hpp>
2828
#include <fastdds/dds/rpc/RequestInfo.hpp>
29+
#include <fastdds/dds/topic/ContentFilteredTopic.hpp>
2930
#include <fastdds/rtps/common/Guid.hpp>
3031
#include <fastdds/rtps/common/SequenceNumber.hpp>
3132
#include <fastdds/rtps/common/WriteParams.hpp>
@@ -248,16 +249,24 @@ ReturnCode_t RequesterImpl::create_dds_entities(
248249
return RETCODE_ERROR;
249250
}
250251

252+
ContentFilteredTopic* reply_topic = service_->get_reply_filtered_topic();
253+
251254
requester_reader_ =
252255
service_->get_subscriber()->create_datareader(
253-
service_->get_reply_filtered_topic(), qos.reader_qos, this, StatusMask::subscription_matched());
256+
reply_topic, qos.reader_qos, this, StatusMask::subscription_matched());
254257

255258
if (!requester_reader_)
256259
{
257260
EPROSIMA_LOG_ERROR(REQUESTER, "Error creating reply reader");
258261
return RETCODE_ERROR;
259262
}
260263

264+
// Set the content filter signature to be different from the one used in other requesters
265+
std::stringstream guid;
266+
guid << requester_reader_->guid();
267+
std::vector<std::string> expression_parameters;
268+
reply_topic->set_filter_expression(guid.str(), expression_parameters);
269+
261270
// Set the related entity key on both entities
262271
requester_reader_->set_related_datawriter(requester_writer_);
263272
requester_writer_->set_related_datareader(requester_reader_);

test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,21 @@ void ReqRepHelloWorldReplier::init_with_custom_qos(
134134
}
135135

136136
void ReqRepHelloWorldReplier::wait_discovery()
137+
{
138+
wait_discovery(1, 1);
139+
}
140+
141+
void ReqRepHelloWorldReplier::wait_discovery(
142+
unsigned int min_pub_matched,
143+
unsigned int min_sub_matched)
137144
{
138145
std::unique_lock<std::mutex> lock(mutexDiscovery_);
139146

140147
std::cout << "Replier is waiting discovery..." << std::endl;
141148

142149
cvDiscovery_.wait(lock, [&]()
143150
{
144-
return pub_matched_ > 0 && sub_matched_ > 0;
151+
return pub_matched_ >= min_pub_matched && sub_matched_ >= min_sub_matched;
145152
});
146153

147154
std::cout << "Replier discovery finished..." << std::endl;

test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ class ReqRepHelloWorldReplier
7070

7171
void wait_discovery();
7272

73+
void wait_discovery(
74+
unsigned int min_pub_matched,
75+
unsigned int min_sub_matched);
76+
7377
void matched(
7478
bool is_pub);
7579

test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,21 @@ void ReqRepHelloWorldRequester::block(
149149
}
150150

151151
void ReqRepHelloWorldRequester::wait_discovery()
152+
{
153+
wait_discovery(1, 1);
154+
}
155+
156+
void ReqRepHelloWorldRequester::wait_discovery(
157+
unsigned int min_pub_matched,
158+
unsigned int min_sub_matched)
152159
{
153160
std::unique_lock<std::mutex> lock(mutexDiscovery_);
154161

155162
std::cout << "Requester is waiting discovery..." << std::endl;
156163

157164
cvDiscovery_.wait(lock, [&]()
158165
{
159-
return pub_matched_ > 0 && sub_matched_ > 0;
166+
return pub_matched_ >= min_pub_matched && sub_matched_ >= min_sub_matched;
160167
});
161168

162169
std::cout << "Requester discovery finished..." << std::endl;

test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ class ReqRepHelloWorldRequester
7676

7777
void wait_discovery();
7878

79+
void wait_discovery(
80+
unsigned int min_pub_matched,
81+
unsigned int min_sub_matched);
82+
7983
void matched(
8084
bool is_pub);
8185

test/blackbox/common/DDSBlackboxTestsRPC.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,3 +236,37 @@ TEST(RPC, requester_unmatched_during_request_processing)
236236
// Check that the reply took at least the wait_matching timeout (3 secs)
237237
ASSERT_GT(reply_elapsed, Duration_t{2});
238238
}
239+
240+
/**
241+
* Test RPC communication with multiple requesters and one replier.
242+
*
243+
* This test checks that multiple requesters can send requests to a single replier
244+
* and receive replies correctly.
245+
*/
246+
TEST(RPC, multiple_requesters_one_replier)
247+
{
248+
ReqRepHelloWorldRequester requester_1;
249+
ReqRepHelloWorldRequester requester_2;
250+
ReqRepHelloWorldReplier replier;
251+
252+
// Initialize the requesters and the replier
253+
requester_1.init();
254+
ASSERT_TRUE(requester_1.isInitialized());
255+
requester_2.init();
256+
ASSERT_TRUE(requester_2.isInitialized());
257+
replier.init();
258+
ASSERT_TRUE(replier.isInitialized());
259+
260+
// Wait for discovery
261+
requester_1.wait_discovery();
262+
requester_2.wait_discovery();
263+
replier.wait_discovery(2, 2);
264+
265+
// Send requests from both requesters
266+
requester_1.send(1);
267+
requester_2.send(2);
268+
269+
// Block to wait for replies
270+
requester_1.block(std::chrono::seconds(5));
271+
requester_2.block(std::chrono::seconds(5));
272+
}

0 commit comments

Comments
 (0)