From 28e2ce8fafb0cc1ebfdf883896fa39148e91c95c Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 08:47:38 +0100 Subject: [PATCH] Fix destruction data-race on participant removal in intra-process (#5034) (#5367) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix destruction data-race on participant removal in intra-process (#5034) * Refs #21293: Add BB test Signed-off-by: Mario Dominguez * Refs #21293: Reinforce test to fail more frequently Signed-off-by: Mario Dominguez * Refs #21293: Add RefCountedPointer.hpp to utils Signed-off-by: Mario Dominguez * Refs #21293: Add unittests for RefCountedPointer Signed-off-by: Mario Dominguez * Refs #21293: LocalReaderPointer.hpp Signed-off-by: Mario Dominguez * Refs #21293: BaseReader aggregates LocalReaderPointer Signed-off-by: Mario Dominguez * Refs #21293: ReaderLocator aggregates LocalReaderPointer Signed-off-by: Mario Dominguez * Refs #21293: RTPSDomainImpl::find_local_reader returns a sared_ptr and properly calls local_actions_on_reader_removed() Signed-off-by: Mario Dominguez * Refs #21293: RTPSWriters properly using LocalReaderPointer::Instance when accessing local reader Signed-off-by: Mario Dominguez * Refs #21293: Linter Signed-off-by: Mario Dominguez * Refs #21293: Fix windows warnings Signed-off-by: Mario Dominguez * Refs #21293: Address Miguel's review Signed-off-by: Mario Dominguez * Refs #21293: Apply last comment Signed-off-by: Mario Dominguez * Refs #21293: NIT Signed-off-by: Mario Dominguez --------- Signed-off-by: Mario Dominguez (cherry picked from commit 456e45f25b14cdeeac8ddde7b3627323a9b0f759) # Conflicts: # include/fastdds/rtps/writer/ReaderLocator.h # include/fastdds/rtps/writer/ReaderProxy.h # src/cpp/rtps/RTPSDomain.cpp # src/cpp/rtps/RTPSDomainImpl.hpp # src/cpp/rtps/participant/RTPSParticipantImpl.cpp # src/cpp/rtps/participant/RTPSParticipantImpl.h # src/cpp/rtps/reader/BaseReader.cpp # src/cpp/rtps/reader/BaseReader.hpp # src/cpp/rtps/writer/ReaderLocator.cpp # src/cpp/rtps/writer/StatefulWriter.cpp # src/cpp/rtps/writer/StatelessWriter.cpp # test/blackbox/common/DDSBlackboxTestsBasic.cpp # test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h # test/unittest/utils/CMakeLists.txt * Solve conflicts Signed-off-by: Mario Dominguez * Apply Miguel's suggestions: make LocalReaderPointer inherit RefCounterPointer<> and add DOXYGEN_SHOULD_SKIP_THIS_PUBLIC Signed-off-by: Mario Dominguez * Apply NIT Signed-off-by: Mario Dominguez * Construct LocalReaderPointer in RTPSReader Signed-off-by: Mario Dominguez --------- Signed-off-by: Mario Dominguez Co-authored-by: Mario Domínguez López <116071334+Mario-DL@users.noreply.github.com> Co-authored-by: Mario Dominguez --- .../rtps/reader/LocalReaderPointer.hpp | 50 ++++ include/fastdds/rtps/reader/RTPSReader.h | 18 ++ include/fastdds/rtps/writer/ReaderLocator.h | 11 +- include/fastdds/rtps/writer/ReaderProxy.h | 2 +- include/fastrtps/utils/RefCountedPointer.hpp | 222 ++++++++++++++++++ src/cpp/rtps/RTPSDomain.cpp | 5 +- src/cpp/rtps/RTPSDomainImpl.hpp | 3 +- .../rtps/participant/RTPSParticipantImpl.cpp | 40 +++- .../rtps/participant/RTPSParticipantImpl.h | 3 +- src/cpp/rtps/reader/RTPSReader.cpp | 30 ++- src/cpp/rtps/writer/ReaderLocator.cpp | 22 +- src/cpp/rtps/writer/StatefulWriter.cpp | 28 ++- src/cpp/rtps/writer/StatelessWriter.cpp | 10 +- .../blackbox/common/DDSBlackboxTestsBasic.cpp | 83 +++++++ .../fastdds/rtps/reader/RTPSReader.h | 4 + .../fastdds/rtps/writer/ReaderLocator.h | 12 +- test/unittest/utils/CMakeLists.txt | 9 + .../unittest/utils/RefCountedPointerTests.cpp | 183 +++++++++++++++ 18 files changed, 667 insertions(+), 68 deletions(-) create mode 100644 include/fastdds/rtps/reader/LocalReaderPointer.hpp create mode 100644 include/fastrtps/utils/RefCountedPointer.hpp create mode 100644 test/unittest/utils/RefCountedPointerTests.cpp diff --git a/include/fastdds/rtps/reader/LocalReaderPointer.hpp b/include/fastdds/rtps/reader/LocalReaderPointer.hpp new file mode 100644 index 00000000000..7642dfb4e9e --- /dev/null +++ b/include/fastdds/rtps/reader/LocalReaderPointer.hpp @@ -0,0 +1,50 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file LocalReaderPointer.hpp + */ + +#ifndef FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP +#define FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP + +#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + +#include + +namespace eprosima { +namespace fastrtps { +namespace rtps { + +class RTPSReader; + +struct LocalReaderPointer : public RefCountedPointer +{ + LocalReaderPointer( + RTPSReader* ptr) + : RefCountedPointer(ptr) + { + } + + virtual ~LocalReaderPointer() = default; + +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + +#endif // FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP diff --git a/include/fastdds/rtps/reader/RTPSReader.h b/include/fastdds/rtps/reader/RTPSReader.h index 6db4528e617..fd6deb15316 100644 --- a/include/fastdds/rtps/reader/RTPSReader.h +++ b/include/fastdds/rtps/reader/RTPSReader.h @@ -46,6 +46,7 @@ struct CacheChange_t; struct ReaderHistoryState; class WriterProxyData; class IDataSharingListener; +struct LocalReaderPointer; /** * Class RTPSReader, manages the reception of data from its matched writers. @@ -175,6 +176,11 @@ class RTPSReader const SequenceNumberSet_t& gapList, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0; + /** + * @brief Waits for not being referenced/used by any other entity. + */ + virtual void local_actions_on_reader_removed(); + /** * Method to indicate the reader that some change has been removed due to HistoryQos requirements. * @param change Pointer to the CacheChange_t. @@ -485,6 +491,14 @@ class RTPSReader bool is_datasharing_compatible_with( const WriterProxyData& wdata); + /** + * @brief Retrieves the local pointer to this reader + * to be used by other local entities. + * + * @return Local pointer to this reader. + */ + std::shared_ptr get_local_pointer(); + //!ReaderHistory ReaderHistory* mp_history; //!Listener @@ -495,6 +509,10 @@ class RTPSReader bool m_acceptMessagesFromUnkownWriters; //!Trusted writer (for Builtin) EntityId_t m_trustedWriterEntityId; + + /// RefCountedPointer of this instance. + std::shared_ptr local_ptr_; + //!Expects Inline Qos. bool m_expectsInlineQos; diff --git a/include/fastdds/rtps/writer/ReaderLocator.h b/include/fastdds/rtps/writer/ReaderLocator.h index 49a697fd9e4..073d370b098 100644 --- a/include/fastdds/rtps/writer/ReaderLocator.h +++ b/include/fastdds/rtps/writer/ReaderLocator.h @@ -21,11 +21,12 @@ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include -#include #include +#include +#include #include #include -#include +#include namespace eprosima { namespace fastrtps { @@ -69,10 +70,10 @@ class ReaderLocator : public RTPSMessageSenderInterface return is_local_reader_; } - RTPSReader* local_reader(); + LocalReaderPointer::Instance local_reader(); void local_reader( - RTPSReader* local_reader) + std::shared_ptr local_reader) { local_reader_ = local_reader; } @@ -260,7 +261,7 @@ class ReaderLocator : public RTPSMessageSenderInterface LocatorSelectorEntry async_locator_info_; bool expects_inline_qos_; bool is_local_reader_; - RTPSReader* local_reader_; + std::shared_ptr local_reader_; std::vector guid_prefix_as_vector_; std::vector guid_as_vector_; IDataSharingNotifier* datasharing_notifier_; diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index 2610e988ce2..375493fa7f2 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -294,7 +294,7 @@ class ReaderProxy * Get the local reader on the same process (if any). * @return The local reader on the same process. */ - inline RTPSReader* local_reader() + inline LocalReaderPointer::Instance local_reader() { return locator_info_.local_reader(); } diff --git a/include/fastrtps/utils/RefCountedPointer.hpp b/include/fastrtps/utils/RefCountedPointer.hpp new file mode 100644 index 00000000000..572ef84dffb --- /dev/null +++ b/include/fastrtps/utils/RefCountedPointer.hpp @@ -0,0 +1,222 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file RefCountedPointer.hpp + */ + +#ifndef UTILS__REFCOUNTEDPOINTER_HPP +#define UTILS__REFCOUNTEDPOINTER_HPP + +#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + +#include +#include +#include +#include +#include +#include + +namespace eprosima { +namespace fastrtps { + +/** + * @brief Class to manage a local pointer with reference counting. + * + * It is similar to std::shared_ptr, but designed for cases where + * a shared pointer cannot be used due to API restrictions. + * + * USAGE: + * - On T class: + * - Add a shared_ptr> local_ptr_ member. + * - Call local_ptr_->deactivate() before destroying T. + * + * - On classes that need to use a pointer to T: + * - Keep a copy of the shared_ptr>. + * - Whenever you need to access T: + * RefCountedPointer::Instance instance(local_ptr_) + * if (instance) + * { + * ptr->method(); + * } + */ +template +class RefCountedPointer +{ +public: + + class Instance; + + /** + * @brief Explicit constructor. + * @param ptr Pointer to manage. + * + * @pre nullptr != ptr. We must ensure that the pointer we + * are manaing is valid. + */ + explicit RefCountedPointer( + T* ptr) + : ptr_(ptr) + , is_active_(true) + , instances_(0) + { + assert(nullptr != ptr); + } + + ~RefCountedPointer() = default; + + // Non-copyable and non-movable + RefCountedPointer( + const RefCountedPointer&) = delete; + RefCountedPointer& operator =( + const RefCountedPointer&) = delete; + RefCountedPointer( + RefCountedPointer&&) = delete; + RefCountedPointer& operator =( + RefCountedPointer&&) = delete; + + /** + * @brief Class to manage the local pointer instance. + * It will increase the reference count on construction and decrease + * it on destruction. Provides a facade to access the pointee. + */ + class Instance + { + public: + + /** + * @brief Constructor. + * @param parent Shared pointer reference to its RefCountedPointer. + */ + explicit Instance( + const std::shared_ptr>& parent) + : parent_(parent) + , ptr_(parent && parent->is_active_ ? parent->ptr_ : nullptr) + { + if (parent_) + { + parent_->inc_instances(); + } + } + + /** + * @brief Destructor. + */ + ~Instance() + { + if (parent_) + { + parent_->dec_instances(); + } + } + + // Non-copyable, default movable + Instance( + const Instance&) = delete; + Instance& operator =( + const Instance&) = delete; + Instance( + Instance&&) = default; + Instance& operator =( + Instance&&) = default; + + /** + * @brief operator to check if the pointer is valid. + */ + operator bool() const + { + return nullptr != ptr_; + } + + /** + * @brief operator to call the T methods. + */ + T* operator ->() const + { + assert(nullptr != ptr_); + return ptr_; + } + + private: + + std::shared_ptr> parent_; + T* const ptr_; + }; + + /** + * @brief Ensure no more valid local pointer instances are created, and wait for current ones to die. + */ + void deactivate() + { + std::unique_lock lock(mutex_); + is_active_ = false; + cv_.wait(lock, [this]() -> bool + { + return instances_ == 0; + }); + } + +private: + + /** + * @brief Increase the reference count. + */ + void inc_instances() + { + std::unique_lock lock(mutex_); + ++instances_; + } + + /** + * @brief Decrease the reference count. + */ + void dec_instances() + { + std::unique_lock lock(mutex_); + --instances_; + if (instances_ == 0) + { + cv_.notify_one(); + } + } + + /** + * Pointer to the managed object. + */ + T* const ptr_; + + /** + * Indicates whether the pointee is still alive + * and accessing the pointer is valid. + */ + std::atomic is_active_; + + /** + * Protections for the number of instances. + */ + mutable std::mutex mutex_; + std::condition_variable cv_; + + /** + * Number of active instances (currently using the pointee). + */ + size_t instances_; +}; + +} // namespace fastdds +} // namespace eprosima + +#endif // DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + +#endif // UTILS__REFCOUNTEDPOINTER_HPP diff --git a/src/cpp/rtps/RTPSDomain.cpp b/src/cpp/rtps/RTPSDomain.cpp index 20e48e89e5f..7dd46e510cf 100644 --- a/src/cpp/rtps/RTPSDomain.cpp +++ b/src/cpp/rtps/RTPSDomain.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -777,7 +778,7 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant( return nullptr; } -RTPSReader* RTPSDomainImpl::find_local_reader( +std::shared_ptr RTPSDomainImpl::find_local_reader( const GUID_t& reader_guid) { auto instance = get_instance(); @@ -791,7 +792,7 @@ RTPSReader* RTPSDomainImpl::find_local_reader( } } - return nullptr; + return std::shared_ptr(nullptr); } RTPSWriter* RTPSDomainImpl::find_local_writer( diff --git a/src/cpp/rtps/RTPSDomainImpl.hpp b/src/cpp/rtps/RTPSDomainImpl.hpp index 1abe1490805..139b89f5597 100644 --- a/src/cpp/rtps/RTPSDomainImpl.hpp +++ b/src/cpp/rtps/RTPSDomainImpl.hpp @@ -25,6 +25,7 @@ #endif // defined(_WIN32) || defined(__unix__) #include +#include #include #include #include @@ -176,7 +177,7 @@ class RTPSDomainImpl * * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found. */ - static RTPSReader* find_local_reader( + static std::shared_ptr find_local_reader( const GUID_t& reader_guid); /** diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 644295d8dc5..0c240518bbc 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -27,29 +27,30 @@ #include #include #include +#include #include #include #include -#include #include #include #include #include #include #include -#include +#include +#include #include #include -#include +#include #include -#include +#include #include #include -#include -#include +#include +#include #include #include -#include +#include #include #include @@ -1573,7 +1574,7 @@ bool RTPSParticipantImpl::createReader( return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback); } -RTPSReader* RTPSParticipantImpl::find_local_reader( +std::shared_ptr RTPSParticipantImpl::find_local_reader( const GUID_t& reader_guid) { shared_lock _(endpoints_list_mutex); @@ -1582,11 +1583,11 @@ RTPSReader* RTPSParticipantImpl::find_local_reader( { if (reader->getGuid() == reader_guid) { - return reader; + return reader->get_local_pointer(); } } - return nullptr; + return std::shared_ptr(); } RTPSWriter* RTPSParticipantImpl::find_local_writer( @@ -2223,6 +2224,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( bool found = false, found_in_users = false; Endpoint* p_endpoint = nullptr; + RTPSReader* reader = nullptr; if (endpoint.entityId.is_writer()) { @@ -2257,6 +2259,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it { + reader = *rit; m_userReaderList.erase(rit); found_in_users = true; break; @@ -2267,6 +2270,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it { + reader = *rit; p_endpoint = *rit; m_allReaderList.erase(rit); found = true; @@ -2325,6 +2329,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint( #endif // if HAVE_SECURITY } + if (reader) + { + reader->local_actions_on_reader_removed(); + } delete(p_endpoint); return true; } @@ -2412,6 +2420,11 @@ void RTPSParticipantImpl::deleteAllUserEndpoints() } #endif // if HAVE_SECURITY + if (kind == READER) + { + static_cast(endpoint)->local_actions_on_reader_removed(); + } + // remove the endpoints delete(endpoint); } @@ -3124,8 +3137,11 @@ bool RTPSParticipantImpl::register_in_reader( } else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId)) { - RTPSReader* reader = find_local_reader(reader_guid); - res = reader->add_statistics_listener(listener); + LocalReaderPointer::Instance local_reader(find_local_reader(reader_guid)); + if (local_reader) + { + res = local_reader->add_statistics_listener(listener); + } } return res; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 372e48daafb..e963a23a298 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -121,6 +121,7 @@ class PDP; class PDPSimple; class IPersistenceService; class WLP; +struct LocalReaderPointer; /** * @brief Class RTPSParticipantImpl, it contains the private implementation of the RTPSParticipant functions and @@ -479,7 +480,7 @@ class RTPSParticipantImpl /*** * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found. */ - RTPSReader* find_local_reader( + std::shared_ptr find_local_reader( const GUID_t& reader_guid); /*** diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index 297feee6ef9..f6ec0089a3d 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -21,19 +21,11 @@ #include #include -#include -#include - -#include - -#include - -#include +#include #include - -#include #include +#include #include #include @@ -41,6 +33,12 @@ #include +#include +#include +#include +#include +#include + namespace eprosima { namespace fastrtps { @@ -105,6 +103,11 @@ RTPSReader::RTPSReader( init(payload_pool, change_pool, att); } +void RTPSReader::local_actions_on_reader_removed() +{ + local_ptr_->deactivate(); +} + void RTPSReader::init( const std::shared_ptr& payload_pool, const std::shared_ptr& change_pool, @@ -144,6 +147,8 @@ void RTPSReader::init( mp_history->mp_reader = this; mp_history->mp_mutex = &mp_mutex; + local_ptr_ = std::make_shared(this); + EPROSIMA_LOG_INFO(RTPS_READER, "RTPSReader created correctly"); } @@ -214,6 +219,11 @@ bool RTPSReader::setListener( return true; } +std::shared_ptr RTPSReader::get_local_pointer() +{ + return local_ptr_; +} + History::const_iterator RTPSReader::findCacheInFragmentedProcess( const SequenceNumber_t& sequence_number, const GUID_t& writer_guid, diff --git a/src/cpp/rtps/writer/ReaderLocator.cpp b/src/cpp/rtps/writer/ReaderLocator.cpp index cd847f185b7..854af3c03b2 100644 --- a/src/cpp/rtps/writer/ReaderLocator.cpp +++ b/src/cpp/rtps/writer/ReaderLocator.cpp @@ -42,7 +42,7 @@ ReaderLocator::ReaderLocator( , async_locator_info_(max_unicast_locators, max_multicast_locators) , expects_inline_qos_(false) , is_local_reader_(false) - , local_reader_(nullptr) + , local_reader_() , guid_prefix_as_vector_(1u) , guid_as_vector_(1u) , datasharing_notifier_(nullptr) @@ -81,7 +81,7 @@ bool ReaderLocator::start( is_local_reader_ = RTPSDomainImpl::should_intraprocess_between(owner_->getGuid(), remote_guid); is_datasharing &= !is_local_reader_; - local_reader_ = nullptr; + local_reader_.reset(); if (!is_local_reader_ && !is_datasharing) { @@ -174,7 +174,7 @@ void ReaderLocator::stop() guid_prefix_as_vector_.at(0) = c_GuidPrefix_Unknown; expects_inline_qos_ = false; is_local_reader_ = false; - local_reader_ = nullptr; + local_reader_.reset(); } bool ReaderLocator::send( @@ -202,13 +202,13 @@ bool ReaderLocator::send( return true; } -RTPSReader* ReaderLocator::local_reader() +LocalReaderPointer::Instance ReaderLocator::local_reader() { if (!local_reader_) { local_reader_ = RTPSDomainImpl::find_local_reader(general_locator_info_.remote_guid); } - return local_reader_; + return LocalReaderPointer::Instance(local_reader_); } bool ReaderLocator::is_datasharing_reader() const @@ -218,15 +218,13 @@ bool ReaderLocator::is_datasharing_reader() const void ReaderLocator::datasharing_notify() { - RTPSReader* reader = nullptr; if (is_local_reader()) { - reader = local_reader(); - } - - if (reader) - { - reader->datasharing_listener()->notify(true); + LocalReaderPointer::Instance reader = local_reader(); + if (reader) + { + reader->datasharing_listener()->notify(true); + } } else { diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 88b8f9006c9..df8e15a1a22 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -465,14 +466,14 @@ bool StatefulWriter::intraprocess_delivery( CacheChange_t* change, ReaderProxy* reader_proxy) { - RTPSReader* reader = reader_proxy->local_reader(); - if (reader) + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); + if (local_reader) { if (change->write_params.related_sample_identity() != SampleIdentity::unknown()) { change->write_params.sample_identity(change->write_params.related_sample_identity()); } - return reader->processDataMsg(change); + return local_reader->processDataMsg(change); } return false; } @@ -482,10 +483,11 @@ bool StatefulWriter::intraprocess_gap( const SequenceNumber_t& first_seq, const SequenceNumber_t& last_seq) { - RTPSReader* reader = reader_proxy->local_reader(); - if (reader) + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); + if (local_reader) { - return reader->processGapMsg(m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima); + return local_reader->processGapMsg( + m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima); } return false; @@ -496,12 +498,11 @@ bool StatefulWriter::intraprocess_heartbeat( bool liveliness) { bool returned_value = false; + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); - std::lock_guard guardW(mp_mutex); - RTPSReader* reader = RTPSDomainImpl::find_local_reader(reader_proxy->guid()); - - if (reader) + if (local_reader) { + std::unique_lock lockW(mp_mutex); SequenceNumber_t first_seq = get_seq_num_min(); SequenceNumber_t last_seq = get_seq_num_max(); @@ -518,9 +519,10 @@ bool StatefulWriter::intraprocess_heartbeat( (liveliness || reader_proxy->has_changes())) { incrementHBCount(); - returned_value = - reader->processHeartbeatMsg(m_guid, m_heartbeatCount, first_seq, last_seq, true, liveliness, - c_VendorId_eProsima); + Count_t hb_count = m_heartbeatCount; + lockW.unlock(); + returned_value = local_reader->processHeartbeatMsg( + m_guid, hb_count, first_seq, last_seq, true, liveliness, c_VendorId_eProsima); } } diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 8c7a770b709..2fa8e2d041c 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -344,16 +344,16 @@ bool StatelessWriter::intraprocess_delivery( CacheChange_t* change, ReaderLocator& reader_locator) { - RTPSReader* reader = reader_locator.local_reader(); + LocalReaderPointer::Instance local_reader = reader_locator.local_reader(); - if (reader && + if (local_reader && (!reader_data_filter_ || reader_data_filter_->is_relevant(*change, reader_locator.remote_guid()))) { if (change->write_params.related_sample_identity() != SampleIdentity::unknown()) { change->write_params.sample_identity(change->write_params.related_sample_identity()); } - return reader->processDataMsg(change); + return local_reader->processDataMsg(change); } return false; @@ -929,8 +929,8 @@ bool StatelessWriter::get_connections( //! intraprocess for_matched_readers(matched_local_readers_, [&connection, &connection_list](ReaderLocator& reader) { - connection.guid(fastdds::statistics::to_statistics_type(reader.local_reader()->getGuid())); - connection.mode(fastdds::statistics::INTRAPROCESS); + connection.guid(fastdds::statistics::to_statistics_type(reader.remote_guid())); + connection.mode(fastdds::statistics::ConnectionMode::INTRAPROCESS); connection_list.push_back(connection); return false; diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index a2fedada3e2..755e9f833ce 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -46,6 +46,7 @@ #include "BlackboxTests.hpp" #include "mock/BlackboxMockConsumer.h" #include "../api/dds-pim/CustomPayloadPool.hpp" +#include "../api/dds-pim/PubSubParticipant.hpp" #include "../api/dds-pim/PubSubReader.hpp" #include "../api/dds-pim/PubSubWriter.hpp" #include "../api/dds-pim/PubSubWriterReader.hpp" @@ -899,6 +900,88 @@ TEST(DDSBasic, max_output_message_size_writer) } +/** + * @test This is a regression test for Redmine Issue 21293. + * The destruction among intra-process participants should be correctly performed. + * local_reader() has to return a valid pointer. + * + */ +TEST(DDSBasic, successful_destruction_among_intraprocess_participants) +{ + // Set intraprocess delivery to full + fastrtps::LibrarySettingsAttributes library_settings; + library_settings = fastrtps::xmlparser::XMLProfileManager::library_settings(); + auto old_library_settings = library_settings; + library_settings.intraprocess_delivery = fastrtps::INTRAPROCESS_FULL; + fastrtps::xmlparser::XMLProfileManager::library_settings(library_settings); + + { + auto participant_1 = std::make_shared>(1u, 1u, 1u, 1u); + + ASSERT_TRUE(participant_1->init_participant()); + participant_1->pub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(participant_1->init_publisher(0u)); + participant_1->sub_topic_name(TEST_TOPIC_NAME + "_Return"); + ASSERT_TRUE(participant_1->init_subscriber(0u)); + + std::vector>> reception_participants; + + size_t num_reception_participants = 50; + + for (size_t i = 0; i < num_reception_participants; i++) + { + reception_participants.push_back(std::make_shared>(1u, 1u, 1u, 1u)); + ASSERT_TRUE(reception_participants.back()->init_participant()); + reception_participants.back()->sub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(reception_participants.back()->init_subscriber(0u)); + reception_participants.back()->pub_topic_name(TEST_TOPIC_NAME + "_Return"); + ASSERT_TRUE(reception_participants.back()->init_publisher(0u)); + } + + participant_1->wait_discovery(std::chrono::seconds::zero(), (uint8_t)num_reception_participants, true); + + participant_1->pub_wait_discovery((unsigned int)num_reception_participants); + participant_1->sub_wait_discovery((unsigned int)num_reception_participants); + + auto data_12 = default_helloworld_data_generator(); + + std::thread p1_thread([&participant_1, &data_12]() + { + auto data_size = data_12.size(); + for (size_t i = 0; i < data_size; i++) + { + participant_1->send_sample(data_12.back()); + data_12.pop_back(); + } + }); + + std::vector reception_threads; + reception_threads.reserve(num_reception_participants); + for (auto& reception_participant : reception_participants) + { + reception_threads.emplace_back([&reception_participant]() + { + auto data_21 = default_helloworld_data_generator(); + for (auto& data : data_21) + { + reception_participant->send_sample(data); + } + + reception_participant.reset(); + }); + } + + p1_thread.join(); + for (auto& rec_thread : reception_threads) + { + rec_thread.join(); + } + } + + // Restore library settings + fastrtps::xmlparser::XMLProfileManager::library_settings(old_library_settings); +} + } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h b/test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h index 77743ee37d8..1cdfb9194ca 100644 --- a/test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h +++ b/test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h @@ -156,6 +156,10 @@ class RTPSReader : public Endpoint return true; } + virtual void local_actions_on_reader_removed() + { + } + virtual bool change_removed_by_history( CacheChange_t*, WriterProxy*) diff --git a/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h b/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h index dc3be493e06..45318bc561a 100644 --- a/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h +++ b/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h @@ -22,14 +22,14 @@ #define _FASTDDS_RTPS_READERLOCATOR_H_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include -#include +#include #include +#include +#include #include #include -#include #include - +#include namespace eprosima { namespace fastrtps { @@ -204,9 +204,9 @@ class ReaderLocator : public RTPSMessageSenderInterface return false; } - RTPSReader* local_reader() + LocalReaderPointer::Instance local_reader() { - return nullptr; + return LocalReaderPointer::Instance(std::shared_ptr()); } bool is_datasharing_reader() const diff --git a/test/unittest/utils/CMakeLists.txt b/test/unittest/utils/CMakeLists.txt index f528cf6d0f7..a79af73b556 100644 --- a/test/unittest/utils/CMakeLists.txt +++ b/test/unittest/utils/CMakeLists.txt @@ -80,6 +80,9 @@ set(SYSTEMINFOTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp) +set(REF_COUNTED_POINTER_TESTS_SOURCE + RefCountedPointerTests.cpp) + include_directories(mock/) add_executable(StringMatchingTests ${STRINGMATCHINGTESTS_SOURCE}) @@ -179,6 +182,12 @@ target_include_directories(SharedMutexTests PUBLIC ${PROJECT_SOURCE_DIR}/include target_link_libraries(SharedMutexTests PUBLIC GTest::gtest) gtest_discover_tests(SharedMutexTests) +add_executable(RefCountedPointerTests ${REF_COUNTED_POINTER_TESTS_SOURCE}) +target_include_directories(RefCountedPointerTests PRIVATE + ${PROJECT_SOURCE_DIR}/include ${PROJECT_SOURCE_DIR}/src/cpp) +target_link_libraries(RefCountedPointerTests PUBLIC GTest::gtest) +gtest_discover_tests(RefCountedPointerTests) + ############################################################################### # Necessary files ############################################################################### diff --git a/test/unittest/utils/RefCountedPointerTests.cpp b/test/unittest/utils/RefCountedPointerTests.cpp new file mode 100644 index 00000000000..705fec623e7 --- /dev/null +++ b/test/unittest/utils/RefCountedPointerTests.cpp @@ -0,0 +1,183 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include + +using namespace std; + +namespace eprosima { +namespace fastrtps { + +struct EntityMock +{ + EntityMock() + : local_pointer(std::make_shared>(this)) + , n_times_data_processed(0) + { + } + + std::shared_ptr> get_refcounter_pointer() const + { + return local_pointer; + } + + void dummy_process_data( + void*) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ++n_times_data_processed; + } + + void destroy() + { + local_pointer->deactivate(); + } + + std::shared_ptr> local_pointer; + std::atomic n_times_data_processed; +}; + +enum class RoutineStatus +{ + NON_INITIALIZED, + SUCCESS, + FAILURE +}; + +struct EntityOwner +{ + EntityOwner( + const EntityMock& entity) + : entity_ptr(entity.get_refcounter_pointer()) + , routine_status(RoutineStatus::NON_INITIALIZED) + { + } + + void spawn_routine() + { + th = std::thread([&]() + { + RefCountedPointer::Instance entity_instance(entity_ptr); + if (entity_instance) + { + entity_instance->dummy_process_data(nullptr); + routine_status = RoutineStatus::SUCCESS; + } + else + { + routine_status = RoutineStatus::FAILURE; + } + }); + } + + void join() + { + th.join(); + } + + std::shared_ptr> entity_ptr; + RoutineStatus routine_status; + std::thread th; +}; + +class RefCountedPointerTests : public ::testing::Test +{ +public: + + static constexpr std::size_t n_owners = 5; + + void SetUp() override + { + owners_.reserve(5); + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_.emplace_back(entity_); + } + } + + void TearDown() override + { + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_[i].join(); + } + } + +protected: + + EntityMock entity_; + std::vector owners_; +}; + +TEST_F(RefCountedPointerTests, refcountedpointer_inactive) +{ + // Make the first owner spawn a routine + owners_[0].spawn_routine(); + + // Wait for the routine to finish + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ASSERT_EQ(owners_[0].routine_status, RoutineStatus::SUCCESS); + + // Destroy the entity + entity_.destroy(); + + // Make the rest of the owners spawn a routine + for (std::size_t i = 1; i < n_owners; ++i) + { + owners_[i].spawn_routine(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // The routine should fail + ASSERT_EQ(owners_[i].routine_status, RoutineStatus::FAILURE); + } + + // The entity should have been processed only once + ASSERT_EQ(1, entity_.n_times_data_processed); +} + +TEST_F(RefCountedPointerTests, refcounterpointer_deactivate_waits_for_no_references) +{ + // Spawn some routines + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_[i].spawn_routine(); + } + + // Ensure owners' routines have started + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto t0 = std::chrono::steady_clock::now(); + entity_.destroy(); + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0).count(); + + std::cout << "Elapsed time: " << elapsed << " ms" << std::endl; + ASSERT_GT(elapsed, 50); // destroy should have taken at least 50 ms. Being strict it should be 80, but we allow some margin + ASSERT_EQ(entity_.n_times_data_processed, 5); +} + +} // namespace fastdds +} // namespace eprosima + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}