
Conversation

@carlossvg
Contributor

@carlossvg carlossvg commented Apr 28, 2022

This PR adds a first implementation of the minimal_data_sharing package to show how to share data between threads in a real-time safe way.

Currently, there are some examples showing how to use std::atomic and the iceoryx lock-free queue. I'm considering adding additional examples with more strategies:

  • exchange buffer
  • try-lock
  • timed mutex => probably not a good idea
  • mutexes with priority inheritance attributes enabled (see the sketch below)

The examples should be representative of typical use cases in ROS 2 code: for example, shared data set from parameters, sharing data to publish from a non-real-time context, etc.
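As a teaser for the last item in the list above, here is a minimal sketch (not part of this PR) of a BasicLockable wrapper around a pthread mutex with the priority-inheritance protocol enabled:

```cpp
#include <pthread.h>

// Sketch only: a std::mutex-like wrapper whose underlying pthread mutex uses
// PTHREAD_PRIO_INHERIT, so a low-priority lock holder is boosted while a
// high-priority thread waits (avoids unbounded priority inversion).
class PriorityInheritanceMutex
{
public:
  PriorityInheritanceMutex()
  {
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT);
    pthread_mutex_init(&mutex_, &attr);
    pthread_mutexattr_destroy(&attr);
  }
  ~PriorityInheritanceMutex() {pthread_mutex_destroy(&mutex_);}

  void lock() {pthread_mutex_lock(&mutex_);}
  void unlock() {pthread_mutex_unlock(&mutex_);}
  bool try_lock() {return pthread_mutex_trylock(&mutex_) == 0;}

private:
  pthread_mutex_t mutex_;
};
```

Since it satisfies BasicLockable, it can be used with std::lock_guard and std::unique_lock.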

@carlossvg
Contributor Author

FYI @MatthiasKillat

@carlossvg carlossvg force-pushed the add-minimal-data-sharing branch from aa31e16 to 5e94256 on April 28, 2022 12:28
Member

@shuhaowu shuhaowu left a comment


I haven't looked at the pendulum demo yet, but I don't want to delay the comments any longer as this has been sitting in my queue for a few days. I'll review the pendulum demo when I get more time.

In any case this looks great. Thanks for doing this!

rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
Member


Maybe print the value of msg_ at the end? This way we can ensure no race condition occurred by asserting against an expected total? That would require shutdown though, and I don't think the code shuts down by itself?

Collaborator


We do not know how long it runs though, so there is no expected total. We could store the maximum published value of each timer callback and check that the maximum over those values matches the final msg_. Not sure whether this is worth it since this does not tell us that we really published each value only once.

For this we would need bookkeeping while publishing (and maximum only would not suffice). This gets complicated fast if one thread can lag behind arbitrarily (and hence not publish its value).

We ignore overflow issues here (system does not run long enough).

Contributor Author


I was thinking about just showing the published messages using ros2 topic echo. If this is enough, I will add it to the README and explain what to expect.

Contributor Author


@shuhaowu @MatthiasKillat I updated the demo following your suggestions. What I'm doing now is creating a subscriber and checking that there are no repeated count values. Also, I compare two cases: the first using atomics and the second incrementing a plain integer.
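For illustration, a hypothetical sketch of such a duplicate check (the names here are illustrative, not the PR's actual code):

```cpp
#include <cstdint>
#include <unordered_set>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/u_int32.hpp"

// Warn whenever a count value arrives twice. The set grows without bound,
// which is fine for a short-lived demo but not for production code.
class DuplicateChecker : public rclcpp::Node
{
public:
  DuplicateChecker()
  : Node("duplicate_checker")
  {
    subscription_ = create_subscription<std_msgs::msg::UInt32>(
      "topic", 10,
      [this](std_msgs::msg::UInt32::SharedPtr msg) {
        if (!seen_.insert(msg->data).second) {
          RCLCPP_WARN(get_logger(), "Repeated count value: %u",
            static_cast<unsigned>(msg->data));
        }
      });
  }

private:
  rclcpp::Subscription<std_msgs::msg::UInt32>::SharedPtr subscription_;
  std::unordered_set<std::uint32_t> seen_;
};
```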

I found it's hard to force race conditions: on my laptop this happens with the system idle, while on the RPi it happens when I stress the system.

Here are some recordings:

Let me know if this approach makes sense to you.

Member


Enforcing race conditions is always difficult. However, I think the demo you have here is very good as it shows the difference quickly (even though this is not necessarily a proof that atomics won't have a race, we know from first principles that they should not).

Comment on lines 61 to 62
using JointTrajectoryPointQueue =
iox::concurrent::LockFreeQueue<trajectory_msgs::msg::JointTrajectoryPoint, 100>;
Member


Might want to quickly document why you chose 100 to avoid copy pasting from the examples without thinking. In this case, I see the publisher is executing at 1Hz, and the subscriber is executing at 10Hz, so theoretically we can never fill up 100 unless something bad has happened and the subscriber failed to execute for 100 seconds. (in fact, you might want to lower the capacity to make error detection faster?)

Suggested change
using JointTrajectoryPointQueue =
iox::concurrent::LockFreeQueue<trajectory_msgs::msg::JointTrajectoryPoint, 100>;
// Chose a capacity of 100 as the rate of push is 1Hz and the rate of pop is 10Hz.
// Theoretically, there should never be a buffer overrun unless the pop operation froze for
// 100 seconds.
using JointTrajectoryPointQueue =
iox::concurrent::LockFreeQueue<trajectory_msgs::msg::JointTrajectoryPoint, 100>;

Also, is there a reason for this type alias to be public? I don't see it used outside of this class. It could just be defined near the queue definition itself so the code is closer together and easier to read?

Contributor Author


Might want to quickly document why you chose 100 to avoid copy pasting from the examples without thinking. In this case, I see the publisher is executing at 1Hz, and the subscriber is executing at 10Hz, so theoretically we can never fill up 100 unless something bad has happened and the subscriber failed to execute for 100 seconds. (in fact, you might want to lower the capacity to make error detection faster?)

My first idea was that the TrajectoryController receives JointTrajectory messages at arbitrary times with an arbitrary (but bounded) number of JointTrajectoryPoints. The use case would be some GUI or higher-level task manager sending trajectories depending on different events. For example, the user sends a trajectory to pick an object, we run a program executing a sequence of trajectories, or even better, a real-time planner sends trajectories based on computer vision. Using a fixed publishing period was just a simple way to run the example.

I guess we can make this more explicit by simulating one of these use cases so we don't use a fixed publishing period. Then we can reason about the queue size and why we decided to buffer this amount.

Also, is there a reason for this type alias to be public? I don't see it used outside of this class. It could just be defined near the queue definition itself so the code is closer together and easier to read?

Yes, I think it makes sense to move it next to the private section if it's not used by any API.

  log_message(log_msg.value());
  log_msg = logger_queue_.pop();
}
rclcpp::sleep_for(100ms);
Member


100ms is kind of a magic number and may deserve some explanation. Generally, people should keep this number lower than the data generation period (which is 500ms in this case). So it is a good idea to leave a comment in case people copy-paste and do the wrong thing 😄

Contributor Author


Yes, I think it's worth adding a comment. Regarding the rate, the data logging could be slower if our queue has enough capacity, right?

Member


Yeah, there's actually a relatively simple math formula to derive the maximum sleep time if you know the average time to process a single queue message. That said, assuming the queue emptying can keep up, the sleep is then the approximate maximum latency between logs appearing in the queue and logs appearing in the console. This value should be kept smallish to keep the latency short (and satisfy the queue length constraint), but large enough to not steal too many cycles from the CPU.

Maybe this is too much detail to explain here, which is why I thought choosing something smaller than the data generation period, on the order of 100ms, is a good idea: it will not result in a very large latency in the logs appearing and also won't steal too much CPU.

For the purpose of this repo, maybe we can just leave it as is; this kind of detail is probably unnecessary.
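For reference, here is a back-of-the-envelope version of that formula (a sketch under simplifying assumptions, not taken from the PR). If data is produced once per period $T$, the queue holds $C$ entries, and processing a single message takes $t_p < T$, then the backlog peaks right at wake-up (it only shrinks while draining), so a sleep of $s$ seconds is safe as long as

$$\frac{s}{T} \le C \quad\Longrightarrow\quad s_{\max} \approx C \cdot T$$

For example, a capacity of $C = 100$ with $T = 500\,\mathrm{ms}$ would tolerate sleeps up to roughly $50\,\mathrm{s}$, so the chosen 100ms leaves a very large margin.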

@carlossvg carlossvg force-pushed the add-minimal-data-sharing branch from 815f682 to f9cd91b on May 11, 2022 19:03
@carlossvg carlossvg changed the title from "Add minimal data sharing" to "WIP: Add minimal data sharing" on May 12, 2022
@carlossvg
Contributor Author

Reposting my comment here.

I'm considering removing the minimal_atomic_message.cpp example for two reasons:

  • The example is probably not a good case. I would rather use a more complex message type with more fields and show how to update the whole message atomically.
  • One of the conclusions for this package should be: don't create your own lock-free algorithms. It's better to rely upon widely used libraries and well-tested code, because writing your own is very error-prone. This example might send the wrong message (no pun intended).

Collaborator

@MatthiasKillat MatthiasKillat left a comment


Only partial review, will review the queue use cases later. I think the scenarios presented require some more description (use case, how it works in a nutshell, key takeaways).

std_msgs::msg::UInt32 increment_message_data()
{
  auto old_value = msg_.load();
  auto new_value = old_value;
Collaborator


Redundant; the loop also does this (including the increment).

do {
  new_value = old_value;
  new_value.data++;
} while (!msg_.compare_exchange_weak(old_value, new_value));
Collaborator


OK, we need this since the counter itself is not atomic (but the message is). And we check that the atomic is lock-free, i.e. no mutexes are introduced (static_assert(std::atomic<std_msgs::msg::UInt32>::is_always_lock_free);).

In this simple case we could otherwise have data itself atomic and then use fetch_add or even operator++ on the atomic. This should even be possible with the following trick:

The type of data itself is int (or similar) but we can reinterpret it to be an atomic<int> and then invoke atomic operations on it. The memory layout of both is the same (if there are no hidden locks, in atomic<int> which we could check), the only difference is that the compiler will be required to ensure atomic operations and memory synchronization in the latter case.

This is required by the standard (I need to look it up again...), so not really a hack. But not intuitive either.
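For what it's worth, C++20's std::atomic_ref expresses this idea without a reinterpret_cast. A sketch, assuming a C++20 toolchain and using a stand-in struct instead of the real message type:

```cpp
#include <atomic>
#include <cstdint>

struct UInt32Msg {std::uint32_t data;};  // stand-in for std_msgs::msg::UInt32

UInt32Msg msg{0};

void increment()
{
  // All concurrent accesses to msg.data must also go through atomic_ref,
  // and the referenced object must meet atomic_ref's alignment requirement.
  std::atomic_ref<std::uint32_t> counter(msg.data);
  counter.fetch_add(1);
}
```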

auto publisher = this->create_publisher<std_msgs::msg::UInt32>("topic", 10, options);
auto timer_callback =
  [this, pub_index, publisher]() -> void {
    auto msg = increment_message_data();
Collaborator


OK, we basically share msg_ and create individual messages from it. The atomic message increment ensures we see each value only once. We do not know which value gets published by which callback though (this depends on ordering of the execution and may change from run to run).

I think it might be useful to explain the intention in a @brief section (or similar) of the MinimalPublisher class.


auto timer_callback =
[this]() -> void {
auto message = std_msgs::msg::String();
message.data = "Hello, world! " + std::to_string(this->count_++);
Collaborator


OK, this updates count_ atomically (visible to all callers) and publishes the value.

Maybe we also need a description of the scenario in a comment, especially the key takeaway.

parameter_event_handler_ = std::make_shared<rclcpp::ParameterEventHandler>(this);
auto param_callback = [this](const rclcpp::Parameter & p) {
    RCLCPP_INFO(this->get_logger(), "Count changed to: '%ld'", p.as_int());
    count_.store(static_cast<std::uint32_t>(p.as_int()));
Collaborator


When is this callback called? In case of the concurrent modification the last updated value wins but anyone concurrently reading (publishing) may see it (eventually) or see a later modification.

What I do not understand is that apparently this callback modifies count_, but the publish path itself also modifies it (operator++). Is this intended? What is the use case? I would imagine the parameter changes independently of publishing, and the publish call itself does not change it.

Contributor Author


When is this callback called? In case of the concurrent modification the last updated value wins but anyone concurrently reading (publishing) may see it (eventually) or see a later modification.

The parameter callback is invoked when the parameter is set externally (parameters are implemented as services under the hood). For example:

ros2 param set /minimal_publisher count 0

What I do not understand is that apparently this callback modifies count_, but the publish path itself also modifies it (operator++). Is this intended? What is the use case? I would imagine the parameter changes independently of publishing, and the publish call itself does not change it.

In this example, I wanted to show a ROS parameter that is used from a real-time thread (publishing) but also can be modified from a non-real-time thread (parameter callback).

I used the minimal_publisher as a base to construct this example, so the count was the first thing that came to my mind to make a parameter. It is probably not a meaningful example, so we can change it to something more descriptive, as we do with the PID parameter example.

One option could be to use something like a "multiplier" parameter. The count would be a normal value which is incremented on each publish, and we publish multiplier × count. The multiplier value could be changed from the parameter callback.
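A hypothetical sketch of that option (illustrative names, not a finished implementation): count_ is only touched by the publishing timer, while multiplier_ is an atomic written from the non-real-time parameter callback.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp/parameter_event_handler.hpp"
#include "std_msgs/msg/u_int32.hpp"

class MultiplierPublisher : public rclcpp::Node
{
public:
  MultiplierPublisher()
  : Node("multiplier_publisher")
  {
    publisher_ = create_publisher<std_msgs::msg::UInt32>("topic", 10);
    declare_parameter("multiplier", 1);
    parameter_event_handler_ = std::make_shared<rclcpp::ParameterEventHandler>(this);
    callback_handle_ = parameter_event_handler_->add_parameter_callback(
      "multiplier",
      [this](const rclcpp::Parameter & p) {
        // Non-real-time context: atomic store, lock-free for uint32_t.
        multiplier_.store(static_cast<std::uint32_t>(p.as_int()));
      });
    timer_ = create_wall_timer(
      std::chrono::milliseconds(500),
      [this]() {
        // Real-time context: count_ is only touched here, multiplier_
        // is read atomically.
        std_msgs::msg::UInt32 msg;
        msg.data = multiplier_.load() * ++count_;
        publisher_->publish(msg);
      });
  }

private:
  rclcpp::Publisher<std_msgs::msg::UInt32>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
  std::shared_ptr<rclcpp::ParameterEventHandler> parameter_event_handler_;
  std::shared_ptr<rclcpp::ParameterCallbackHandle> callback_handle_;
  std::uint32_t count_{0};
  std::atomic<std::uint32_t> multiplier_{1};
};
```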

What do you think?

Contributor Author


@shuhaowu @MatthiasKillat I updated this example. I re-used the example from

In this case, there is nothing interesting to show in the example output besides some sine wave data. We could simply show some code snippets.

});

default_callback_group_executor.spin();
realtime_thread.join();
Collaborator


I may be wrong, but this thread will never join as it spins forever, right? I think Ctrl+C or similar will not properly shut down the application by calling rclcpp::shutdown. How is the shutdown supposed to work?

Contributor Author


It should work. The default signal handler will shut down the context when Ctrl+C is used (see utilities.hpp). Then, the spin will unblock when the context is shut down (see executor.cpp).
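A condensed sketch of that flow (an illustration using standard rclcpp calls, not the PR's exact code): the default SIGINT handler installed by rclcpp::init shuts the context down, both spin() calls return, and join() completes.

```cpp
#include <memory>
#include <thread>

#include "rclcpp/rclcpp.hpp"

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);  // installs the default SIGINT handler

  auto realtime_node = std::make_shared<rclcpp::Node>("realtime_node");
  auto default_node = std::make_shared<rclcpp::Node>("default_node");

  rclcpp::executors::SingleThreadedExecutor realtime_executor;
  realtime_executor.add_node(realtime_node);
  std::thread realtime_thread([&realtime_executor]() {
      realtime_executor.spin();  // unblocks when the context is shut down
    });

  rclcpp::executors::SingleThreadedExecutor default_executor;
  default_executor.add_node(default_node);
  default_executor.spin();  // Ctrl+C -> signal handler -> context shutdown

  realtime_thread.join();  // the realtime spin has returned as well
  rclcpp::shutdown();
  return 0;
}
```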

@carlossvg carlossvg force-pushed the add-minimal-data-sharing branch from 9723622 to 38ce0cb on May 13, 2022 21:34
@carlossvg carlossvg force-pushed the add-minimal-data-sharing branch 2 times, most recently from 9e28c40 to 097d5c9 on June 4, 2022 10:43
@carlossvg carlossvg force-pushed the add-minimal-data-sharing branch from 097d5c9 to 511c92a on June 4, 2022 11:22
@carlossvg carlossvg force-pushed the add-minimal-data-sharing branch from d707394 to 8c00f8a on June 5, 2022 15:00
@carlossvg carlossvg changed the title from "WIP: Add minimal data sharing" to "Add minimal data sharing" on Jun 6, 2022
### minimal_atomic_message

In this example, we use a message as a counter for some publishers publishing concurrently. A
subcriber received the messages from the publishers and checks there are no repeated values. If
Member


Suggested change
subcriber received the messages from the publishers and checks there are no repeated values. If
subscriber received the messages from the publishers and checks there are no repeated values. If

that is the case it will log a warning message.

In the first case, a standard message is used and the message data is incremented in a non
tread-safe fashion. This could lead to race conditions and some repeated values might be
Member


Suggested change
tread-safe fashion. This could lead to race conditions and some repeated values might be
thread-safe fashion. This could lead to race conditions and some repeated values might be


Comment on lines +292 to +293
using PIDPropertiesBuffer = iox::concurrent::LockFreeQueue<PIDProperties, 1>;
PIDPropertiesBuffer pid_buffer_;
Member


This seems to be unused?

// \param[in] properties Struct representing the desired properties.
void set_pid_properties(const PIDProperties & properties)
{
  // TODO(carlos): set data using a lock-free mechanism
Member


This still needs to be done, right? As it stands this has a data race, no?

Member


We discussed this in the RTWG meeting, and maybe a try-lock will be easier to implement here.
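A hedged sketch of what that try-lock variant could look like (illustrative, not agreed-upon code): the real-time update never blocks; if the lock is contended it keeps the previously applied properties and retries on the next control cycle.

```cpp
#include <mutex>

struct PIDProperties {double p{0.0}; double i{0.0}; double d{0.0};};

class PIDController
{
public:
  // Non-real-time context: may block briefly while the update is staged.
  void set_pid_properties(const PIDProperties & properties)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    pending_ = properties;
  }

  // Real-time context: never blocks. If the mutex is held, keep using the
  // previously applied properties and try again on the next cycle.
  void update()
  {
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (lock.owns_lock()) {
      active_ = pending_;
    }
    // ... run the control step using active_ ...
  }

private:
  std::mutex mutex_;
  PIDProperties pending_{};
  PIDProperties active_{};
};
```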
