Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cpp/src/mavsdk/core/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ void Connection::receive_message(
{
// Register system ID for valid messages
if (result == MavlinkReceiver::ParseResult::MessageParsed) {
if (_system_ids.find(message.sysid) == _system_ids.end()) {
_system_ids.insert(message.sysid);
}
std::lock_guard<std::mutex> lock(_system_ids_mutex);
_system_ids.insert(message.sysid);
}
// Let MavsdkImpl handle the ParseResult (queue for processing or forward-only)
_receiver_callback(result, message, connection);
Expand All @@ -122,6 +121,7 @@ unsigned Connection::forwarding_connections_count()

bool Connection::has_system_id(uint8_t system_id)
{
std::lock_guard<std::mutex> lock(_system_ids_mutex);
return _system_ids.find(system_id) != _system_ids.end();
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/mavsdk/core/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "libmav_receiver.h"
#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_set>
#include <utility>
Expand Down Expand Up @@ -62,6 +63,7 @@ class Connection {
std::unique_ptr<MavlinkReceiver> _mavlink_receiver;
std::unique_ptr<LibmavReceiver> _libmav_receiver;
ForwardingOption _forwarding_option;
std::mutex _system_ids_mutex;
std::unordered_set<uint8_t> _system_ids;

bool _debugging = false;
Expand Down
40 changes: 25 additions & 15 deletions cpp/src/mavsdk/core/mavlink_mission_transfer_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,29 @@ void MavlinkMissionTransferClient::set_current_item_async(

void MavlinkMissionTransferClient::do_work()
{
LockedQueue<WorkItem>::Guard work_queue_guard(_work_queue);
auto work = work_queue_guard.get_front();

if (!work) {
return;
}
// Keep a local shared_ptr so that if pop_front() drops the last queue reference,
// the WorkItem destructor runs after the Guard releases the queue mutex.
// This avoids lock-order-inversion: the WorkItem dtor acquires the message handler
// mutex via unregister_all_blocking(), and holding both would invert the lock order
// vs process_message() which holds the message handler mutex then pushes to the queue.
std::shared_ptr<WorkItem> deferred_work_to_destroy;
{
LockedQueue<WorkItem>::Guard work_queue_guard(_work_queue);
auto work = work_queue_guard.get_front();

if (!work) {
return;
}

if (!work->has_started()) {
work->start();
}
if (work->is_done()) {
work_queue_guard.pop_front();
if (!work->has_started()) {
work->start();
}
if (work->is_done()) {
deferred_work_to_destroy = work;
work_queue_guard.pop_front();
}
}
// deferred_work_to_destroy is destroyed here, outside the queue lock.
}

bool MavlinkMissionTransferClient::is_idle()
Expand Down Expand Up @@ -211,7 +221,7 @@ MavlinkMissionTransferClient::UploadWorkItem::UploadWorkItem(

MavlinkMissionTransferClient::UploadWorkItem::~UploadWorkItem()
{
_message_handler.unregister_all(this);
_message_handler.unregister_all_blocking(this);
_timeout_handler.remove(_cookie);
}

Expand Down Expand Up @@ -599,7 +609,7 @@ MavlinkMissionTransferClient::DownloadWorkItem::DownloadWorkItem(

MavlinkMissionTransferClient::DownloadWorkItem::~DownloadWorkItem()
{
_message_handler.unregister_all(this);
_message_handler.unregister_all_blocking(this);
_timeout_handler.remove(_cookie);
}

Expand Down Expand Up @@ -840,7 +850,7 @@ MavlinkMissionTransferClient::ClearWorkItem::ClearWorkItem(

MavlinkMissionTransferClient::ClearWorkItem::~ClearWorkItem()
{
_message_handler.unregister_all(this);
_message_handler.unregister_all_blocking(this);
_timeout_handler.remove(_cookie);
}

Expand Down Expand Up @@ -989,7 +999,7 @@ MavlinkMissionTransferClient::SetCurrentWorkItem::SetCurrentWorkItem(

MavlinkMissionTransferClient::SetCurrentWorkItem::~SetCurrentWorkItem()
{
_message_handler.unregister_all(this);
_message_handler.unregister_all_blocking(this);
_timeout_handler.remove(_cookie);
}

Expand Down
36 changes: 23 additions & 13 deletions cpp/src/mavsdk/core/mavlink_mission_transfer_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,29 @@ MavlinkMissionTransferServer::send_outgoing_items_async(

void MavlinkMissionTransferServer::do_work()
{
LockedQueue<WorkItem>::Guard work_queue_guard(_work_queue);
auto work = work_queue_guard.get_front();

if (!work) {
return;
}
// Keep a local shared_ptr so that if pop_front() drops the last queue reference,
// the WorkItem destructor runs after the Guard releases the queue mutex.
// This avoids lock-order-inversion: the WorkItem dtor acquires the message handler
// mutex via unregister_all_blocking(), and holding both would invert the lock order
// vs process_message() which holds the message handler mutex then pushes to the queue.
std::shared_ptr<WorkItem> deferred_work_to_destroy;
{
LockedQueue<WorkItem>::Guard work_queue_guard(_work_queue);
auto work = work_queue_guard.get_front();

if (!work) {
return;
}

if (!work->has_started()) {
work->start();
}
if (work->is_done()) {
work_queue_guard.pop_front();
if (!work->has_started()) {
work->start();
}
if (work->is_done()) {
deferred_work_to_destroy = work;
work_queue_guard.pop_front();
}
}
// deferred_work_to_destroy is destroyed here, outside the queue lock.
}

bool MavlinkMissionTransferServer::is_idle()
Expand Down Expand Up @@ -145,7 +155,7 @@ MavlinkMissionTransferServer::ReceiveIncomingMission::ReceiveIncomingMission(

MavlinkMissionTransferServer::ReceiveIncomingMission::~ReceiveIncomingMission()
{
_message_handler.unregister_all(this);
_message_handler.unregister_all_blocking(this);
_timeout_handler.remove(_cookie);
}

Expand Down Expand Up @@ -352,7 +362,7 @@ MavlinkMissionTransferServer::SendOutgoingMission::SendOutgoingMission(

MavlinkMissionTransferServer::SendOutgoingMission::~SendOutgoingMission()
{
_message_handler.unregister_all(this);
_message_handler.unregister_all_blocking(this);
_timeout_handler.remove(_cookie);
}

Expand Down
Loading