Skip to content

Commit aa31e16

Browse files
committed
Add minimal_data_sharing package
1 parent 970e48b commit aa31e16

File tree

10 files changed

+879
-1
lines changed

10 files changed

+879
-1
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
Collection of ROS 2 minimal examples showing how to leverage different POSIX and ROS 2 features to
44
create real-time applications.
55

6+
* [minimal_data_sharing](minimal_data_sharing/README.md): shows how to use different data sharing approached to avoid
7+
blocking calls when sharing data between real-time and non real-time threads
68

79
TODO: Add other packages after review (currently in rolling-experimental branch):
810
* minimal_memory_lock: shows how to lock the process memory and
@@ -15,7 +17,6 @@ does not allocate dynamic memory
1517
* minimal_memory_allocator: shows how to use different memory strategies to avoid dynamic
1618
memory allocations during the runtime phase
1719
* minimal_loaned_messages: shows how to use loaned messages APIs
18-
* minimal_data_sharing: shows how to use different data sharing approached to avoid
1920
blocking calls when sharing data between real-time and non real-time threads
2021
* minimal_dds_tuning: shows how to use DDS specific configurations to tune real-time
2122
related settings (TODO: add XML profiles for more DDS implementations)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
cmake_minimum_required(VERSION 3.5)
2+
project(minimal_data_sharing)
3+
4+
# Default to C++14
5+
if(NOT CMAKE_CXX_STANDARD)
6+
set(CMAKE_CXX_STANDARD 14)
7+
endif()
8+
9+
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
10+
add_compile_options(-Wall -Wextra -Wpedantic)
11+
endif()
12+
13+
find_package(ament_cmake REQUIRED)
14+
find_package(rclcpp REQUIRED)
15+
find_package(std_msgs REQUIRED)
16+
find_package(iceoryx_utils REQUIRED)
17+
find_package(apex_containers REQUIRED)
18+
find_package(sensor_msgs REQUIRED)
19+
find_package(trajectory_msgs REQUIRED)
20+
find_package(pendulum_msgs REQUIRED)
21+
22+
add_executable(minimal_atomic_message minimal_atomic_message.cpp)
23+
ament_target_dependencies(minimal_atomic_message rclcpp std_msgs)
24+
25+
add_executable(minimal_atomic_parameter minimal_atomic_parameter.cpp)
26+
ament_target_dependencies(minimal_atomic_parameter rclcpp std_msgs)
27+
28+
add_executable(minimal_lock_free_queue_logging minimal_lock_free_queue_logging.cpp)
29+
ament_target_dependencies(minimal_lock_free_queue_logging
30+
rclcpp std_msgs iceoryx_utils apex_containers)
31+
32+
add_executable(minimal_lock_free_queue minimal_lock_free_queue.cpp)
33+
ament_target_dependencies(minimal_lock_free_queue rclcpp iceoryx_utils trajectory_msgs)
34+
35+
add_executable(minimal_pendulum_controller minimal_pendulum_controller.cpp)
36+
ament_target_dependencies(minimal_pendulum_controller rclcpp iceoryx_utils trajectory_msgs pendulum_msgs)
37+
38+
install(TARGETS
39+
minimal_atomic_message
40+
minimal_atomic_parameter
41+
minimal_lock_free_queue
42+
minimal_lock_free_queue_logging
43+
minimal_pendulum_controller
44+
DESTINATION lib/${PROJECT_NAME}
45+
)
46+
47+
if(BUILD_TESTING)
48+
find_package(ament_lint_auto REQUIRED)
49+
ament_lint_auto_find_test_dependencies()
50+
endif()
51+
52+
ament_package()

minimal_data_sharing/README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# minimal_data_sharing
2+
3+
## How to run
4+
5+
### minimal_atomic_message
6+
7+
```bash
8+
$ ros2 run minimal_data_sharing minimal_atomic_message
9+
```
10+
11+
### minimal_atomic_parameter
12+
13+
```bash
14+
$ ros2 run minimal_data_sharing minimal_atomic_parameter
15+
```
16+
17+
Reset the count parameter:
18+
19+
```bash
20+
ros2 param set /minimal_publisher count 0
21+
```
22+
23+
### minimal_lock_free_queue
24+
25+
```bash
26+
$ ros2 run minimal_data_sharing minimal_lock_free_queue
27+
```
28+
29+
### minimal_lock_free_queue_logging
30+
31+
```bash
32+
$ ros2 run minimal_data_sharing minimal_lock_free_queue_logging
33+
```
34+
35+
## Resources
36+
37+
- Matthias Killat - Lock-free programming for real-time systems - Meeting C++ 2021
38+
([video](https://www.youtube.com/watch?v=j2AgjFSFgRc))
39+
- [Eclipse iceoryx hoofs overview](https://github.com/eclipse-iceoryx/iceoryx/tree/master/iceoryx_hoofs#concurrent)
40+
- Real time 101 - David Rowland & Fabian Renn Giles - Meeting C++ 2019
41+
([video](https://www.youtube.com/watch?v=ndeN983j_GQ))
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2022 Carlos San Vicente
2+
// Copyright 2016 Open Source Robotics Foundation, Inc.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
#include <chrono>
17+
#include <memory>
18+
#include <string>
19+
20+
#include "rclcpp/rclcpp.hpp"
21+
#include "std_msgs/msg/u_int32.hpp"
22+
23+
using namespace std::chrono_literals;
24+
25+
static_assert(std::atomic<std_msgs::msg::UInt32>::is_always_lock_free);
26+
27+
// Bigger data types are not lock free, this asserts would fail
28+
// static_assert(std::atomic<std_msgs::msg::String>::is_always_lock_free);
29+
30+
class MinimalPublisher : public rclcpp::Node
31+
{
32+
public:
33+
MinimalPublisher()
34+
: Node("minimal_publisher"), msg_(std_msgs::msg::UInt32())
35+
{
36+
// Make the callback groups reentrant so they can run concurrently
37+
auto reentrant_callback_group =
38+
this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
39+
rclcpp::PublisherOptions options;
40+
options.callback_group = reentrant_callback_group;
41+
for (auto pub_index = 0; pub_index < 10; pub_index++) {
42+
auto publisher = this->create_publisher<std_msgs::msg::UInt32>("topic", 10, options);
43+
auto timer_callback =
44+
[this, pub_index, publisher]() -> void {
45+
auto msg = increment_message_data();
46+
RCLCPP_INFO(this->get_logger(), "Publisher %d: '%d'", pub_index, msg.data);
47+
publisher->publish(msg);
48+
};
49+
timers_.push_back(this->create_wall_timer(100ms, timer_callback, reentrant_callback_group));
50+
}
51+
}
52+
53+
std_msgs::msg::UInt32 increment_message_data()
54+
{
55+
auto old_value = msg_.load();
56+
auto new_value = old_value;
57+
new_value.data++;
58+
do {
59+
if (msg_.compare_exchange_strong(old_value, new_value)) {
60+
break;
61+
}
62+
} while(true);
63+
return new_value;
64+
}
65+
66+
private:
67+
std::vector<rclcpp::TimerBase::SharedPtr> timers_;
68+
std::atomic<std_msgs::msg::UInt32> msg_;
69+
};
70+
71+
int main(int argc, char * argv[])
72+
{
73+
rclcpp::init(argc, argv);
74+
75+
auto node = std::make_shared<MinimalPublisher>();
76+
rclcpp::executors::MultiThreadedExecutor executor;
77+
executor.add_node(node);
78+
executor.spin();
79+
rclcpp::shutdown();
80+
return 0;
81+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2022 Carlos San Vicente
2+
// Copyright 2016 Open Source Robotics Foundation, Inc.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
#include <chrono>
17+
#include <memory>
18+
#include <string>
19+
20+
#include "rclcpp/rclcpp.hpp"
21+
#include "std_msgs/msg/string.hpp"
22+
23+
using namespace std::chrono_literals;
24+
25+
static_assert(std::atomic<std::uint32_t>::is_always_lock_free);
26+
27+
class MinimalPublisher : public rclcpp::Node
28+
{
29+
public:
30+
MinimalPublisher()
31+
: Node("minimal_publisher"), count_(0)
32+
{
33+
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
34+
auto timer_callback =
35+
[this]() -> void {
36+
auto message = std_msgs::msg::String();
37+
message.data = "Hello, world! " + std::to_string(this->count_++);
38+
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
39+
this->publisher_->publish(message);
40+
};
41+
42+
realtime_callback_group_ = this->create_callback_group(
43+
rclcpp::CallbackGroupType::MutuallyExclusive, false);
44+
timer_ = this->create_wall_timer(500ms, timer_callback, realtime_callback_group_);
45+
46+
count_ = declare_parameter("count", 0);
47+
parameter_event_handler_ = std::make_shared<rclcpp::ParameterEventHandler>(this);
48+
auto param_callback = [this](const rclcpp::Parameter & p) {
49+
RCLCPP_INFO(this->get_logger(), "Count changed to: '%ld'", p.as_int());
50+
count_.store(static_cast<std::uint32_t>(p.as_int()));
51+
};
52+
parameter_callback_handle_ =
53+
parameter_event_handler_->add_parameter_callback("count", param_callback);
54+
}
55+
56+
rclcpp::CallbackGroup::SharedPtr get_realtime_callback_group()
57+
{
58+
return realtime_callback_group_;
59+
}
60+
61+
private:
62+
rclcpp::TimerBase::SharedPtr timer_;
63+
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
64+
std::atomic<std::uint32_t> count_;
65+
rclcpp::CallbackGroup::SharedPtr realtime_callback_group_;
66+
std::shared_ptr<rclcpp::ParameterEventHandler> parameter_event_handler_;
67+
std::shared_ptr<rclcpp::ParameterCallbackHandle> parameter_callback_handle_;
68+
};
69+
70+
71+
int main(int argc, char * argv[])
72+
{
73+
rclcpp::init(argc, argv);
74+
75+
auto node = std::make_shared<MinimalPublisher>();
76+
rclcpp::executors::StaticSingleThreadedExecutor default_callback_group_executor;
77+
rclcpp::executors::StaticSingleThreadedExecutor realtime_executor;
78+
79+
default_callback_group_executor.add_node(node);
80+
realtime_executor.add_callback_group(
81+
node->get_realtime_callback_group(), node->get_node_base_interface());
82+
83+
// spin real-time tasks in a separate thread
84+
auto realtime_thread = std::thread(
85+
[&]() {
86+
realtime_executor.spin();
87+
});
88+
89+
default_callback_group_executor.spin();
90+
realtime_thread.join();
91+
92+
rclcpp::shutdown();
93+
return 0;
94+
}

0 commit comments

Comments
 (0)