Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/fastdds/dds/subscriber/SampleInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ struct SampleInfo
//!Related Sample Identity (Extension for RPC)
rtps::SampleIdentity related_sample_identity;

//!Persistence Sample Identity (Extension for RPC)
rtps::SampleIdentity persistence_sample_identity;

};

} // namespace dds
Expand Down
2 changes: 2 additions & 0 deletions include/fastdds/rtps/common/CacheChange.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ struct CacheChangeReaderInfo_t
int32_t no_writers_generation_count;
//! Ownership stregth of its writer when the sample was received.
uint32_t writer_ownership_strength;
//! Persistence writer guid associated with received sample.
GUID_t persistence_writer_guid;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ struct ReadTakeCommand
info.sample_identity.writer_guid(item->writerGUID);
info.sample_identity.sequence_number(item->sequenceNumber);
info.related_sample_identity = item->write_params.sample_identity();
info.persistence_sample_identity.writer_guid(item->reader_info.persistence_writer_guid);
info.persistence_sample_identity.sequence_number(item->sequenceNumber);

info.valid_data = true;

Expand Down
14 changes: 14 additions & 0 deletions src/cpp/rtps/reader/BaseReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,20 @@ fastdds::rtps::SequenceNumber_t BaseReader::get_last_notified(
return ret_val;
}

fastdds::rtps::GUID_t BaseReader::get_persistence_guid(
const fastdds::rtps::GUID_t& guid)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
GUID_t ret_val = guid;
auto p_guid = history_state_->persistence_guid_map.find(guid);
if (p_guid != history_state_->persistence_guid_map.end())
{
ret_val = p_guid->second;
}

return ret_val;
}

fastdds::rtps::SequenceNumber_t BaseReader::update_last_notified(
const fastdds::rtps::GUID_t& guid,
const fastdds::rtps::SequenceNumber_t& seq)
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/reader/BaseReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,14 @@ class BaseReader
fastdds::rtps::SequenceNumber_t get_last_notified(
const fastdds::rtps::GUID_t& guid);

/**
* @brief Retrieve the persistence guid for the provided writer guid.
*
* @param guid The writer GUID to query.
*/
GUID_t get_persistence_guid(
const GUID_t& guid);

/**
* @brief Update the last notified sequence for a writer's GUID.
*
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,9 @@ bool StatefulReader::change_received(
a_change->reader_info.writer_ownership_strength = (std::numeric_limits<uint32_t>::max)();
}

// Update persistence GUID information on the CacheChange.
a_change->reader_info.persistence_writer_guid = get_persistence_guid(a_change->writerGUID);

// NOTE: Depending on QoS settings, one change can be removed from history
// inside the call to history_->received_change
fastdds::dds::SampleRejectedStatusKind rejection_reason;
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ bool StatelessReader::change_received(
change->reader_info.writer_ownership_strength = (std::numeric_limits<uint32_t>::max)();
}

// Update persistence GUID information on the CacheChange.
change->reader_info.persistence_writer_guid = get_persistence_guid(change->writerGUID);

if (history_->received_change(change, 0))
{
auto payload_length = change->serializedPayload.length;
Expand Down
Loading