Skip to content

Commit da809fd

Browse files
CarlosespicurMiguelCompany
authored andcommitted
Set different content filter signatures for each requester (#5972)
* Refs #23568: Set different content filter signatures for each requester Signed-off-by: Carlosespicur <[email protected]> * Refs #23568: Add test Signed-off-by: Carlosespicur <[email protected]> --------- Signed-off-by: Carlosespicur <[email protected]> (cherry picked from commit 5e01f49) # Conflicts: # src/cpp/fastdds/rpc/RequesterImpl.cpp # test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp # test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp # test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp # test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp # test/blackbox/common/DDSBlackboxTestsRPC.cpp
1 parent 985ceeb commit da809fd

File tree

6 files changed

+202
-0
lines changed

6 files changed

+202
-0
lines changed

src/cpp/fastdds/rpc/RequesterImpl.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <fastdds/dds/domain/qos/RequesterQos.hpp>
2626
#include <fastdds/dds/log/Log.hpp>
2727
#include <fastdds/dds/rpc/RequestInfo.hpp>
28+
#include <fastdds/dds/topic/ContentFilteredTopic.hpp>
2829
#include <fastdds/rtps/common/Guid.hpp>
2930
#include <fastdds/rtps/common/SequenceNumber.hpp>
3031
#include <fastdds/rtps/common/WriteParams.hpp>
@@ -210,16 +211,35 @@ ReturnCode_t RequesterImpl::create_dds_entities(
210211
return RETCODE_ERROR;
211212
}
212213

214+
ContentFilteredTopic* reply_topic = service_->get_reply_filtered_topic();
215+
213216
requester_reader_ =
214217
service_->get_subscriber()->create_datareader(
218+
<<<<<<< HEAD
215219
service_->get_reply_filtered_topic(), qos.reader_qos, nullptr);
220+
=======
221+
reply_topic, qos.reader_qos, this, StatusMask::subscription_matched());
222+
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
216223

217224
if (!requester_reader_)
218225
{
219226
EPROSIMA_LOG_ERROR(REQUESTER, "Error creating reply reader");
220227
return RETCODE_ERROR;
221228
}
222229

230+
<<<<<<< HEAD
231+
=======
232+
// Set the content filter signature to be different from the one used in other requesters
233+
std::stringstream guid;
234+
guid << requester_reader_->guid();
235+
std::vector<std::string> expression_parameters;
236+
reply_topic->set_filter_expression(guid.str(), expression_parameters);
237+
238+
// Set the related entity key on both entities
239+
requester_reader_->set_related_datawriter(requester_writer_);
240+
requester_writer_->set_related_datareader(requester_reader_);
241+
242+
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
223243
return RETCODE_OK;
224244
}
225245

test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,25 @@ void ReqRepHelloWorldReplier::newNumber(
114114
}
115115

116116
void ReqRepHelloWorldReplier::wait_discovery()
117+
{
118+
wait_discovery(1, 1);
119+
}
120+
121+
void ReqRepHelloWorldReplier::wait_discovery(
122+
unsigned int min_pub_matched,
123+
unsigned int min_sub_matched)
117124
{
118125
std::unique_lock<std::mutex> lock(mutexDiscovery_);
119126

120127
std::cout << "Replier is waiting discovery..." << std::endl;
121128

122129
cvDiscovery_.wait(lock, [&]()
123130
{
131+
<<<<<<< HEAD
124132
return matched_ > 1;
133+
=======
134+
return pub_matched_ >= min_pub_matched && sub_matched_ >= min_sub_matched;
135+
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
125136
});
126137

127138
std::cout << "Replier discovery finished..." << std::endl;

test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,16 @@ class ReqRepHelloWorldReplier
6767

6868
void wait_discovery();
6969

70+
<<<<<<< HEAD
7071
void matched();
72+
=======
73+
void wait_discovery(
74+
unsigned int min_pub_matched,
75+
unsigned int min_sub_matched);
76+
77+
void matched(
78+
bool is_pub);
79+
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
7180

7281
eprosima::fastdds::dds::ReplierQos create_replier_qos();
7382

test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,25 @@ void ReqRepHelloWorldRequester::block(
148148
}
149149

150150
void ReqRepHelloWorldRequester::wait_discovery()
151+
{
152+
wait_discovery(1, 1);
153+
}
154+
155+
void ReqRepHelloWorldRequester::wait_discovery(
156+
unsigned int min_pub_matched,
157+
unsigned int min_sub_matched)
151158
{
152159
std::unique_lock<std::mutex> lock(mutexDiscovery_);
153160

154161
std::cout << "Requester is waiting discovery..." << std::endl;
155162

156163
cvDiscovery_.wait(lock, [&]()
157164
{
165+
<<<<<<< HEAD
158166
return matched_ > 1;
167+
=======
168+
return pub_matched_ >= min_pub_matched && sub_matched_ >= min_sub_matched;
169+
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
159170
});
160171

161172
std::cout << "Requester discovery finished..." << std::endl;

test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,16 @@ class ReqRepHelloWorldRequester
7676

7777
void wait_discovery();
7878

79+
<<<<<<< HEAD
7980
void matched();
81+
=======
82+
void wait_discovery(
83+
unsigned int min_pub_matched,
84+
unsigned int min_sub_matched);
85+
86+
void matched(
87+
bool is_pub);
88+
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
8089

8190
/**
8291
* Sends a request without checking the matching status.

test/blackbox/common/DDSBlackboxTestsRPC.cpp

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,145 @@ TEST(RPC, ThrowExceptions)
128128
EXPECT_THROW(throw rpc::RemoteUnsupportedError("Still not implemented"), rpc::RpcRemoteException);
129129
EXPECT_THROW(throw rpc::RemoteUnsupportedError("Still not implemented"), rpc::RpcException);
130130
}
131+
<<<<<<< HEAD
132+
=======
133+
134+
/**
135+
* RPC enhanced discovery algotithm.
136+
*
137+
* This test checks that the requester correctly behaves when
138+
* the replier is still unmatched and the request is sent.
139+
*/
140+
TEST(RPC, replier_unmatched_before_sending_request)
141+
{
142+
ReqRepHelloWorldRequester requester;
143+
ReqRepHelloWorldReplier replier;
144+
145+
// Initialize the requester and replier
146+
requester.init();
147+
ASSERT_TRUE(requester.isInitialized());
148+
149+
// Write a request, expecting it to fail
150+
requester.send(0, [](
151+
eprosima::fastdds::dds::rpc::Requester* requester,
152+
eprosima::fastdds::dds::rpc::RequestInfo* info,
153+
void* request)
154+
{
155+
ASSERT_EQ(requester->send_request(request,
156+
*info), eprosima::fastdds::dds::RETCODE_PRECONDITION_NOT_MET);
157+
});
158+
159+
auto future_send = std::async(std::launch::async, [&requester]()
160+
{
161+
// Write a request, this time matching after 300ms
162+
requester.send(0);
163+
});
164+
165+
// At the same time, initialize the replier
166+
std::this_thread::sleep_for(std::chrono::milliseconds(300));
167+
replier.init();
168+
ASSERT_TRUE(replier.isInitialized());
169+
170+
// The requester should now be able to receive the reply
171+
requester.block(std::chrono::seconds(5));
172+
}
173+
174+
/**
175+
* RPC enhanced discovery algotithm.
176+
*
177+
* Requester is unmatched during request processing in the server side.
178+
* This test checks that after waiting for the timeout, the send_reply()
179+
* fails.
180+
*/
181+
TEST(RPC, requester_unmatched_during_request_processing)
182+
{
183+
std::shared_ptr<ReqRepHelloWorldRequester> requester = std::make_shared<ReqRepHelloWorldRequester>();
184+
185+
std::condition_variable replier_finished_cv;
186+
std::atomic<bool> finished{false};
187+
std::mutex replier_finished_mutex;
188+
eprosima::fastdds::dds::Duration_t reply_elapsed;
189+
190+
// Simulate a Replier with heavy processing
191+
ReqRepHelloWorldReplier replier
192+
([&replier_finished_cv, &finished, &reply_elapsed](eprosima::fastdds::dds::rpc::RequestInfo& info,
193+
eprosima::fastdds::dds::rpc::Replier* replier,
194+
const void* const request)
195+
{
196+
// Simulate heavy processing
197+
std::this_thread::sleep_for(std::chrono::seconds(2));
198+
const HelloWorld* hello_request = static_cast<const HelloWorld*>(request);
199+
ASSERT_EQ(hello_request->message().compare("HelloWorld"), 0);
200+
HelloWorld reply;
201+
202+
Duration_t t0, t1;
203+
Duration_t::now(t0);
204+
205+
// send_reply() should fail because the requester will be unmatched
206+
ASSERT_EQ(replier->send_reply((void*)&reply, info), eprosima::fastdds::dds::RETCODE_NO_DATA);
207+
finished.store(true);
208+
Duration_t::now(t1);
209+
reply_elapsed = t1 - t0;
210+
replier_finished_cv.notify_one();
211+
});
212+
213+
// Initialize the requester and replier
214+
requester->init();
215+
ASSERT_TRUE(requester->isInitialized());
216+
replier.init();
217+
ASSERT_TRUE(replier.isInitialized());
218+
219+
// Wait for discovery
220+
requester->wait_discovery();
221+
replier.wait_discovery();
222+
223+
// Write a request
224+
requester->send(0);
225+
std::this_thread::sleep_for(std::chrono::milliseconds(300));
226+
requester.reset();
227+
228+
// Wait for the replier to finish processing
229+
std::unique_lock<std::mutex> lock(replier_finished_mutex);
230+
replier_finished_cv.wait(lock, [&finished]()
231+
{
232+
return finished.load();
233+
});
234+
ASSERT_TRUE(finished.load());
235+
// Check that the reply took at least the wait_matching timeout (3 secs)
236+
ASSERT_GT(reply_elapsed, Duration_t{2});
237+
}
238+
239+
/**
240+
* Test RPC communication with multiple requesters and one replier.
241+
*
242+
* This test checks that multiple requesters can send requests to a single replier
243+
* and receive replies correctly.
244+
*/
245+
TEST(RPC, multiple_requesters_one_replier)
246+
{
247+
ReqRepHelloWorldRequester requester_1;
248+
ReqRepHelloWorldRequester requester_2;
249+
ReqRepHelloWorldReplier replier;
250+
251+
// Initialize the requesters and the replier
252+
requester_1.init();
253+
ASSERT_TRUE(requester_1.isInitialized());
254+
requester_2.init();
255+
ASSERT_TRUE(requester_2.isInitialized());
256+
replier.init();
257+
ASSERT_TRUE(replier.isInitialized());
258+
259+
// Wait for discovery
260+
requester_1.wait_discovery();
261+
requester_2.wait_discovery();
262+
replier.wait_discovery(2, 2);
263+
264+
// Send requests from both requesters
265+
requester_1.send(1);
266+
requester_2.send(2);
267+
268+
// Block to wait for replies
269+
requester_1.block(std::chrono::seconds(5));
270+
requester_2.block(std::chrono::seconds(5));
271+
}
272+
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))

0 commit comments

Comments
 (0)