diff --git a/README.md b/README.md index 410bdb4..7f57f75 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ create real-time applications. process memory to avoid memory page faults * [minimal_scheduling](minimal_scheduling/README.md): shows how to configure the thread(s) scheduling policy and priority +* [minimal_data_sharing](minimal_data_sharing/README.md): shows how to use different data sharing approached to avoid + blocking calls when sharing data between real-time and non real-time threads TODO: Add other packages after review (currently in rolling-experimental branch): * minimal_cpu_affinity: shows how to set the process and threads CPU affinity @@ -19,7 +21,6 @@ does not allocate dynamic memory * minimal_memory_allocator: shows how to use different memory strategies to avoid dynamic memory allocations during the runtime phase * minimal_loaned_messages: shows how to use loaned messages APIs -* minimal_data_sharing: shows how to use different data sharing approached to avoid blocking calls when sharing data between real-time and non real-time threads * minimal_dds_tuning: shows how to use DDS specific configurations to tune real-time related settings (TODO: add XML profiles for more DDS implementations) diff --git a/minimal_data_sharing/CMakeLists.txt b/minimal_data_sharing/CMakeLists.txt new file mode 100644 index 0000000..7be48fe --- /dev/null +++ b/minimal_data_sharing/CMakeLists.txt @@ -0,0 +1,52 @@ +cmake_minimum_required(VERSION 3.5) +project(minimal_data_sharing) + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(std_msgs REQUIRED) +find_package(iceoryx_utils REQUIRED) +find_package(apex_containers REQUIRED) +find_package(sensor_msgs REQUIRED) +find_package(trajectory_msgs REQUIRED) +find_package(pendulum_msgs REQUIRED) + +add_executable(minimal_atomic_message minimal_atomic_message.cpp) +ament_target_dependencies(minimal_atomic_message rclcpp std_msgs) + +add_executable(minimal_atomic_parameter minimal_atomic_parameter.cpp) +ament_target_dependencies(minimal_atomic_parameter rclcpp std_msgs) + +add_executable(minimal_lock_free_queue_logging minimal_lock_free_queue_logging.cpp) +ament_target_dependencies(minimal_lock_free_queue_logging + rclcpp std_msgs iceoryx_utils apex_containers) + +add_executable(minimal_lock_free_queue minimal_lock_free_queue.cpp) +ament_target_dependencies(minimal_lock_free_queue rclcpp iceoryx_utils trajectory_msgs) + +add_executable(minimal_pendulum_controller minimal_pendulum_controller.cpp) +ament_target_dependencies(minimal_pendulum_controller rclcpp iceoryx_utils trajectory_msgs pendulum_msgs) + +install(TARGETS + minimal_atomic_message + minimal_atomic_parameter + minimal_lock_free_queue + minimal_lock_free_queue_logging + minimal_pendulum_controller + DESTINATION lib/${PROJECT_NAME} +) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + ament_lint_auto_find_test_dependencies() +endif() + +ament_package() diff --git a/minimal_data_sharing/README.md b/minimal_data_sharing/README.md new file mode 100644 index 0000000..67cffd0 --- /dev/null +++ b/minimal_data_sharing/README.md @@ -0,0 +1,63 @@ +# minimal_data_sharing + +## How to run + +### minimal_atomic_message + +In this example, we use a message as a counter for some publishers publishing concurrently. A +subcriber received the messages from the publishers and checks there are no repeated values. If +that is the case it will log a warning message. + +In the first case, a standard message is used and the message data is incremented in a non +tread-safe fashion. This could lead to race conditions and some repeated values might be +observed by the subscriber. + +```bash +$ ros2 run minimal_data_sharing minimal_atomic_message +``` + + + +In the second case, using `--use-atomic` we will use an atomic message. Note, this is only +possible for very small messages, usually with just one field. For larger messages it's not +possible to define an atomic message type. The message is incremented using a CAS loop, therefore +we expect the increment to be executed atomically and the subscriber should not observe any +repeated value in this case. + +```bash +$ ros2 run minimal_data_sharing minimal_atomic_message --use-atomic +``` + + + +### minimal_atomic_parameter + +```bash +$ ros2 run minimal_data_sharing minimal_atomic_parameter +``` + +Reset the count parameter: + +```bash +ros2 param set /minimal_publisher count 0 +``` + +### minimal_lock_free_queue + +```bash +$ ros2 run minimal_data_sharing minimal_lock_free_queue +``` + +### minimal_lock_free_queue_logging + +```bash +$ ros2 run minimal_data_sharing minimal_lock_free_queue_logging +``` + +## Resources + +- Matthias Killat - Lock-free programming for real-time systems - Meeting C++ 2021 +([video](https://www.youtube.com/watch?v=j2AgjFSFgRc)) +- [Eclipse iceoryx hoofs overview](https://github.com/eclipse-iceoryx/iceoryx/tree/master/iceoryx_hoofs#concurrent) +- Real time 101 - David Rowland & Fabian Renn Giles - Meeting C++ 2019 +([video](https://www.youtube.com/watch?v=ndeN983j_GQ)) diff --git a/minimal_data_sharing/minimal_atomic_message.cpp b/minimal_data_sharing/minimal_atomic_message.cpp new file mode 100644 index 0000000..37f5caf --- /dev/null +++ b/minimal_data_sharing/minimal_atomic_message.cpp @@ -0,0 +1,157 @@ +// Copyright 2022 Carlos San Vicente +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/u_int32.hpp" + +using namespace std::chrono_literals; + +namespace +{ +constexpr const std::string_view OPTION_USE_ATOMIC = "--use-atomic"; + +void print_usage() +{ + printf("\nUsage:\n"); + printf( + "%s : use atomic increment for the message data." + " Defaults to false.\n", OPTION_USE_ATOMIC.data()); +} +} // namespace + +class MinimalPublisher : public rclcpp::Node +{ +public: + explicit MinimalPublisher(bool use_atomic) + : Node("minimal_publisher"), + atomic_msg_(std_msgs::msg::UInt32()), + msg_(std_msgs::msg::UInt32()), + use_atomic_{use_atomic} + { + // Make the callback groups reentrant so they can run concurrently + auto reentrant_callback_group = + this->create_callback_group(rclcpp::CallbackGroupType::Reentrant); + rclcpp::PublisherOptions options; + options.callback_group = reentrant_callback_group; + auto publisher = this->create_publisher("topic", 10U, options); + auto timer_callback = + [this, publisher]() -> void { + // The message data will be incremented concurrently and then published + std_msgs::msg::UInt32 msg; + if (use_atomic_) { + msg = increment_atomic_message_data(); + } else { + msg = increment_message_data(); + } + publisher->publish(msg); + }; + // Crate as many publishers and CPU cores + const auto num_pub = std::thread::hardware_concurrency(); + for (auto pub_index = 0U; pub_index < num_pub; pub_index++) { + timers_.push_back(this->create_wall_timer(1ms, timer_callback, reentrant_callback_group)); + } + } + + std_msgs::msg::UInt32 increment_atomic_message_data() + { + // we use a CAS loop to increment the data + // Note it is not recommended to implement custom lock-free algorithms since it is + // prone to errors. This is just for demonstration purposes only. + auto old_value = atomic_msg_.load(); + auto new_value = old_value; + new_value.data++; + do { + new_value = old_value; + new_value.data++; + } while(!atomic_msg_.compare_exchange_weak(old_value, new_value)); + return new_value; + } + + std_msgs::msg::UInt32 increment_message_data() + { + // this increment is not thread safe, race conditions are expected + msg_.data++; + return msg_; + } + +private: + using AtomicUint32Msg = std::atomic; + + // Note: atomic can generate code with mutexes in it (also platform-dependent). + // When using atomics, always check if it is lock-free. + static_assert(AtomicUint32Msg::is_always_lock_free); + + std::vector timers_; + AtomicUint32Msg atomic_msg_; + std_msgs::msg::UInt32 msg_; + bool use_atomic_; +}; + +class MinimalSubscriber : public rclcpp::Node +{ +public: + MinimalSubscriber() + : Node("minimal_subscriber") + { + subscription_ = this->create_subscription( + "topic", + 10U, + [this](std_msgs::msg::UInt32::UniquePtr msg) { + // The received message is expected to be unique if the data was incremented atomically + // We print a warning if this is not the case + auto search = received_values_.find(msg->data); + if (search != received_values_.end()) { + RCLCPP_WARN(this->get_logger(), "I heard a repeated value: '%d'", msg->data); + } + received_values_.insert(msg->data); + }); + } + +private: + rclcpp::Subscription::SharedPtr subscription_; + std::set received_values_; +}; + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc, argv); + + // Argument count and usage + if (rcutils_cli_option_exist(argv, argv + argc, "-h")) { + print_usage(); + return EXIT_SUCCESS; + } + + bool use_atomic{false}; + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_USE_ATOMIC.data())) { + use_atomic = true; + } + + std::cout << "Use atomic: " << std::to_string(use_atomic) << std::endl; + + auto publishers = std::make_shared(use_atomic); + auto subscriber = std::make_shared(); + rclcpp::executors::MultiThreadedExecutor executor; + executor.add_node(publishers); + executor.add_node(subscriber); + executor.spin(); + rclcpp::shutdown(); + return 0; +} diff --git a/minimal_data_sharing/minimal_atomic_parameter.cpp b/minimal_data_sharing/minimal_atomic_parameter.cpp new file mode 100644 index 0000000..b107879 --- /dev/null +++ b/minimal_data_sharing/minimal_atomic_parameter.cpp @@ -0,0 +1,124 @@ +// Copyright 2022 Carlos San Vicente +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/float64.hpp" + +using namespace std::chrono_literals; + +class MinimalPublisher : public rclcpp::Node +{ +public: + MinimalPublisher() + : Node("minimal_publisher") + { + publisher_ = this->create_publisher("topic", 10); + + // called on real-time thread + auto timer_callback = + [this]() -> void { + // simulate some sensor reading and processing + double sensor_data = get_sensor_data(); + auto prcessed_sensor_data = process_sensor_data(sensor_data); + auto message = std_msgs::msg::Float64(); + message.data = prcessed_sensor_data; + RCLCPP_INFO(this->get_logger(), "Publishing: '%lf'", message.data); + this->publisher_->publish(message); + }; + + // add the timer callback to a separate callback group + realtime_callback_group_ = this->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + timer_ = this->create_wall_timer(500ms, timer_callback, realtime_callback_group_); + + // add parameter callback to the default callback group + gain_ = declare_parameter("gain", 1.0); + parameter_event_handler_ = std::make_shared(this); + + // called on non-real-time thread + auto param_callback = [this](const rclcpp::Parameter & p) { + auto new_gain = p.as_double(); + RCLCPP_INFO(this->get_logger(), "New gain: '%lf'", new_gain); + set_sensor_gain(new_gain); + }; + parameter_callback_handle_ = + parameter_event_handler_->add_parameter_callback("gain", param_callback); + } + + rclcpp::CallbackGroup::SharedPtr get_realtime_callback_group() const + { + return realtime_callback_group_; + } + + double process_sensor_data(double sensor_in) const + { + return sensor_in *= gain_.load(); + } + + void set_sensor_gain(double new_gain) + { + gain_.store(new_gain); + } + + double get_sensor_data() + { + // simulate some sine-wave based input + static double rad{2 * M_PI}; + rad += M_PI / 5; + return std::sin(rad); + } + +private: + // Note: atomic can generate code with mutexes in it (also platform-dependent). + // When using atomics, always check if it is lock-free. + static_assert(std::atomic::is_always_lock_free); + + rclcpp::TimerBase::SharedPtr timer_; + rclcpp::Publisher::SharedPtr publisher_; + std::atomic gain_{1.0}; + rclcpp::CallbackGroup::SharedPtr realtime_callback_group_; + std::shared_ptr parameter_event_handler_; + std::shared_ptr parameter_callback_handle_; +}; + + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc, argv); + + auto node = std::make_shared(); + rclcpp::executors::StaticSingleThreadedExecutor default_callback_group_executor; + rclcpp::executors::StaticSingleThreadedExecutor realtime_executor; + + default_callback_group_executor.add_node(node); + realtime_executor.add_callback_group( + node->get_realtime_callback_group(), node->get_node_base_interface()); + + // spin real-time tasks in a separate thread + // note the thread is not configured with real-time attributes in this example + auto realtime_thread = std::thread( + [&]() { + realtime_executor.spin(); + }); + + default_callback_group_executor.spin(); + realtime_thread.join(); + + rclcpp::shutdown(); + return 0; +} diff --git a/minimal_data_sharing/minimal_lock_free_queue.cpp b/minimal_data_sharing/minimal_lock_free_queue.cpp new file mode 100644 index 0000000..b7bc0c0 --- /dev/null +++ b/minimal_data_sharing/minimal_lock_free_queue.cpp @@ -0,0 +1,142 @@ +// Copyright 2022 Carlos San Vicente +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "trajectory_msgs/msg/joint_trajectory.hpp" + +#include + + +using namespace std::chrono_literals; + +class TrajectoryGenerator : public rclcpp::Node +{ +public: + TrajectoryGenerator() + : Node("trajectory_generator"), count_(0) + { + constexpr std::size_t TRAJECTORY_SIZE = 10; + publisher_ = this->create_publisher("trajectory", 10); + auto timer_callback = + [this]() -> void { + auto message = trajectory_msgs::msg::JointTrajectory(); + message.points.reserve(TRAJECTORY_SIZE); + for (std::size_t i = 0; i < TRAJECTORY_SIZE; i++) { + trajectory_msgs::msg::JointTrajectoryPoint point; + point.positions.push_back(count_++); + message.points.push_back(point); + } + RCLCPP_INFO( + this->get_logger(), "Publish trajectory with %zu points", + message.points.size()); + this->publisher_->publish(message); + }; + timer_ = this->create_wall_timer(1s, timer_callback); + } + +private: + rclcpp::TimerBase::SharedPtr timer_; + rclcpp::Publisher::SharedPtr publisher_; + size_t count_; +}; + +class TrajectoryController : public rclcpp::Node +{ +public: + TrajectoryController() + : Node("trajectory_controller") + { + // Create a callback function for when messages are received. + // Variations of this function also exist using, for example UniquePtr for zero-copy transport. + setvbuf(stdout, NULL, _IONBF, BUFSIZ); + subscription_ = this->create_subscription( + "trajectory", + 10, + [this](trajectory_msgs::msg::JointTrajectory::UniquePtr msg) { + for (const auto & point : msg->points) { + queue_.tryPush(point); + } + RCLCPP_INFO(this->get_logger(), "Received trajectory with %zu points ", msg->points.size()); + }); + + auto timer_callback = + [this]() -> void { + auto msg = this->queue_.pop(); + while (msg.has_value()) { + execute_trajectory_point(msg.value()); + msg = this->queue_.pop(); + } + }; + realtime_callback_group_ = this->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + + timer_ = this->create_wall_timer(100ms, timer_callback, realtime_callback_group_); + } + + void execute_trajectory_point(const trajectory_msgs::msg::JointTrajectoryPoint & point) + { + // Logging is not safe for RT but this is just a demo for message passing + for (std::size_t joint_index = 0; joint_index < point.positions.size(); joint_index++) { + RCLCPP_INFO( + this->get_logger(), "Trajectory point: joint %lu, position %lf", + joint_index, point.positions[joint_index]); + } + } + + rclcpp::CallbackGroup::SharedPtr get_realtime_callback_group() + { + return realtime_callback_group_; + } + +private: + using JointTrajectoryPointQueue = + iox::concurrent::LockFreeQueue; + rclcpp::TimerBase::SharedPtr timer_; + rclcpp::Subscription::SharedPtr subscription_; + JointTrajectoryPointQueue queue_; + rclcpp::CallbackGroup::SharedPtr realtime_callback_group_; +}; + + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc, argv); + + auto trajectory_controller = std::make_shared(); + auto trajectory_generator = std::make_shared(); + + rclcpp::executors::StaticSingleThreadedExecutor default_callback_group_executor; + rclcpp::executors::StaticSingleThreadedExecutor realtime_executor; + + default_callback_group_executor.add_node(trajectory_generator); + default_callback_group_executor.add_node(trajectory_controller); + realtime_executor.add_callback_group( + trajectory_controller->get_realtime_callback_group(), + trajectory_controller->get_node_base_interface()); + + // spin real-time tasks in a separate thread + auto realtime_thread = std::thread( + [&]() { + realtime_executor.spin(); + }); + + default_callback_group_executor.spin(); + realtime_thread.join(); + + rclcpp::shutdown(); + return 0; +} diff --git a/minimal_data_sharing/minimal_lock_free_queue_logging.cpp b/minimal_data_sharing/minimal_lock_free_queue_logging.cpp new file mode 100644 index 0000000..139cf6a --- /dev/null +++ b/minimal_data_sharing/minimal_lock_free_queue_logging.cpp @@ -0,0 +1,99 @@ +// Copyright 2022 Carlos San Vicente +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" + +#include "iceoryx_utils/concurrent/lockfree_queue.hpp" +#include "string/string.hpp" + +using namespace std::chrono_literals; + +class MinimalSubscriber : public rclcpp::Node +{ +public: + using LogMessage = apex::string256_t; + using LogMessageQueue = iox::concurrent::LockFreeQueue; + + MinimalSubscriber() + : Node("minimal_subscriber"), count_(0) + { + publisher_ = this->create_publisher("topic", 10); + auto timer_callback = + [this]() -> void { + auto message = std_msgs::msg::String(); + message.data = "Hello, world! " + std::to_string(this->count_++); + logger_queue_.push(apex::varargs_to_string("Publishing: ", message.data.c_str())); + this->publisher_->publish(message); + }; + timer_ = this->create_wall_timer(500ms, timer_callback); + + subscription_ = this->create_subscription( + "topic", + 10, + [this](std_msgs::msg::String::UniquePtr msg) { + logger_queue_.push(apex::varargs_to_string("I heard: ", msg->data.c_str())); + }); + + // Note this is not an example of how to implement a real-time logger + // This only shows how pass data from a real-time thread to another + // thread to handle the data in a non real-time context + logger_thread_ = std::thread( + [this]() { + while (rclcpp::ok()) { + auto log_msg = logger_queue_.pop(); + while (log_msg.has_value()) { + log_message(log_msg.value()); + log_msg = logger_queue_.pop(); + } + rclcpp::sleep_for(100ms); + } + } + ); + } + + void log_message(const LogMessage & message) + { + RCLCPP_INFO(this->get_logger(), "%s", message.c_str()); + } + + ~MinimalSubscriber() + { + if (logger_thread_.joinable()) { + logger_thread_.join(); + } + } + +private: + rclcpp::TimerBase::SharedPtr timer_; + size_t count_; + rclcpp::Publisher::SharedPtr publisher_; + rclcpp::Subscription::SharedPtr subscription_; + LogMessageQueue logger_queue_; + std::thread logger_thread_; +}; + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} diff --git a/minimal_data_sharing/minimal_pendulum_controller.cpp b/minimal_data_sharing/minimal_pendulum_controller.cpp new file mode 100644 index 0000000..34a298e --- /dev/null +++ b/minimal_data_sharing/minimal_pendulum_controller.cpp @@ -0,0 +1,327 @@ +// Copyright 2022 Carlos San Vicente +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "pendulum_msgs/msg/joint_command.hpp" +#include "pendulum_msgs/msg/joint_state.hpp" + +#include + +static_assert(std::atomic::is_always_lock_free); + +// Based on: https://github.com/ros2/demos/blob/7a15d0bf14e5765ea223873b6927be908c7acbb3/pendulum_control/include/pendulum_control/pendulum_controller.hpp + +using namespace std::chrono_literals; + +struct PIDProperties +{ + /// Proportional constant. + double p = 1; + /// Integral constant. + double i = 0; + /// Derivative constant. + double d = 0; +}; + +/// Provides a simple PID controller for the inverted pendulum. +class PendulumController : public rclcpp::Node +{ +public: + /// Default constructor. + /** + * \param[in] period The update period of the controller. + * \param[in] pid The properties of the controller. + */ + PendulumController(std::chrono::nanoseconds period, PIDProperties pid) + : rclcpp::Node("pendulum_controller"), + publish_period_(period), pid_(pid) + { + set_command(get_setpoint()); + + // Calculate the controller timestep (for discrete differentiation/integration). + dt_ = publish_period_.count() / (1000.0 * 1000.0 * 1000.0); + if (std::isnan(dt_) || dt_ == 0) { + throw std::runtime_error("Invalid dt_ calculated in PendulumController constructor"); + } + + // Create real-time callbacks + realtime_callback_group_ = this->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + + rclcpp::SubscriptionOptions subscription_options; + subscription_options.callback_group = realtime_callback_group_; + + // Create a lambda function to invoke the controller callback when a command is received. + auto controller_subscribe_callback = + [this](pendulum_msgs::msg::JointState::ConstSharedPtr msg) -> void + { + this->on_sensor_message(msg); + }; + sensor_sub_ = this->create_subscription( + "pendulum_sensor", + 10, + controller_subscribe_callback, subscription_options); + + // Initialize the publisher for the command message. + command_pub_ = create_publisher("pendulum_command", 10); + // Create a lambda function that will fire regularly to publish the next command message. + auto controller_publish_callback = + [this]() + { + auto command_message = pendulum_msgs::msg::JointCommand().set__position(get_command()); + this->command_pub_->publish(command_message); + }; + + controller_publisher_timer_ = this->create_wall_timer( + period, controller_publish_callback, + realtime_callback_group_); + + // Create non-real-time callbacks + // Create a lambda function to accept user input to command the pendulum + auto controller_command_callback = + [this](pendulum_msgs::msg::JointCommand::ConstSharedPtr msg) -> void + { + this->on_pendulum_setpoint(msg); + }; + + setpoint_sub_ = this->create_subscription( + "pendulum_setpoint", 10, controller_command_callback); + + // Set PID parameters + declare_parameter("p", 1.0); + declare_parameter("i", 0.0); + declare_parameter("d", 0.0); + + setParameterEventCallback(); + + // Create a lambda function that will fire regularly to publish the next results message. + auto logger_publish_callback = + [this]() { + RCLCPP_INFO( + this->get_logger(), "Pendulum command position: %lf", get_command()); + RCLCPP_INFO( + this->get_logger(), "Pendulum state position: %lf", get_sensor_position()); + RCLCPP_INFO( + this->get_logger(), "Pendulum setpoint: %lf", get_setpoint()); + RCLCPP_INFO(this->get_logger(), "----------------------------"); + }; + + // Add a timer to enable regular publication of results messages. + logger_publisher_timer_ = this->create_wall_timer(1s, logger_publish_callback); + } + + /// Calculate new command based on new sensor state and PID controller properties. + // \param[in] msg Received sensor message. + void on_sensor_message(pendulum_msgs::msg::JointState::ConstSharedPtr msg) + { + if (std::isnan(msg->position)) { + throw std::runtime_error("Sensor value was NaN in on_sensor_message callback"); + } + // PID controller algorithm + double setpoint = get_setpoint(); + double error = setpoint - msg->position; + // Proportional gain is proportional to error + double p_gain = pid_.p * error; + // Integral gain is proportional to the accumulation of error + i_gain_ = pid_.i * (i_gain_ + error * dt_); + // Differential gain is proportional to the change in error + double d_gain = pid_.d * (error - last_error_) / dt_; + last_error_ = error; + + // Calculate the message based on PID gains + double new_position; + new_position = msg->position + p_gain + i_gain_ + d_gain; + // Enforce positional limits + if (new_position > M_PI) { + new_position = M_PI; + } else if (new_position < 0) { + new_position = 0; + } + + if (std::isnan(new_position)) { + throw std::runtime_error("Resulting command was NaN in on_sensor_message callback"); + } + + set_command(new_position); + set_sensor_position(msg->position); + } + + /// Callback when a pendulum JointCommand message is received. + // \param[in] msg The incoming message containing the position. + void on_pendulum_setpoint(pendulum_msgs::msg::JointCommand::ConstSharedPtr msg) + { + set_setpoint(msg->position); + RCLCPP_INFO(this->get_logger(), "Pendulum set to: '%lf'", msg->position); + } + + /// Set the properties of the PID controller (gains and desired output). + // \param[in] properties Struct representing the desired properties. + void set_pid_properties(const PIDProperties & properties) + { + // TODO(carlos): set data using a lock-free mechanism + pid_ = properties; + } + + /// Get the properties of the controller. + // \return Struct representing the properties of the controller. + const PIDProperties & get_pid_properties() const + { + // TODO(carlos): get data using a lock-free mechanism + return pid_; + } + + /// Set the commanded position of the controller. + // \param[in] command The new commanded position (in radians). + void set_setpoint(double command) + { + setpoint_.store(command); + } + + /// Get the commanded position of the controller. + // \return The commanded position (in radians). + double get_setpoint() const + { + return setpoint_.load(); + } + + /// Get the output position of the controller. + // \return The commanded position (in radians). + double get_command() const + { + return command_.load(); + } + + /// Set the output position of the controller. + // \param[in] command The new commanded position (in radians). + void set_command(double command) + { + command_.store(command); + } + + /// Get the sensor position. + // \return The sensor position. + double get_sensor_position() const + { + return sensor_position_.load(); + } + + /// Set the sensor position. + // \param[in] sensor_position The new sensor position (in radians). + void set_sensor_position(double sensor_position) + { + sensor_position_.store(sensor_position); + } + + rclcpp::CallbackGroup::SharedPtr get_realtime_callback_group() + { + return realtime_callback_group_; + } + +private: + // based on https://github.com/ros-controls/control_toolbox/blob/d7462e982e40bfdebddbcff8849c611caf8c8422/src/pid_ros.cpp#L281-L321 + void setParameterEventCallback() + { + auto on_parameter_event_callback = [this](const std::vector & parameters) { + rcl_interfaces::msg::SetParametersResult result; + result.successful = true; + + /// @note don't use get_pid_properties, it's rt + PIDProperties pid_prop = get_pid_properties(); + + for (auto & parameter : parameters) { + const std::string param_name = parameter.get_name(); + try { + if (param_name == "p") { + pid_prop.p = parameter.get_value(); + } else if (param_name == "i") { + pid_prop.i = parameter.get_value(); + } else if (param_name == "d") { + pid_prop.d = parameter.get_value(); + } else { + result.successful = false; + result.reason = "Invalid parameter"; + } + } catch (const rclcpp::exceptions::InvalidParameterTypeException & e) { + RCLCPP_INFO_STREAM(this->get_logger(), "Please use the right type: " << e.what()); + } + } + + if (result.successful) { + /// @note don't call set_pid_properties() from inside a callback + set_pid_properties(pid_prop); + RCLCPP_INFO( + this->get_logger(), + "PID parameters set to: p=%lf, i=%lf, d=%lf", + pid_prop.p, pid_prop.i, pid_prop.d); + } + + return result; + }; + + parameter_callback_ = this->get_node_parameters_interface()->add_on_set_parameters_callback( + on_parameter_event_callback); + } + + // controller should publish less frequently than the motor + std::chrono::nanoseconds publish_period_; + PIDProperties pid_; + + std::atomic sensor_position_{M_PI / 2}; + std::atomic command_{M_PI / 2}; + std::atomic setpoint_ {M_PI / 2}; + + // state for PID controller + double last_error_ = 0; + double i_gain_ = 0; + double dt_; + + using PIDPropertiesBuffer = iox::concurrent::LockFreeQueue; + PIDPropertiesBuffer pid_buffer_; + + rclcpp::TimerBase::SharedPtr controller_publisher_timer_; + rclcpp::TimerBase::SharedPtr logger_publisher_timer_; + rclcpp::Publisher::SharedPtr command_pub_; + rclcpp::Subscription::SharedPtr sensor_sub_; + rclcpp::Subscription::SharedPtr setpoint_sub_; + rclcpp::node_interfaces::OnSetParametersCallbackHandle::SharedPtr parameter_callback_; + rclcpp::CallbackGroup::SharedPtr realtime_callback_group_; +}; + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc, argv); + PIDProperties pid; + auto node = std::make_shared(std::chrono::nanoseconds(960000), pid); + + rclcpp::executors::StaticSingleThreadedExecutor default_callback_group_executor; + rclcpp::executors::StaticSingleThreadedExecutor realtime_executor; + + default_callback_group_executor.add_node(node); + realtime_executor.add_callback_group( + node->get_realtime_callback_group(), node->get_node_base_interface()); + // spin real-time tasks in a separate thread + auto realtime_thread = std::thread( + [&]() { + realtime_executor.spin(); + }); + + default_callback_group_executor.spin(); + realtime_thread.join(); + + rclcpp::shutdown(); + return 0; +} diff --git a/minimal_data_sharing/package.xml b/minimal_data_sharing/package.xml new file mode 100644 index 0000000..b60ddb8 --- /dev/null +++ b/minimal_data_sharing/package.xml @@ -0,0 +1,35 @@ + + + + minimal_data_sharing + 0.0.1 + Examples of minimal real-time capable data sharing strategies + Carlos San Vicente + Apache License 2.0 + Carlos San Vicente + + ament_cmake + + rclcpp + std_msgs + sensor_msgs + trajectory_msgs + pendulum_msgs + iceoryx_utils + apex_containers + + rclcpp + std_msgs + sensor_msgs + trajectory_msgs + pendulum_msgs + iceoryx_utils + apex_containers + + ament_lint_auto + ament_lint_common + + + ament_cmake + +