Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ create real-time applications.
process memory to avoid memory page faults
* [minimal_scheduling](minimal_scheduling/README.md): shows how to configure the thread(s)
scheduling policy and priority
* [minimal_data_sharing](minimal_data_sharing/README.md): shows how to use different data sharing approached to avoid
blocking calls when sharing data between real-time and non real-time threads

TODO: Add other packages after review (currently in rolling-experimental branch):
* minimal_cpu_affinity: shows how to set the process and threads CPU affinity
Expand All @@ -19,7 +21,6 @@ does not allocate dynamic memory
* minimal_memory_allocator: shows how to use different memory strategies to avoid dynamic
memory allocations during the runtime phase
* minimal_loaned_messages: shows how to use loaned messages APIs
* minimal_data_sharing: shows how to use different data sharing approached to avoid
blocking calls when sharing data between real-time and non real-time threads
* minimal_dds_tuning: shows how to use DDS specific configurations to tune real-time
related settings (TODO: add XML profiles for more DDS implementations)
Expand Down
52 changes: 52 additions & 0 deletions minimal_data_sharing/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
cmake_minimum_required(VERSION 3.5)
project(minimal_data_sharing)

# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(iceoryx_utils REQUIRED)
find_package(apex_containers REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(trajectory_msgs REQUIRED)
find_package(pendulum_msgs REQUIRED)

add_executable(minimal_atomic_message minimal_atomic_message.cpp)
ament_target_dependencies(minimal_atomic_message rclcpp std_msgs)

add_executable(minimal_atomic_parameter minimal_atomic_parameter.cpp)
ament_target_dependencies(minimal_atomic_parameter rclcpp std_msgs)

add_executable(minimal_lock_free_queue_logging minimal_lock_free_queue_logging.cpp)
ament_target_dependencies(minimal_lock_free_queue_logging
rclcpp std_msgs iceoryx_utils apex_containers)

add_executable(minimal_lock_free_queue minimal_lock_free_queue.cpp)
ament_target_dependencies(minimal_lock_free_queue rclcpp iceoryx_utils trajectory_msgs)

add_executable(minimal_pendulum_controller minimal_pendulum_controller.cpp)
ament_target_dependencies(minimal_pendulum_controller rclcpp iceoryx_utils trajectory_msgs pendulum_msgs)

install(TARGETS
minimal_atomic_message
minimal_atomic_parameter
minimal_lock_free_queue
minimal_lock_free_queue_logging
minimal_pendulum_controller
DESTINATION lib/${PROJECT_NAME}
)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
endif()

ament_package()
63 changes: 63 additions & 0 deletions minimal_data_sharing/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# minimal_data_sharing

## How to run

### minimal_atomic_message

In this example, we use a message as a counter for some publishers publishing concurrently. A
subcriber received the messages from the publishers and checks there are no repeated values. If
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
subcriber received the messages from the publishers and checks there are no repeated values. If
subscriber received the messages from the publishers and checks there are no repeated values. If

that is the case it will log a warning message.

In the first case, a standard message is used and the message data is incremented in a non
tread-safe fashion. This could lead to race conditions and some repeated values might be
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tread-safe fashion. This could lead to race conditions and some repeated values might be
thread-safe fashion. This could lead to race conditions and some repeated values might be

observed by the subscriber.

```bash
$ ros2 run minimal_data_sharing minimal_atomic_message
```

<script id="asciicast-rMaUcBvvdnnbNWhnywsM2Yy8y" src="https://asciinema.org/a/rMaUcBvvdnnbNWhnywsM2Yy8y.js" async></script>

In the second case, using `--use-atomic` we will use an atomic message. Note, this is only
possible for very small messages, usually with just one field. For larger messages it's not
possible to define an atomic message type. The message is incremented using a CAS loop, therefore
we expect the increment to be executed atomically and the subscriber should not observe any
repeated value in this case.

```bash
$ ros2 run minimal_data_sharing minimal_atomic_message --use-atomic
```

<script id="asciicast-ZNvZ2jnrVvQy89su8vnyPd9qZ" src="https://asciinema.org/a/ZNvZ2jnrVvQy89su8vnyPd9qZ.js" async></script>

### minimal_atomic_parameter

```bash
$ ros2 run minimal_data_sharing minimal_atomic_parameter
```

Reset the count parameter:

```bash
ros2 param set /minimal_publisher count 0
```

### minimal_lock_free_queue

```bash
$ ros2 run minimal_data_sharing minimal_lock_free_queue
```

### minimal_lock_free_queue_logging

```bash
$ ros2 run minimal_data_sharing minimal_lock_free_queue_logging
```

## Resources

- Matthias Killat - Lock-free programming for real-time systems - Meeting C++ 2021
([video](https://www.youtube.com/watch?v=j2AgjFSFgRc))
- [Eclipse iceoryx hoofs overview](https://github.com/eclipse-iceoryx/iceoryx/tree/master/iceoryx_hoofs#concurrent)
- Real time 101 - David Rowland & Fabian Renn Giles - Meeting C++ 2019
([video](https://www.youtube.com/watch?v=ndeN983j_GQ))
157 changes: 157 additions & 0 deletions minimal_data_sharing/minimal_atomic_message.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2022 Carlos San Vicente
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rcutils/cmdline_parser.h>

#include <chrono>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/u_int32.hpp"

using namespace std::chrono_literals;

namespace
{
constexpr const std::string_view OPTION_USE_ATOMIC = "--use-atomic";

void print_usage()
{
printf("\nUsage:\n");
printf(
"%s : use atomic increment for the message data."
" Defaults to false.\n", OPTION_USE_ATOMIC.data());
}
} // namespace

class MinimalPublisher : public rclcpp::Node
{
public:
explicit MinimalPublisher(bool use_atomic)
: Node("minimal_publisher"),
atomic_msg_(std_msgs::msg::UInt32()),
msg_(std_msgs::msg::UInt32()),
use_atomic_{use_atomic}
{
// Make the callback groups reentrant so they can run concurrently
auto reentrant_callback_group =
this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
rclcpp::PublisherOptions options;
options.callback_group = reentrant_callback_group;
auto publisher = this->create_publisher<std_msgs::msg::UInt32>("topic", 10U, options);
auto timer_callback =
[this, publisher]() -> void {
// The message data will be incremented concurrently and then published
std_msgs::msg::UInt32 msg;
if (use_atomic_) {
msg = increment_atomic_message_data();
} else {
msg = increment_message_data();
}
publisher->publish(msg);
};
// Crate as many publishers and CPU cores
const auto num_pub = std::thread::hardware_concurrency();
for (auto pub_index = 0U; pub_index < num_pub; pub_index++) {
timers_.push_back(this->create_wall_timer(1ms, timer_callback, reentrant_callback_group));
}
}

std_msgs::msg::UInt32 increment_atomic_message_data()
{
// we use a CAS loop to increment the data
// Note it is not recommended to implement custom lock-free algorithms since it is
// prone to errors. This is just for demonstration purposes only.
auto old_value = atomic_msg_.load();
auto new_value = old_value;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant, loop also does this (including increment)

new_value.data++;
do {
new_value = old_value;
new_value.data++;
} while(!atomic_msg_.compare_exchange_weak(old_value, new_value));
return new_value;
}

std_msgs::msg::UInt32 increment_message_data()
{
// this increment is not thread safe, race conditions are expected
msg_.data++;
return msg_;
}

private:
using AtomicUint32Msg = std::atomic<std_msgs::msg::UInt32>;

// Note: atomic can generate code with mutexes in it (also platform-dependent).
// When using atomics, always check if it is lock-free.
static_assert(AtomicUint32Msg::is_always_lock_free);

std::vector<rclcpp::TimerBase::SharedPtr> timers_;
AtomicUint32Msg atomic_msg_;
std_msgs::msg::UInt32 msg_;
bool use_atomic_;
};

class MinimalSubscriber : public rclcpp::Node
{
public:
MinimalSubscriber()
: Node("minimal_subscriber")
{
subscription_ = this->create_subscription<std_msgs::msg::UInt32>(
"topic",
10U,
[this](std_msgs::msg::UInt32::UniquePtr msg) {
// The received message is expected to be unique if the data was incremented atomically
// We print a warning if this is not the case
auto search = received_values_.find(msg->data);
if (search != received_values_.end()) {
RCLCPP_WARN(this->get_logger(), "I heard a repeated value: '%d'", msg->data);
}
received_values_.insert(msg->data);
});
}

private:
rclcpp::Subscription<std_msgs::msg::UInt32>::SharedPtr subscription_;
std::set<std::uint32_t> received_values_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

// Argument count and usage
if (rcutils_cli_option_exist(argv, argv + argc, "-h")) {
print_usage();
return EXIT_SUCCESS;
}

bool use_atomic{false};
if (rcutils_cli_option_exist(argv, argv + argc, OPTION_USE_ATOMIC.data())) {
use_atomic = true;
}

std::cout << "Use atomic: " << std::to_string(use_atomic) << std::endl;

auto publishers = std::make_shared<MinimalPublisher>(use_atomic);
auto subscriber = std::make_shared<MinimalSubscriber>();
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(publishers);
executor.add_node(subscriber);
executor.spin();
rclcpp::shutdown();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe print the value of msg_ at the end? This way we can ensure no race condition occured by asserting against an excepted total? That would require shutdown tho and I don't think the code shuts down by itself?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not know how long it runs though, so there is no expected total. We could store the maximum published value of each timer callback and check that the maximum over those values matches the final msg_. Not sure whether this is worth it since this does not tell us that we really published each value only once.

For this we would need bookkeeping while publishing (and maximum only would not suffice). This gets complicated fast if one thread can lag behind arbitrarily (and hence not publish its value).

We ignore overflow issues here (system does not run long enough).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about just showing the published message using ros2 topic echo. If this is enough I will add this in the README explaining what to expect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shuhaowu @MatthiasKillat I updated the demo following your suggestions. What I'm doing now is creating a subscriber and checking that there are no repeated count values. Also, I compare 2 cases, first using atomics and the second incrementing a normal integer.

I found it's hard to enforce race conditions, in my laptop, this happens with the system in IDLE state while in the RPI this happens when I stress the system.

Here are some recordings:

Let me know if this approach makes sense to you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enforcing race conditions is always difficult. However I think the demo you have here is very good as it shows the difference quickly (even though this is not necessarily a proof that atomics won't have a race, but we know from first principle that it should not).

return 0;
}
Loading