diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 758c8f3fa6e..5f32b3903cf 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -58,7 +58,7 @@ #ifdef FASTDDS_STATISTICS #include #include -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS using namespace eprosima::fastdds; using namespace eprosima::fastdds::rtps; @@ -435,12 +435,7 @@ ReturnCode_t DataWriterImpl::enable() // In case it has been loaded from the persistence DB, rebuild instances on history history_->rebuild_instances(); - deadline_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - qos_.deadline().period.to_ns() * 1e-6); + configure_deadline_timer_(); lifespan_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(), [&]() -> bool @@ -683,8 +678,8 @@ ReturnCode_t DataWriterImpl::check_write_preconditions( type_.get()->compute_key(data, instance_handle, is_key_protected); } - //Check if the Handle is different from the special value HANDLE_NIL and - //does not correspond with the instance referred by the data + // Check if the Handle is different from the special value HANDLE_NIL and + // does not correspond with the instance referred by the data if (handle.isDefined() && handle != instance_handle) { return RETCODE_PRECONDITION_NOT_MET; @@ -1036,6 +1031,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( } } + // create_change seeds the next per-instance deadline and reschedules the timer for the next sample CacheChange_t* ch = history_->create_change(change_kind, handle); if (ch != nullptr) { @@ -1068,7 +1064,8 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( return RETCODE_TIMEOUT; } - if (qos_.deadline().period != dds::c_TimeInfinite) + if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && + deadline_missed_status_.total_count < std::numeric_limits::max()) { if (!history_->set_next_deadline( handle, @@ -1179,7 +1176,7 @@ void DataWriterImpl::publisher_qos_updated() { if (writer_ != nullptr) { - //NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED + // NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos()); publisher_->rtps_participant()->update_writer(writer_, wqos); } @@ -1214,6 +1211,9 @@ ReturnCode_t DataWriterImpl::set_qos( return RETCODE_IMMUTABLE_POLICY; } + // Take a snapshot of the current QoS before mutating it + const DataWriterQos old_qos = qos_; + set_qos(qos_, qos_to_set, !enabled); if (enabled) @@ -1229,32 +1229,29 @@ ReturnCode_t DataWriterImpl::set_qos( writer_->update_attributes(w_att); } - //Notify the participant that a Writer has changed its QOS + // Notify the participant that a Writer has changed its QOS WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos()); publisher_->rtps_participant()->update_writer(writer_, wqos); - // Deadline - if (qos_.deadline().period != dds::c_TimeInfinite) + // If the deadline period actually changed, (re)configure the timer. + if (old_qos.deadline().period != qos_.deadline().period) { - deadline_duration_us_ = - duration>(qos_.deadline().period.to_ns() * 1e-3); - deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); - } - else - { - deadline_timer_->cancel_timer(); + configure_deadline_timer_(); } // Lifespan - if (qos_.lifespan().duration != dds::c_TimeInfinite) + if (old_qos.lifespan().duration != qos_.lifespan().duration) { - lifespan_duration_us_ = - duration>(qos_.lifespan().duration.to_ns() * 1e-3); - lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); - } - else - { - lifespan_timer_->cancel_timer(); + if (qos_.lifespan().duration != dds::c_TimeInfinite) + { + lifespan_duration_us_ = + duration>(qos_.lifespan().duration.to_ns() * 1e-3); + lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); + } + else + { + lifespan_timer_->cancel_timer(); + } } } @@ -1326,7 +1323,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::INCOMPATIBLE_QOS); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1365,7 +1362,7 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::LIVELINESS_LOST); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1409,7 +1406,7 @@ void DataWriterImpl::InnerDataWriterListener::notify_status_observer( } } -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS ReturnCode_t DataWriterImpl::wait_for_acknowledgments( const dds::Duration_t& max_wait) @@ -1502,10 +1499,12 @@ ReturnCode_t DataWriterImpl::get_publication_matched_status( bool DataWriterImpl::deadline_timer_reschedule() { - assert(qos_.deadline().period != dds::c_TimeInfinite); - std::unique_lock lock(writer_->getMutex()); + assert(qos_.deadline().period != dds::c_TimeInfinite); + assert(deadline_timer_ != nullptr); + assert(deadline_missed_status_.total_count < std::numeric_limits::max()); + steady_clock::time_point next_deadline_us; if (!history_->get_next_deadline(timer_owner_, next_deadline_us)) { @@ -1518,18 +1517,57 @@ bool DataWriterImpl::deadline_timer_reschedule() return true; } -bool DataWriterImpl::deadline_missed() +void DataWriterImpl::configure_deadline_timer_() { - assert(qos_.deadline().period != dds::c_TimeInfinite); - std::unique_lock lock(writer_->getMutex()); - deadline_missed_status_.total_count++; - deadline_missed_status_.total_count_change++; - deadline_missed_status_.last_instance_handle = timer_owner_; + // Create the timer once + if (deadline_timer_ == nullptr) + { + deadline_timer_ = new TimedEvent( + publisher_->rtps_participant()->get_resource_event(), + [this]() -> bool + { + return deadline_missed(); + }, + // Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly + std::numeric_limits::max() + ); + } + + // Handle "infinite" and "zero" outside the callback + if (qos_.deadline().period == dds::c_TimeInfinite) + { + deadline_duration_us_ = std::chrono::duration::max(); + deadline_timer_->cancel_timer(); + return; + } + + deadline_duration_us_ = + std::chrono::duration>(qos_.deadline().period.to_ns() * 1e-3); + + if (qos_.deadline().period.to_ns() == 0) + { + deadline_timer_->cancel_timer(); + + deadline_missed_status_.total_count = std::numeric_limits::max(); + deadline_missed_status_.total_count_change = std::numeric_limits::max(); + EPROSIMA_LOG_WARNING( + DATA_WRITER, + "Deadline period is 0, it will be ignored from now on."); + + // Bump once and notify listener exactly once. + notify_deadline_missed_nts_(); + return; + } + + deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); +} + +void DataWriterImpl::notify_deadline_missed_nts_() +{ StatusMask notify_status = StatusMask::offered_deadline_missed(); - auto listener = get_listener_for(notify_status); - if (nullptr != listener) + if (auto* listener = get_listener_for(notify_status)) { listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_); deadline_missed_status_.total_count_change = 0; @@ -1537,9 +1575,31 @@ bool DataWriterImpl::deadline_missed() #ifdef FASTDDS_STATISTICS writer_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +bool DataWriterImpl::deadline_missed() +{ + std::unique_lock lock(writer_->getMutex()); + + assert(qos_.deadline().period != dds::c_TimeInfinite); + + deadline_missed_status_.total_count++; + deadline_missed_status_.total_count_change++; + deadline_missed_status_.last_instance_handle = timer_owner_; + + notify_deadline_missed_nts_(); + + // If we just reached the max -> log ONCE, stop timer, and bail. + if (deadline_missed_status_.total_count == std::numeric_limits::max()) + { + EPROSIMA_LOG_WARNING(DATA_WRITER, + "Maximum number of deadline missed messages reached. Stopping deadline timer."); + deadline_timer_->cancel_timer(); + return false; // do not reschedule + } if (!history_->set_next_deadline( timer_owner_, diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/fastdds/publisher/DataWriterImpl.hpp index a4a3f29d369..81b05d43202 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.hpp @@ -20,6 +20,7 @@ #define _FASTDDS_DATAWRITERIMPL_HPP_ #include +#include #include #include @@ -604,6 +605,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter /** * @brief A method to reschedule the deadline timer + * @return true if deadline rescheduling succeeded, false otherwise */ bool deadline_timer_reschedule(); @@ -732,6 +734,18 @@ class DataWriterImpl : protected rtps::IReaderDataFilter private: + /** + * (Re)configures the deadline timer: + * In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and + * for non-infinite positive values store period. + */ + void configure_deadline_timer_(); + + /** + * Notifies listeners that a deadline has been missed. + */ + void notify_deadline_missed_nts_(); + void create_history( const std::shared_ptr& payload_pool, const std::shared_ptr& change_pool); diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index b54e5e38962..7b2ddd59002 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -59,7 +59,7 @@ #ifdef FASTDDS_STATISTICS #include #include -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS using eprosima::fastdds::RecursiveTimedMutex; using eprosima::fastdds::dds::c_TimeInfinite; @@ -263,12 +263,7 @@ ReturnCode_t DataReaderImpl::enable() reader_ = reader; - deadline_timer_ = new TimedEvent(subscriber_->rtps_participant()->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - qos_.deadline().period.to_ns() * 1e-6); + configure_deadline_timer_(); lifespan_timer_ = new TimedEvent(subscriber_->rtps_participant()->get_resource_event(), [&]() -> bool @@ -286,7 +281,7 @@ ReturnCode_t DataReaderImpl::enable() SubscriptionBuiltinTopicData subscription_data; if (get_subscription_builtin_topic_data(subscription_data) != RETCODE_OK) { - EPROSIMA_LOG_ERROR(DATA_WRITER, "Error getting subscription data. RTPS Writer not enabled."); + EPROSIMA_LOG_ERROR(DATA_READER, "Error getting subscription data. RTPS Reader not enabled."); return RETCODE_ERROR; } @@ -341,7 +336,7 @@ void DataReaderImpl::stop() DataReaderImpl::~DataReaderImpl() { - // assert there are no pending conditions + // Assert there are no pending conditions assert(read_conditions_.empty()); // Disable the datareader to prevent receiving data in the middle of deleting it @@ -868,6 +863,9 @@ ReturnCode_t DataReaderImpl::set_qos( return RETCODE_IMMUTABLE_POLICY; } + // Take a snapshot of the current QoS before mutating it + const DataReaderQos old_qos = qos_; + set_qos(qos_, qos_to_set, !enabled); if (enabled) @@ -875,27 +873,25 @@ ReturnCode_t DataReaderImpl::set_qos( // NOTIFY THE BUILTIN PROTOCOLS THAT THE READER HAS CHANGED update_rtps_reader_qos(); - // Deadline - if (qos_.deadline().period != dds::c_TimeInfinite) + // If the deadline period actually changed, (re)configure the timer. + if (old_qos.deadline().period != qos_.deadline().period) { - deadline_duration_us_ = duration>(qos_.deadline().period.to_ns() * 1e-3); - deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); - } - else - { - deadline_timer_->cancel_timer(); + configure_deadline_timer_(); } // Lifespan - if (qos_.lifespan().duration != dds::c_TimeInfinite) + if (old_qos.lifespan().duration != qos_.lifespan().duration) { - lifespan_duration_us_ = - std::chrono::duration>(qos_.lifespan().duration.to_ns() * 1e-3); - lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); - } - else - { - lifespan_timer_->cancel_timer(); + if (qos_.lifespan().duration != dds::c_TimeInfinite) + { + lifespan_duration_us_ = + std::chrono::duration>(qos_.lifespan().duration.to_ns() * 1e-3); + lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); + } + else + { + lifespan_timer_->cancel_timer(); + } } } @@ -920,7 +916,7 @@ void DataReaderImpl::InnerDataReaderListener::on_data_available( { auto user_reader = data_reader_->user_datareader_; - //First check if we can handle with on_data_on_readers + // First check if we can handle with on_data_on_readers SubscriberListener* subscriber_listener = data_reader_->subscriber_->get_listener_for(StatusMask::data_on_readers()); if (subscriber_listener != nullptr) @@ -978,7 +974,7 @@ void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::LIVELINESS_CHANGED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1001,7 +997,7 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::INCOMPATIBLE_QOS); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1024,7 +1020,7 @@ void DataReaderImpl::InnerDataReaderListener::on_sample_lost( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::SAMPLE_LOST); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1058,12 +1054,12 @@ void DataReaderImpl::InnerDataReaderListener::notify_status_observer( { if (!statistics_pp_impl->get_status_observer()->on_local_entity_status_change(data_reader_->guid(), status_id)) { - EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set entity status"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set entity status"); } } } -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS bool DataReaderImpl::on_data_available( const fastdds::rtps::GUID_t& writer_guid, @@ -1102,13 +1098,14 @@ bool DataReaderImpl::on_new_cache_change_added( return false; } - if (qos_.deadline().period != dds::c_TimeInfinite) + if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && + deadline_missed_status_.total_count < std::numeric_limits::max()) { if (!history_.set_next_deadline( change->instanceHandle, steady_clock::now() + duration_cast(deadline_duration_us_))) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set next deadline in the history"); } else if (timer_owner_ == change->instanceHandle || timer_owner_ == InstanceHandle_t()) { @@ -1150,7 +1147,7 @@ bool DataReaderImpl::on_new_cache_change_added( } else { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved"); + EPROSIMA_LOG_ERROR(DATA_READER, "A change was added to history that could not be retrieved"); } // Update and restart the timer @@ -1245,14 +1242,16 @@ ReturnCode_t DataReaderImpl::get_matched_publications( bool DataReaderImpl::deadline_timer_reschedule() { - assert(qos_.deadline().period != dds::c_TimeInfinite); - std::unique_lock lock(reader_->getMutex()); + assert(qos_.deadline().period != dds::c_TimeInfinite); + assert(deadline_timer_ != nullptr); + assert(deadline_missed_status_.total_count < std::numeric_limits::max()); + steady_clock::time_point next_deadline_us; if (!history_.get_next_deadline(timer_owner_, next_deadline_us)) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not get the next deadline from the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not get the next deadline from the history"); return false; } auto interval_ms = duration_cast(next_deadline_us - steady_clock::now()); @@ -1261,15 +1260,55 @@ bool DataReaderImpl::deadline_timer_reschedule() return true; } -bool DataReaderImpl::deadline_missed() +void DataReaderImpl::configure_deadline_timer_() { - assert(qos_.deadline().period != dds::c_TimeInfinite); - std::unique_lock lock(reader_->getMutex()); - deadline_missed_status_.total_count++; - deadline_missed_status_.total_count_change++; - deadline_missed_status_.last_instance_handle = timer_owner_; + // Create the timer once + if (deadline_timer_ == nullptr) + { + deadline_timer_ = new TimedEvent( + subscriber_->rtps_participant()->get_resource_event(), + [this]() -> bool + { + return deadline_missed(); + }, + // Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly + std::numeric_limits::max() + ); + } + + // Handle "infinite" and "zero" outside the callback + if (qos_.deadline().period == dds::c_TimeInfinite) + { + deadline_duration_us_ = std::chrono::duration::max(); + deadline_timer_->cancel_timer(); + return; + } + + deadline_duration_us_ = + std::chrono::duration>(qos_.deadline().period.to_ns() * 1e-3); + + if (qos_.deadline().period.to_ns() == 0) + { + deadline_timer_->cancel_timer(); + + deadline_missed_status_.total_count = std::numeric_limits::max(); + deadline_missed_status_.total_count_change = std::numeric_limits::max(); + EPROSIMA_LOG_WARNING( + DATA_READER, + "Deadline period is 0, it will be ignored from now on."); + + // Bump once and notify listener exactly once. + notify_deadline_missed_nts_(); + return; + } + + deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); +} + +void DataReaderImpl::notify_deadline_missed_nts_() +{ StatusMask notify_status = StatusMask::requested_deadline_missed(); auto listener = get_listener_for(notify_status); if (nullptr != listener) @@ -1280,15 +1319,37 @@ bool DataReaderImpl::deadline_missed() #ifdef FASTDDS_STATISTICS reader_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +bool DataReaderImpl::deadline_missed() +{ + std::unique_lock lock(reader_->getMutex()); + + assert(qos_.deadline().period != dds::c_TimeInfinite); + + deadline_missed_status_.total_count++; + deadline_missed_status_.total_count_change++; + deadline_missed_status_.last_instance_handle = timer_owner_; + + notify_deadline_missed_nts_(); + + // If we just reached the max -> log ONCE, stop timer, and bail + if (deadline_missed_status_.total_count == std::numeric_limits::max()) + { + EPROSIMA_LOG_WARNING(DATA_READER, + "Maximum number of deadline missed messages reached. Stopping deadline timer."); + deadline_timer_->cancel_timer(); + return false; // do not reschedule + } if (!history_.set_next_deadline( timer_owner_, steady_clock::now() + duration_cast(deadline_duration_us_), true)) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set next deadline in the history"); return false; } return deadline_timer_reschedule(); @@ -1919,7 +1980,7 @@ ReturnCode_t DataReaderImpl::check_datasharing_compatible( return RETCODE_OK; break; default: - EPROSIMA_LOG_ERROR(DATA_WRITER, "Unknown data sharing kind."); + EPROSIMA_LOG_ERROR(DATA_READER, "Unknown data sharing kind."); return RETCODE_BAD_PARAMETER; } } @@ -1951,14 +2012,14 @@ ReturnCode_t DataReaderImpl::delete_contained_entities() // Check pending ReadConditions for (detail::ReadConditionImpl* impl : read_conditions_) { - // should be alive + // Should be alive auto keep_alive = impl->shared_from_this(); assert((bool)keep_alive); - // free ReadConditions + // Free ReadConditions impl->detach_all_conditions(); } - // release the colection + // Release the collection read_conditions_.clear(); return RETCODE_OK; @@ -2068,12 +2129,12 @@ ReadCondition* DataReaderImpl::create_readcondition( if (it != read_conditions_.end()) { - // already there + // Already there impl = (*it)->shared_from_this(); } else { - // create a new one + // Create a new one impl = std::make_shared(*this, key); impl->set_trigger_value(current_mask); // Add the implementation object to the collection @@ -2084,7 +2145,7 @@ ReadCondition* DataReaderImpl::create_readcondition( ReadCondition* cond = new ReadCondition(); auto ret_code = impl->attach_condition(cond); - // attach cannot fail in this scenario + // Attach cannot fail in this scenario assert(RETCODE_OK == ret_code); (void)ret_code; @@ -2120,7 +2181,7 @@ ReturnCode_t DataReaderImpl::delete_readcondition( # ifdef __cpp_lib_enable_shared_from_this std::weak_ptr wp = impl->weak_from_this(); # else - // remove when C++17 is enforced + // Remove when C++17 is enforced auto wp = std::weak_ptr(impl->shared_from_this()); # endif // ifdef __cpp_lib_enable_shared_from_this @@ -2129,10 +2190,10 @@ ReturnCode_t DataReaderImpl::delete_readcondition( if (RETCODE_OK == ret_code) { - // delete the condition + // Delete the condition delete a_condition; - // check if we must remove the implementation object + // Check if we must remove the implementation object if (wp.expired()) { read_conditions_.erase(it); @@ -2177,7 +2238,7 @@ void DataReaderImpl::try_notify_read_conditions() noexcept last_mask_state_.instance_states & ~old_mask.instance_states; } - // traverse the conditions notifying + // Traverse the conditions notifying std::lock_guard _(get_conditions_mutex()); for (detail::ReadConditionImpl* impl : read_conditions_) { @@ -2197,7 +2258,7 @@ ReturnCode_t DataReaderImpl::get_subscription_builtin_topic_data( return RETCODE_NOT_ENABLED; } - // sanity checks + // Sanity checks assert(nullptr != subscriber_); assert(nullptr != topic_); assert(nullptr != subscriber_->get_participant()); diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index 94bac666ec3..275363ea08a 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -675,6 +675,17 @@ class DataReaderImpl private: + /** + * (Re)configures the deadline timer: + * In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and + * for non-infinite positive values store period. + */ + void configure_deadline_timer_(); + + /** + * Notifies listeners that a deadline has been missed. + */ + void notify_deadline_missed_nts_(); void update_rtps_reader_qos(); DataReaderQos get_datareader_qos_from_settings( diff --git a/test/blackbox/CMakeLists.txt b/test/blackbox/CMakeLists.txt index 596a8348683..aee5cc1b552 100644 --- a/test/blackbox/CMakeLists.txt +++ b/test/blackbox/CMakeLists.txt @@ -220,7 +220,8 @@ if(FASTDDS_PIM_API_TESTS) ) target_include_directories(BlackboxTests_DDS_PIM PRIVATE ${Asio_INCLUDE_DIR} - api/dds-pim) + api/dds-pim + ${PROJECT_SOURCE_DIR}/test/utils) target_link_libraries(BlackboxTests_DDS_PIM fastdds fastcdr diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 4b4c876893d..85a56914e27 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -176,7 +176,6 @@ class PubSubReader Listener( PubSubReader& reader) : reader_(reader) - , times_deadline_missed_(0) { } @@ -226,8 +225,8 @@ class PubSubReader const eprosima::fastdds::dds::RequestedDeadlineMissedStatus& status) override { (void)datareader; - - times_deadline_missed_ = status.total_count; + std::lock_guard lk(mutex_); + requested_deadline_status_ = status; } void on_requested_incompatible_qos( @@ -278,7 +277,14 @@ class PubSubReader unsigned int missed_deadlines() const { - return times_deadline_missed_; + std::lock_guard lk(mutex_); + return requested_deadline_status_.total_count; + } + + unsigned int missed_deadlines_change() const + { + std::lock_guard lk(mutex_); + return requested_deadline_status_.total_count_change; } private: @@ -287,6 +293,9 @@ class PubSubReader const Listener&) = delete; PubSubReader& reader_; + mutable std::mutex mutex_; + + eprosima::fastdds::dds::RequestedDeadlineMissedStatus requested_deadline_status_{}; //! Number of times deadline was missed unsigned int times_deadline_missed_; @@ -1820,6 +1829,11 @@ class PubSubReader return listener_.missed_deadlines(); } + unsigned int missed_deadlines_change() const + { + return listener_.missed_deadlines_change(); + } + void liveliness_lost() { std::unique_lock lock(liveliness_mutex_); @@ -1898,6 +1912,22 @@ class PubSubReader return status; } + bool set_qos() + { + return (eprosima::fastdds::dds::RETCODE_OK == datareader_->set_qos(datareader_qos_)); + } + + bool set_qos( + const eprosima::fastdds::dds::DataReaderQos& att) + { + return (eprosima::fastdds::dds::RETCODE_OK == datareader_->set_qos(att)); + } + + eprosima::fastdds::dds::DataReaderQos get_qos() + { + return (datareader_->get_qos()); + } + bool is_matched() const { return matched_ > 0; diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 6b7e60854b0..dce2fc3f003 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -183,7 +183,6 @@ class PubSubWriter Listener( PubSubWriter& writer) : writer_(writer) - , times_deadline_missed_(0) , times_liveliness_lost_(0) , times_unack_sample_removed_(0) { @@ -214,7 +213,8 @@ class PubSubWriter const eprosima::fastdds::dds::OfferedDeadlineMissedStatus& status) override { static_cast(datawriter); - times_deadline_missed_ = status.total_count; + std::lock_guard lk(mutex_); + offered_deadline_status_ = status; } void on_offered_incompatible_qos( @@ -245,7 +245,14 @@ class PubSubWriter unsigned int missed_deadlines() const { - return times_deadline_missed_; + std::lock_guard lk(mutex_); + return offered_deadline_status_.total_count; + } + + unsigned int missed_deadlines_change() const + { + std::lock_guard lk(mutex_); + return offered_deadline_status_.total_count_change; } unsigned int times_liveliness_lost() const @@ -269,9 +276,10 @@ class PubSubWriter const Listener&) = delete; PubSubWriter& writer_; + mutable std::mutex mutex_; + + eprosima::fastdds::dds::OfferedDeadlineMissedStatus offered_deadline_status_{}; - //! The number of times deadline was missed - unsigned int times_deadline_missed_; //! The number of times liveliness was lost unsigned int times_liveliness_lost_; //! The number of times a sample has been removed unacknowledged @@ -1737,6 +1745,11 @@ class PubSubWriter return listener_.missed_deadlines(); } + unsigned int missed_deadlines_change() const + { + return listener_.missed_deadlines_change(); + } + unsigned int times_liveliness_lost() const { return listener_.times_liveliness_lost(); diff --git a/test/blackbox/common/BlackboxTestsDeadlineQos.cpp b/test/blackbox/common/BlackboxTestsDeadlineQos.cpp index 4ec14da1ac3..fda219c2cc7 100644 --- a/test/blackbox/common/BlackboxTestsDeadlineQos.cpp +++ b/test/blackbox/common/BlackboxTestsDeadlineQos.cpp @@ -22,8 +22,11 @@ #include "PubSubReader.hpp" #include "PubSubWriter.hpp" +#include + using namespace eprosima::fastdds; using namespace eprosima::fastdds::rtps; +using fastlog = eprosima::fastdds::dds::Log; enum communication_type { @@ -32,7 +35,7 @@ enum communication_type DATASHARING }; -class DeadlineQos : public testing::TestWithParam +class DeadlineQos : public ::testing::TestWithParam { public: @@ -309,6 +312,179 @@ TEST_P(DeadlineQos, KeyedTopicBestEffortReaderVolatileWriterSetDeadline) EXPECT_GE(writer.missed_deadlines(), 1u); } +/** + * Testing Redmine issue #23289. + * Writer-side version of ZeroDeadlinePeriod. + * Regression test for the zero-deadline period bug. + * Creating a DataWriter with a deadline of 0. + * Checking if a warning is logged exactly once, the timer is cancelled without missed deadline + * messages and a total count and count change set to max integer. + * Checking warnings, total count and count change when changing the deadline. + */ +TEST_P(DeadlineQos, ZeroDeadlinePeriodWriter) +{ + auto observer = std::make_shared(/*store=*/ false); + auto consumer = std::make_unique(observer); + + fastlog::ClearConsumers(); + fastlog::RegisterConsumer(std::move(consumer)); + fastlog::SetVerbosity(fastlog::Kind::Warning); + + observer->set_global_needle("Deadline period is 0"); + + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS); + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + + writer.deadline_period(0.0).init(); + reader.init(); + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyedhelloworld_data_generator(1); + writer.send_sample(data.front()); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + // Writer offered-deadline counters should be saturated + EXPECT_EQ(writer.missed_deadlines(), + std::numeric_limits::max()) << "Expected the max value after a zero-deadline warning."; + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + + const auto prev = observer->matched_global(); + EXPECT_EQ(prev, 1u) << "Expected exactly one 'deadline=0' warning\n"; + + const auto pre_total = writer.missed_deadlines(); + const auto pre_change = writer.missed_deadlines_change(); + + // Wait for a period long enough to expect a new miss if the timer were still active + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + const auto post_total = writer.missed_deadlines(); + const auto post_change = writer.missed_deadlines_change(); + + EXPECT_EQ(pre_total, post_total) << "The total count should not change, as the timer was canceled."; + EXPECT_EQ(pre_change, post_change) << "The total_count_change should not change, as the timer was canceled."; + EXPECT_EQ(observer->matched_global(), prev) << "No extra warnings after cancel."; + + auto q = writer.get_qos(); + q.deadline().period = Duration_t(0.1); + + ASSERT_TRUE(writer.set_qos(q)); // Update 0 -> finite + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(writer.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev) << "No new warning when moving reader from 0 -> finite"; + + q.deadline().period = Duration_t(0.0); + + ASSERT_TRUE(writer.set_qos(q)); // Update finite -> 0 + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(writer.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev + 1) << "Exactly one new warning."; + + fastlog::ClearConsumers(); +} + +/** + * Testing Redmine issue #23289. + * Regression test for the zero-deadline period bug. + * Reader-side version of ZeroDeadlinePeriod. + * Creating a DataReader with a deadline of 0. + * Checking if a warning is logged exactly once, the timer is cancelled without missed deadline + * messages and a total count and count change set to max integer. + * Checking warnings, total count and count change when changing the deadline. + */ +TEST_P(DeadlineQos, ZeroDeadlinePeriodReader) +{ + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS); + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + + // Writer deadline must also be 0 to satisfy the matching rule and ensure discovery + writer.deadline_period(0.0).init(); + ASSERT_TRUE(writer.isInitialized()); + + auto observer = std::make_shared(/*store=*/ false); + auto consumer = std::make_unique(observer); + + fastlog::ClearConsumers(); + fastlog::RegisterConsumer(std::move(consumer)); + fastlog::SetVerbosity(fastlog::Kind::Warning); + + observer->set_global_needle("Deadline period is 0"); + + // Zero deadline on the READER + reader.deadline_period(0.0).init(); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyedhelloworld_data_generator(1); + writer.send_sample(data.front()); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + // Reader requested-deadline counters should be saturated + EXPECT_EQ(reader.missed_deadlines(), + std::numeric_limits::max()) << "Expected the max value after a zero-deadline warning."; + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + + const auto prev = observer->matched_global(); + EXPECT_EQ(prev, 1u) << "Expected exactly one 'deadline=0' warning\n"; + + const auto pre_total = reader.missed_deadlines(); + const auto pre_change = reader.missed_deadlines_change(); + + // Wait for a period long enough to expect a new miss if the timer were still active + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + const auto post_total = reader.missed_deadlines(); + const auto post_change = reader.missed_deadlines_change(); + + EXPECT_EQ(pre_total, post_total) << "Timer canceled on reader; total must not change."; + EXPECT_EQ(pre_change, post_change) << "Timer canceled on reader; total_count_change must not change."; + EXPECT_EQ(observer->matched_global(), prev) << "No extra warnings after cancel."; + + // Now change reader's deadline from 0 -> finite; still no additional warning; counters remain saturated + auto q = reader.get_qos(); + q.deadline().period = Duration_t(0.1); + ASSERT_TRUE(reader.set_qos(q)); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(reader.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev) << "No new warning when moving reader from 0 -> finite"; + + q.deadline().period = Duration_t(0.0); + + ASSERT_TRUE(reader.set_qos(q)); // Update finite -> 0 + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(reader.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev + 1) << "Exactly one new warning."; + + fastlog::ClearConsumers(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else @@ -317,8 +493,8 @@ TEST_P(DeadlineQos, KeyedTopicBestEffortReaderVolatileWriterSetDeadline) GTEST_INSTANTIATE_TEST_MACRO(DeadlineQos, DeadlineQos, - testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING), - [](const testing::TestParamInfo& info) + ::testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING), + [](const ::testing::TestParamInfo& info) { switch (info.param) { diff --git a/test/utils/LogCounter.hpp b/test/utils/LogCounter.hpp new file mode 100644 index 00000000000..a75556e626f --- /dev/null +++ b/test/utils/LogCounter.hpp @@ -0,0 +1,149 @@ +// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file LogCounter.hpp + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace testing { + +/** + * This class holds all counting/state logic: + * - Counts per Log::Kind (Warning, Error, Info, ...) + * - Optional storage of full entries (disabled by default) + * - "Needle" matcher for exact message occurrences + * Intended to be used behind a LogCounterConsumer + */ +class LogCounterObserver +{ +public: + + using Log = eprosima::fastdds::dds::Log; + using Kind = Log::Kind; + + explicit LogCounterObserver( + bool store_logs = false) + : store_(store_logs) + { + matched_.store(0, std::memory_order_relaxed); + } + + // Set / reset the message substring to match and count + void set_global_needle( + std::string s) + { + std::lock_guard lk(m_); + needle_ = std::move(s); + matched_.store(0, std::memory_order_relaxed); + } + + // Number of messages that matched the current needle (substring match) + size_t matched_global() const + { + return matched_.load(std::memory_order_relaxed); + } + + // Count of logs for a specific kind (Warning, Error, ...) + size_t count( + Kind k) const + { + std::lock_guard lk(m_); + auto it = counts_.find(k); + return (it == counts_.end()) ? 0 : it->second; + } + + // Get stored entries (only if constructed with store_logs=true) + const std::vector& entries() const + { + return entries_; + } + + // Called by the consumer + void on_log( + const Log::Entry& e) + { + std::string local_needle; + { + std::lock_guard lk(m_); + ++counts_[e.kind]; + local_needle = needle_; + if (store_) + { + entries_.push_back(e); + } + } + if (!local_needle.empty() && e.message.find(local_needle) != std::string::npos) + { + matched_.fetch_add(1, std::memory_order_relaxed); + } + } + +private: + + std::map counts_; + std::atomic matched_{0}; + + bool store_; + mutable std::mutex m_; + std::string needle_; + std::vector entries_; +}; + +/** + * Class holding a shared_ptr that ensures the observer + * outlives asynchronous logging, so tests can safely read counters even if a + * failure occurs mid-test. + */ +class LogCounterConsumer : public eprosima::fastdds::dds::LogConsumer +{ +public: + + using Log = eprosima::fastdds::dds::Log; + + explicit LogCounterConsumer( + std::shared_ptr obs) + : observer_(std::move(obs)) + { + } + + void Consume( + const Log::Entry& e) override + { + if (observer_) + { + observer_->on_log(e); + } + } + +private: + + std::shared_ptr observer_; +}; + +} // namespace testing +} // namespace fastdds +} // namespace eprosima