Skip to content

Commit

Permalink
Fix destruction data-race on participant removal in intra-process (#5034
Browse files Browse the repository at this point in the history
) (#5367)

* Fix destruction data-race on participant removal in intra-process (#5034)

* Refs #21293: Add BB test

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Reinforce test to fail more frequently

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Add RefCountedPointer.hpp to utils

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Add unittests for RefCountedPointer

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: LocalReaderPointer.hpp

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: BaseReader aggregates LocalReaderPointer

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: ReaderLocator aggregates LocalReaderPointer

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: RTPSDomainImpl::find_local_reader returns a sared_ptr<LocalReaderPointer> and properly calls local_actions_on_reader_removed()

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: RTPSWriters properly using LocalReaderPointer::Instance when accessing local reader

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Linter

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Fix windows warnings

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Address Miguel's review

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Apply last comment

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: NIT

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
(cherry picked from commit 456e45f)

# Conflicts:
#	include/fastdds/rtps/writer/ReaderLocator.h
#	include/fastdds/rtps/writer/ReaderProxy.h
#	src/cpp/rtps/RTPSDomain.cpp
#	src/cpp/rtps/RTPSDomainImpl.hpp
#	src/cpp/rtps/participant/RTPSParticipantImpl.cpp
#	src/cpp/rtps/participant/RTPSParticipantImpl.h
#	src/cpp/rtps/reader/BaseReader.cpp
#	src/cpp/rtps/reader/BaseReader.hpp
#	src/cpp/rtps/writer/ReaderLocator.cpp
#	src/cpp/rtps/writer/StatefulWriter.cpp
#	src/cpp/rtps/writer/StatelessWriter.cpp
#	test/blackbox/common/DDSBlackboxTestsBasic.cpp
#	test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h
#	test/unittest/utils/CMakeLists.txt

* Solve conflicts

Signed-off-by: Mario Dominguez <[email protected]>

* Apply Miguel's suggestions: make LocalReaderPointer inherit RefCounterPointer<> and add DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

Signed-off-by: Mario Dominguez <[email protected]>

* Apply NIT

Signed-off-by: Mario Dominguez <[email protected]>

* Construct LocalReaderPointer in RTPSReader

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
Co-authored-by: Mario Domínguez López <[email protected]>
Co-authored-by: Mario Dominguez <[email protected]>
  • Loading branch information
3 people authored Nov 20, 2024
1 parent 725e8ab commit 28e2ce8
Show file tree
Hide file tree
Showing 18 changed files with 667 additions and 68 deletions.
50 changes: 50 additions & 0 deletions include/fastdds/rtps/reader/LocalReaderPointer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file LocalReaderPointer.hpp
*/

#ifndef FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP
#define FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastrtps/utils/RefCountedPointer.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {

class RTPSReader;

struct LocalReaderPointer : public RefCountedPointer<RTPSReader>
{
LocalReaderPointer(
RTPSReader* ptr)
: RefCountedPointer<RTPSReader>(ptr)
{
}

virtual ~LocalReaderPointer() = default;

};

} // namespace rtps
} // namespace fastdds
} // namespace eprosima

#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#endif // FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP
18 changes: 18 additions & 0 deletions include/fastdds/rtps/reader/RTPSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct CacheChange_t;
struct ReaderHistoryState;
class WriterProxyData;
class IDataSharingListener;
struct LocalReaderPointer;

/**
* Class RTPSReader, manages the reception of data from its matched writers.
Expand Down Expand Up @@ -175,6 +176,11 @@ class RTPSReader
const SequenceNumberSet_t& gapList,
fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0;

/**
* @brief Waits for not being referenced/used by any other entity.
*/
virtual void local_actions_on_reader_removed();

/**
* Method to indicate the reader that some change has been removed due to HistoryQos requirements.
* @param change Pointer to the CacheChange_t.
Expand Down Expand Up @@ -485,6 +491,14 @@ class RTPSReader
bool is_datasharing_compatible_with(
const WriterProxyData& wdata);

/**
* @brief Retrieves the local pointer to this reader
* to be used by other local entities.
*
* @return Local pointer to this reader.
*/
std::shared_ptr<LocalReaderPointer> get_local_pointer();

//!ReaderHistory
ReaderHistory* mp_history;
//!Listener
Expand All @@ -495,6 +509,10 @@ class RTPSReader
bool m_acceptMessagesFromUnkownWriters;
//!Trusted writer (for Builtin)
EntityId_t m_trustedWriterEntityId;

/// RefCountedPointer of this instance.
std::shared_ptr<LocalReaderPointer> local_ptr_;

//!Expects Inline Qos.
bool m_expectsInlineQos;

Expand Down
11 changes: 6 additions & 5 deletions include/fastdds/rtps/writer/ReaderLocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <vector>
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/LocatorSelectorEntry.hpp>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/rtps/messages/RTPSMessageGroup.h>
#include <fastdds/rtps/common/LocatorSelectorEntry.hpp>
#include <fastdds/rtps/reader/LocalReaderPointer.hpp>

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -69,10 +70,10 @@ class ReaderLocator : public RTPSMessageSenderInterface
return is_local_reader_;
}

RTPSReader* local_reader();
LocalReaderPointer::Instance local_reader();

void local_reader(
RTPSReader* local_reader)
std::shared_ptr<LocalReaderPointer> local_reader)
{
local_reader_ = local_reader;
}
Expand Down Expand Up @@ -260,7 +261,7 @@ class ReaderLocator : public RTPSMessageSenderInterface
LocatorSelectorEntry async_locator_info_;
bool expects_inline_qos_;
bool is_local_reader_;
RTPSReader* local_reader_;
std::shared_ptr<LocalReaderPointer> local_reader_;
std::vector<GuidPrefix_t> guid_prefix_as_vector_;
std::vector<GUID_t> guid_as_vector_;
IDataSharingNotifier* datasharing_notifier_;
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class ReaderProxy
* Get the local reader on the same process (if any).
* @return The local reader on the same process.
*/
inline RTPSReader* local_reader()
inline LocalReaderPointer::Instance local_reader()
{
return locator_info_.local_reader();
}
Expand Down
222 changes: 222 additions & 0 deletions include/fastrtps/utils/RefCountedPointer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file RefCountedPointer.hpp
*/

#ifndef UTILS__REFCOUNTEDPOINTER_HPP
#define UTILS__REFCOUNTEDPOINTER_HPP

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>

namespace eprosima {
namespace fastrtps {

/**
* @brief Class to manage a local pointer with reference counting.
*
* It is similar to std::shared_ptr, but designed for cases where
* a shared pointer cannot be used due to API restrictions.
*
* USAGE:
* - On T class:
* - Add a shared_ptr<RefCountedPointer<T>> local_ptr_ member.
* - Call local_ptr_->deactivate() before destroying T.
*
* - On classes that need to use a pointer to T:
* - Keep a copy of the shared_ptr<RefCountedPointer<T>>.
* - Whenever you need to access T:
* RefCountedPointer<T>::Instance instance(local_ptr_)
* if (instance)
* {
* ptr->method();
* }
*/
template<typename T>
class RefCountedPointer
{
public:

class Instance;

/**
* @brief Explicit constructor.
* @param ptr Pointer to manage.
*
* @pre nullptr != ptr. We must ensure that the pointer we
* are manaing is valid.
*/
explicit RefCountedPointer(
T* ptr)
: ptr_(ptr)
, is_active_(true)
, instances_(0)
{
assert(nullptr != ptr);
}

~RefCountedPointer() = default;

// Non-copyable and non-movable
RefCountedPointer(
const RefCountedPointer&) = delete;
RefCountedPointer& operator =(
const RefCountedPointer&) = delete;
RefCountedPointer(
RefCountedPointer&&) = delete;
RefCountedPointer& operator =(
RefCountedPointer&&) = delete;

/**
* @brief Class to manage the local pointer instance.
* It will increase the reference count on construction and decrease
* it on destruction. Provides a facade to access the pointee.
*/
class Instance
{
public:

/**
* @brief Constructor.
* @param parent Shared pointer reference to its RefCountedPointer.
*/
explicit Instance(
const std::shared_ptr<RefCountedPointer<T>>& parent)
: parent_(parent)
, ptr_(parent && parent->is_active_ ? parent->ptr_ : nullptr)
{
if (parent_)
{
parent_->inc_instances();
}
}

/**
* @brief Destructor.
*/
~Instance()
{
if (parent_)
{
parent_->dec_instances();
}
}

// Non-copyable, default movable
Instance(
const Instance&) = delete;
Instance& operator =(
const Instance&) = delete;
Instance(
Instance&&) = default;
Instance& operator =(
Instance&&) = default;

/**
* @brief operator to check if the pointer is valid.
*/
operator bool() const
{
return nullptr != ptr_;
}

/**
* @brief operator to call the T methods.
*/
T* operator ->() const
{
assert(nullptr != ptr_);
return ptr_;
}

private:

std::shared_ptr<RefCountedPointer<T>> parent_;
T* const ptr_;
};

/**
* @brief Ensure no more valid local pointer instances are created, and wait for current ones to die.
*/
void deactivate()
{
std::unique_lock<std::mutex> lock(mutex_);
is_active_ = false;
cv_.wait(lock, [this]() -> bool
{
return instances_ == 0;
});
}

private:

/**
* @brief Increase the reference count.
*/
void inc_instances()
{
std::unique_lock<std::mutex> lock(mutex_);
++instances_;
}

/**
* @brief Decrease the reference count.
*/
void dec_instances()
{
std::unique_lock<std::mutex> lock(mutex_);
--instances_;
if (instances_ == 0)
{
cv_.notify_one();
}
}

/**
* Pointer to the managed object.
*/
T* const ptr_;

/**
* Indicates whether the pointee is still alive
* and accessing the pointer is valid.
*/
std::atomic<bool> is_active_;

/**
* Protections for the number of instances.
*/
mutable std::mutex mutex_;
std::condition_variable cv_;

/**
* Number of active instances (currently using the pointee).
*/
size_t instances_;
};

} // namespace fastdds
} // namespace eprosima

#endif // DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#endif // UTILS__REFCOUNTEDPOINTER_HPP
5 changes: 3 additions & 2 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/history/WriterHistory.h>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/reader/LocalReaderPointer.hpp>
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastdds/rtps/writer/RTPSWriter.h>

Expand Down Expand Up @@ -777,7 +778,7 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant(
return nullptr;
}

RTPSReader* RTPSDomainImpl::find_local_reader(
std::shared_ptr<LocalReaderPointer> RTPSDomainImpl::find_local_reader(
const GUID_t& reader_guid)
{
auto instance = get_instance();
Expand All @@ -791,7 +792,7 @@ RTPSReader* RTPSDomainImpl::find_local_reader(
}
}

return nullptr;
return std::shared_ptr<LocalReaderPointer>(nullptr);
}

RTPSWriter* RTPSDomainImpl::find_local_writer(
Expand Down
Loading

0 comments on commit 28e2ce8

Please sign in to comment.