Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions .github/actions/fetch-fastdds-repos/action.yml

This file was deleted.

21 changes: 0 additions & 21 deletions .github/actions/install-gtest/action.yml

This file was deleted.

21 changes: 21 additions & 0 deletions .github/workflows/ci.repos
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
repositories:

foonathan_memory_vendor:
type: git
url: https://github.com/eProsima/foonathan_memory_vendor.git
version: master

fastcdr:
type: git
url: https://github.com/eProsima/Fast-CDR.git
version: master

fastdds:
type: git
url: https://github.com/eProsima/Fast-DDS.git
version: master

googletest-distribution:
type: git
url: https://github.com/google/googletest.git
version: release-1.12.1
25 changes: 13 additions & 12 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,12 @@ jobs:
- name: Install apt packages
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-apt-packages

- name: Install GTest
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-gtest

- name: Install Python packages
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-python-packages

- name: Fetch eProsima dependencies
uses: ./src/Fast-DDS-statistics-backend/.github/actions/fetch-fastdds-repos
run: |
vcs import src < ./src/Fast-DDS-statistics-backend/.github/workflows/ci.repos

- name: Update colcon mixin
run: |
Expand All @@ -111,10 +109,17 @@ jobs:
colcon mixin update default
continue-on-error: true

- name: Build gtest
run: |
colcon build \
--event-handlers=console_direct+ \
--packages-select googletest-distribution

- name: Build workspace
run: |
cat src/Fast-DDS-statistics-backend/.github/workflows/test.meta
colcon build \
--packages-skip googletest-distribution \
--event-handlers=console_direct+ \
--metas src/Fast-DDS-statistics-backend/.github/workflows/test.meta

Expand Down Expand Up @@ -187,14 +192,12 @@ jobs:
- name: Install apt packages
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-apt-packages

- name: Install GTest
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-gtest

- name: Install Python packages
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-python-packages

- name: Fetch eProsima dependencies
uses: ./src/Fast-DDS-statistics-backend/.github/actions/fetch-fastdds-repos
run: |
vcs import src < ./src/Fast-DDS-statistics-backend/.github/workflows/ci.repos

- name: Update colcon mixin
run: |
Expand Down Expand Up @@ -299,14 +302,12 @@ jobs:
- name: Install apt packages
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-apt-packages

- name: Install GTest
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-gtest

- name: Install Python packages
uses: ./src/Fast-DDS-statistics-backend/.github/actions/install-python-packages

- name: Fetch eProsima dependencies
uses: ./src/Fast-DDS-statistics-backend/.github/actions/fetch-fastdds-repos
run: |
vcs import src < ./src/Fast-DDS-statistics-backend/.github/workflows/ci.repos

- name: Build workspace
run: |
Expand Down
29 changes: 14 additions & 15 deletions src/cpp/Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,50 +51,49 @@ namespace details {
struct Monitor
{
//! The EntityId of the monitored domain
EntityId id;
EntityId id{};

//! The user listener for this monitor
DomainListener* domain_listener;
DomainListener* domain_listener = nullptr;

//! The callback mask applied to the \c domain_listener
CallbackMask domain_callback_mask;
CallbackMask domain_callback_mask{};

//! The data mask applied to the \c domain_listener->on_data_available
DataKindMask data_mask;
DataKindMask data_mask{};

//! The participant created to communicate with the statistics reporting endpoints in this monitor
fastdds::dds::DomainParticipant* participant;
fastdds::dds::DomainParticipant* participant = nullptr;

//! The listener linked to the \c participant
//! It will process the entity discoveries
fastdds::dds::DomainParticipantListener* participant_listener;

fastdds::dds::DomainParticipantListener* participant_listener = nullptr;

//! The participant created to communicate with the statistics reporting publishers in this monitor
fastdds::dds::Subscriber* subscriber;
fastdds::dds::Subscriber* subscriber = nullptr;

//! Holds the topic object created for each of the statistics topics
std::map<std::string, fastdds::dds::Topic*> topics;
std::map<std::string, fastdds::dds::Topic*> topics{};

//! Holds the datareader object created for each of the statistics topics
std::map<std::string, fastdds::dds::DataReader*> readers;
std::map<std::string, fastdds::dds::DataReader*> readers{};

//! The listener linked to the \c readers
//! All readers will use the same listener
//! The listener will decide how to process the data according to the topic of the reader
fastdds::dds::DataReaderListener* reader_listener;
fastdds::dds::DataReaderListener* reader_listener = nullptr;

//! Participant discovery status. Used in the participant discovery user callback
DomainListener::Status participant_status_;
DomainListener::Status participant_status_{};

//! Topic discovery status. Used in the topic discovery user callback
DomainListener::Status topic_status_;
DomainListener::Status topic_status_{};

//! Datareader discovery status. Used in the datareader discovery user callback
DomainListener::Status datareader_status_;
DomainListener::Status datareader_status_{};

//! DataWriter discovery status. Used in the datawriter discovery user callback
DomainListener::Status datawriter_status_;
DomainListener::Status datawriter_status_{};
};

} // namespace details
Expand Down
118 changes: 74 additions & 44 deletions src/cpp/StatisticsBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ void find_or_create_topic_and_type(
{
if (topic_desc->get_type_name() != type->getName())
{
details::StatisticsBackendData::get_instance()->unlock();
throw Error(topic_name + " is not using expected type " + type->getName() +
" and is using instead type " + topic_desc->get_type_name());
}
Expand All @@ -107,7 +106,6 @@ void find_or_create_topic_and_type(
catch (const std::bad_cast& e)
{
// TODO[ILG]: Could we support other TopicDescription types in this context?
details::StatisticsBackendData::get_instance()->unlock();
throw Error(topic_name + " is already used but is not a simple Topic: " + e.what());
}

Expand All @@ -117,7 +115,6 @@ void find_or_create_topic_and_type(
if (ReturnCode_t::RETCODE_PRECONDITION_NOT_MET == monitor->participant->register_type(type, type->getName()))
{
// Name already in use
details::StatisticsBackendData::get_instance()->unlock();
throw Error(std::string("Type name ") + type->getName() + " is already in use");
}
monitor->topics[topic_name] =
Expand Down Expand Up @@ -182,11 +179,19 @@ EntityId create_and_register_monitor(
const DomainParticipantQos& participant_qos,
const DomainId domain_id = 0)
{
// NOTE: This method is quite awful to read because of the error handle of every entity
// This could be done much nicer encapsulating this in Monitor creation in destruction, but you know...
// Why do not call stop_monitor in error case?, youll ask. Well, mutexes are treated rarely here, as this static
// class locks and unlocks StatisticsBackendData mutex, what makes very difficult to do some coherent
// calls from one to another.
// What should happen is that all this logic is moved to StatisticsBackendData. You know, some day...

details::StatisticsBackendData::get_instance()->lock();

/* Create monitor instance and register it in the database */
// Create monitor instance.
std::shared_ptr<details::Monitor> monitor = std::make_shared<details::Monitor>();
std::shared_ptr<database::Domain> domain = std::make_shared<database::Domain>(domain_name);

try
{
domain->id = details::StatisticsBackendData::get_instance()->database_->insert(domain);
Expand All @@ -196,6 +201,8 @@ EntityId create_and_register_monitor(
details::StatisticsBackendData::get_instance()->unlock();
throw;
}
// TODO: in case this function fails afterwards, the domain will be kept in the database without associated
// Participant. There must exist a way in database to delete a domain, or to make a rollback.

monitor->id = domain->id;
monitor->domain_listener = domain_listener;
Expand All @@ -222,6 +229,11 @@ EntityId create_and_register_monitor(

if (monitor->participant == nullptr)
{
// Remove those elements that have been set
delete monitor->reader_listener;
delete monitor->participant_listener;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(domain->id);

details::StatisticsBackendData::get_instance()->unlock();
throw Error("Error initializing monitor. Could not create participant");
}
Expand All @@ -234,19 +246,48 @@ EntityId create_and_register_monitor(

if (monitor->subscriber == nullptr)
{
// Remove those elements that have been set
DomainParticipantFactory::get_instance()->delete_participant(monitor->participant);
delete monitor->reader_listener;
delete monitor->participant_listener;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(domain->id);

details::StatisticsBackendData::get_instance()->unlock();
throw Error("Error initializing monitor. Could not create subscriber");
}

for (const auto& topic : topics)
{
/* Register the type and topic*/
register_statistics_type_and_topic(monitor, topic);

if (monitor->topics[topic] == nullptr)
try
{
register_statistics_type_and_topic(monitor, topic);
}
catch (const std::exception& e)
{
// Remove those elements that have been set
for (auto& it : monitor->readers)
{
if (nullptr != it.second)
{
monitor->subscriber->delete_datareader(it.second);
}
}
for (auto& it : monitor->topics)
{
if (nullptr != it.second)
{
monitor->participant->delete_topic(it.second);
}
}
monitor->participant->delete_subscriber(monitor->subscriber);
DomainParticipantFactory::get_instance()->delete_participant(monitor->participant);
delete monitor->reader_listener;
delete monitor->participant_listener;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(domain->id);

details::StatisticsBackendData::get_instance()->unlock();
throw Error("Error initializing monitor. Could not create topic " + std::string(topic));
throw Error("Error registering topic " + std::string(topic) + " : " + e.what());
}

/* Create DataReaders */
Expand All @@ -258,6 +299,27 @@ EntityId create_and_register_monitor(

if (monitor->readers[topic] == nullptr)
{
// Remove those elements that have been set
for (auto& it : monitor->readers)
{
if (nullptr != it.second)
{
monitor->subscriber->delete_datareader(it.second);
}
}
for (auto& it : monitor->topics)
{
if (nullptr != it.second)
{
monitor->participant->delete_topic(it.second);
}
}
monitor->participant->delete_subscriber(monitor->subscriber);
DomainParticipantFactory::get_instance()->delete_participant(monitor->participant);
delete monitor->reader_listener;
delete monitor->participant_listener;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(domain->id);

details::StatisticsBackendData::get_instance()->unlock();
throw Error("Error initializing monitor. Could not create reader for topic " + std::string(topic));
}
Expand Down Expand Up @@ -285,8 +347,9 @@ void StatisticsBackend::set_domain_listener(
CallbackMask callback_mask,
DataKindMask data_mask)
{
auto monitor = details::StatisticsBackendData::get_instance()->monitors_by_entity_.find(monitor_id);
if (monitor == details::StatisticsBackendData::get_instance()->monitors_by_entity_.end())
auto statistics_backend_data = details::StatisticsBackendData::get_instance();
auto monitor = statistics_backend_data->monitors_by_entity_.find(monitor_id);
if (monitor == statistics_backend_data->monitors_by_entity_.end())
{
throw BadParameter("There is no monitor with the given ID");
}
Expand Down Expand Up @@ -336,40 +399,7 @@ EntityId StatisticsBackend::init_monitor(
void StatisticsBackend::stop_monitor(
EntityId monitor_id)
{
details::StatisticsBackendData::get_instance()->lock();

//Find the monitor
auto it = details::StatisticsBackendData::get_instance()->monitors_by_entity_.find(monitor_id);
if (it == details::StatisticsBackendData::get_instance()->monitors_by_entity_.end())
{
details::StatisticsBackendData::get_instance()->unlock();
throw BadParameter("No monitor with such ID");
}
auto monitor = it->second;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(it);

// Delete everything created during monitor initialization
for (const auto& reader : monitor->readers)
{
monitor->subscriber->delete_datareader(reader.second);
}
monitor->readers.clear();

for (const auto& topic : monitor->topics)
{
monitor->participant->delete_topic(topic.second);
}
monitor->topics.clear();

monitor->participant->delete_subscriber(monitor->subscriber);
DomainParticipantFactory::get_instance()->delete_participant(monitor->participant);
delete monitor->reader_listener;
delete monitor->participant_listener;

// The monitor is inactive
details::StatisticsBackendData::get_instance()->database_->change_entity_status(monitor_id, false);

details::StatisticsBackendData::get_instance()->unlock();
details::StatisticsBackendData::get_instance()->stop_monitor(monitor_id);
}

EntityId StatisticsBackend::init_monitor(
Expand Down
Loading