From f15ca928e559f721486b123cd6c433b41242017c Mon Sep 17 00:00:00 2001 From: albtam Date: Wed, 10 Apr 2024 21:37:21 +0200 Subject: [PATCH] Address reviewer's comments --- .../buffer/include/iox/detail/spsc_sofi.hpp | 12 +++--- .../buffer/include/iox/detail/spsc_sofi.inl | 37 ++++++++++--------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp index 3ddff08f1e..bcb8d7f3da 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp @@ -31,13 +31,12 @@ namespace iox { namespace concurrent { -/// @brief Thread safe (without locks) single producer and single consumer queue with a safe +/// @brief Thread safe lock-free single producer and single consumer queue with a safe /// overflowing behavior /// @note When SpscSoFi is full and a sender tries to push, the data at the current read pos will be /// returned. This behavior mimics a FiFo queue but prevents resource leaks when pushing into /// a full SpscSoFi. /// SpscSoFi is especially designed to provide fixed capacity storage. -/// SpscSoFi only allocates memory when created, capacity can be adjusted explicitly. /// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as /// many elements as specified with 'CapacityValue' can be removed /// @param[in] ValueType DataType to be stored, must be trivially copyable @@ -45,9 +44,8 @@ namespace concurrent template class SpscSofi { - // We need to make sure that the copy operation doesn't have any logic static_assert(std::is_trivially_copyable::value, - "SpscSofi can handle only trivially copyable data types"); + "SpscSofi can only handle trivially copyable data types since 'memcpy' is used internally"); /// @brief Check if Atomic integer is lockfree on platform /// ATOMIC_INT_LOCK_FREE = 2 - is always lockfree /// ATOMIC_INT_LOCK_FREE = 1 - is sometimes lockfree @@ -86,8 +84,8 @@ class SpscSofi // |--A--|--B--|--C--| // ^ // w=3, r=0, - // 4. Then increment the read position - // |--A--|--B--|--C--| + // 4. Then increment the read position and return the overflowing 'A' + // |-----|--B--|--C--| // ^ ^ // w=3 r=1 // 5. The producer thread is suspended, the consumer thread pops a value @@ -118,7 +116,7 @@ class SpscSofi /// @note restricted thread safe: can only be called from one thread. The authorization to push into the /// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used. /// @return return true if push was successful else false. - /// @code + /// @remarks /// 1. SpscSofi is empty |-----|-----| /// 2. push an element |--A--|-----| /// 3. push an element |--A--|--B--| diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl index 29015fa076..99d31f7087 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl @@ -101,20 +101,6 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep { // Memory synchronization is not needed but we need to prevent operation reordering to avoid the following scenario // where the CPU reorder the load of m_readPosition and m_writePosition: - // 0. Initial situation (the queue is full) - // |----|--B--|--C--| - // ^ ^ - // w=3 r=1 - // 1. The consumer thread loads m_writePosition => 3 - // |----|--B--|--C--| - // ^ ^ - // w=3 r=1 - // 2. The producer thread pushes two times - // |--D--|--E--|-----| - // ^ ^ - // r=3 w=5 - // 3. The consumer thread loads m_readPosition => 3. The pop method returns false - // => Whereas the queue was full, pop returned false giving the impression that the queue if empty uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); uint64_t nextReadPosition{0}; @@ -128,6 +114,21 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep { nextReadPosition = currentReadPosition; popWasSuccessful = false; + // We cannot just return false (i.e. we need to continue the loop) to avoid the following situation: + // 0. Initial situation (the queue is full) + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 1. The consumer thread loads m_writePosition => 3 + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 2. The producer thread pushes two times + // |--D--|--E--|-----| + // ^ ^ + // r=3 w=5 + // 3. The consumer thread loads m_readPosition => 3 The pop method returns false + // => Whereas the queue was full, pop returned false giving the impression that the queue if empty } else { @@ -138,9 +139,9 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep } // We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted - // While memory synchronization is not needed for m_readPosition we need to have a - // corresponding m_readPosition.store(release) to the m_readPosition.load(acquire) in the - // push method + // While memory synchronization is not needed for m_readPosition, we need to ensure that the + // 'memcpy' happens before the CAS operation. + // Corresponding m_readPosition.load(acquire) is in the push method // ============================================= // ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz // while this thread is blocked, we would still need more than 500 years to overflow @@ -185,7 +186,7 @@ inline bool SpscSofi::push(const ValueType& valueIn, V m_writePosition.store(nextWritePosition, std::memory_order_release); // While memory synchronization is not needed with m_readPosition, we need - // memory_order_acquire to avoid the reordering of the operation + // memory_order_acquire to avoid the reordering of the operation. uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); // Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to