From aaf9b1e3af5365754b84f07ac2da4fe23493c733 Mon Sep 17 00:00:00 2001 From: albtam Date: Tue, 5 Mar 2024 08:58:30 +0100 Subject: [PATCH 1/8] iox-#2177 Update SPSC SoFi - add comments to clarify concurrent behavior - fix memory orders --- .../buffer/include/iox/detail/spsc_sofi.hpp | 169 ++++++------ .../buffer/include/iox/detail/spsc_sofi.inl | 241 +++++++++++------- .../moduletests/test_concurrent_spsc_sofi.cpp | 50 +--- 3 files changed, 245 insertions(+), 215 deletions(-) diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp index 7b60a8b1c5..2e06a8e35e 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp @@ -25,28 +25,36 @@ #include #include +#include namespace iox { namespace concurrent { -/// @brief -/// Thread safe producer and consumer queue with a safe overflowing behavior. -/// SpscSofi is designed in a FIFO Manner but prevents data loss when pushing into -/// a full SpscSofi. When SpscSofi is full and a Sender tries to push, the data at the -/// current read position will be returned. SpscSofi is threadsafe without using -/// locks. When the buffer is filled, new data is written starting at the -/// beginning of the buffer and overwriting the old.The SpscSofi is especially -/// designed to provide fixed capacity storage. When its capacity is exhausted, -/// newly inserted elements will cause elements either at the beginning -/// to be overwritten.The SpscSofi only allocates memory when -/// created , capacity can be is adjusted explicitly. -/// +/// @brief Thread safe (without locks) single producer and single consumer queue with a safe +/// overflowing behavior +/// @note When SpscSoFi is full and a sender tries to push, the data at the current read pos will be +/// returned. This behavior mimics a FiFo queue but prevents resource leaks when pushing into +/// a full SpscSoFi. +/// SpscSoFi is especially designed to provide fixed capacity storage. +/// SpscSoFi only allocates memory when created, capacity can be adjusted explicitly. +/// @example +/// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as +/// many elements as specified with 'CapacityValue' can be removed +/// 0: Initial situation: +/// |--A--|--B--| +/// 1. Thread 1 pushes a new element. Since it is an overflowing situation, the overwritten value is +/// removed and returned to the caller +/// |--A--|--B--| +/// 2. Right before push() returns, pop() detects that an element is about to be removed, and remove +/// the next element +/// |--C--|----| /// @param[in] ValueType DataType to be stored, must be trivially copyable /// @param[in] CapacityValue Capacity of the SpscSofi template class SpscSofi { + // We need to make sure that the copy operation doesn't have any logic static_assert(std::is_trivially_copyable::value, "SpscSofi can handle only trivially copyable data types"); /// @brief Check if Atomic integer is lockfree on platform @@ -55,87 +63,89 @@ class SpscSofi /// ATOMIC_INT_LOCK_FREE = 0 - is never lockfree static_assert(2 <= ATOMIC_INT_LOCK_FREE, "SpscSofi is not able to run lock free on this data type"); - /// @brief Internal size needs to be bigger than the size desirred by the user - /// This is because of buffer empty detection and overflow handling - static constexpr uint32_t INTERNAL_SIZE_ADD_ON = 1; - - /// @brief This is the resulting internal size on creation - static constexpr uint32_t INTERNAL_SPSC_SOFI_SIZE = CapacityValue + INTERNAL_SIZE_ADD_ON; + // To ensure a consumer gets at least the amount of capacity of data when a queue is full, an additional free + // slot (add-on) is required. + // ======================================================================== + // Consider the following scenario when there is no "capacity add-on": + // 1. CapacityValue = 2 + // |--A--|--B--| + // ^ + // w=2, r=0 + // 2. We want to push a new element + // 3. Advance the read position (this effectively reduces the capacity and is the reason the internal capacity + // needs to be larger; the consumer cannot pop out CAPACITY amount of samples even though the queue is full if + // the push thread is suspended right after this operation) + // |--A--|--B--| + // ^ ^ + // w=2 r=1 + // 4. Take overflow data + // |-----|--B--| + // ^ ^ + // w=2 r=1 + // 5. Write new data + // |--C--|--B--| + // ^ ^ + // w=2 r=1 + // 6. Advance next write position + // |--C--|--B--| + // ^ + // w=3, r=1 + // ======================================================================== + // With "capacity add-on" + // 1. CapacityValue = 2, InternalCapacity = 3 + // |--A--|--B--|----| + // ^ ^ + // r=0 w=2 + // 2. We want to push a new element + // 3. We first write at index 2 % capacity + // |--A--|--B--|--C--| + // ^ + // w=3, r=0, + // 2. We want to push a new element: + // 4. We detect that the queue if full so we retrieve the value pointed by the read pointer: the value A is + // returned + // |-(A)-|--B--|--C--| + // ^ ^ + // w=3 r=1 + // ======================================================================== + static constexpr uint32_t INTERNAL_CAPACITY_ADDON = 1; + + /// @brief Internal capacity of the queue at creation + static constexpr uint32_t INTERNAL_SPSC_SOFI_CAPACITY = CapacityValue + INTERNAL_CAPACITY_ADDON; public: /// @brief default constructor which constructs an empty sofi SpscSofi() noexcept = default; - /// @brief pushs an element into SpscSofi. if SpscSofi is full the oldest data will be + /// @brief push an element into sofi. if sofi is full the oldest data will be /// returned and the pushed element is stored in its place instead. - /// @param[in] valueIn value which should be stored - /// @param[out] valueOut if SpscSofi is overflowing the value of the overridden value + /// @param[in] value_in value which should be stored + /// @param[out] value_out if sofi is overflowing the value of the overridden value /// is stored here - /// @concurrent restricted thread safe: single pop, single push no - /// push calls from multiple contexts - /// @return return true if push was sucessfull else false. + /// @note restricted thread safe: can only be called from one thread. The authorization to push into the + /// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used. + /// @return return true if push was successful else false. /// @code - /// (initial situation, SpscSofi is FULL) - /// Start|-----A-------| - /// |-----B-------| - /// |-----C-------| - /// |-----D-------| - /// - /// - /// (calling push with data ā€™Eā€™) - /// Start|-----E-------| - /// |-----A-------| - /// |-----B-------| - /// |-----C-------| - /// (ā€™Dā€™ is returned as valueOut) - /// - /// ################################################################### - /// - /// (if SpscSofi is not FULL , calling push() add new data) - /// Start|-------------| - /// |-------------| ( Initial SpscSofi ) - /// (push() Called two times) - /// - /// |-------------| - /// (New Data) - /// |-------------| - /// (New Data) - /// @endcode + /// 1. sofi is empty |-----|-----| + /// 2. push an element |--A--|-----| + /// 3. push an element |--A--|--B--| + /// 5. sofi is full + /// 6. push an element |--C--|--B--| -> value_out is set to 'A' bool push(const ValueType& valueIn, ValueType& valueOut) noexcept; /// @brief pop the oldest element /// @param[out] valueOut storage of the pop'ed value - /// @concurrent restricted thread safe: single pop, single push no - /// pop or popIf calls from multiple contexts + /// @concurrent restricted thread safe: can only be called from one thread. The authorization to pop from the + /// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used. /// @return false if SpscSofi is empty, otherwise true bool pop(ValueType& valueOut) noexcept; - /// @brief conditional pop call to provide an alternative for a peek - /// and pop approach. If the verificator returns true the - /// peeked element is returned. - /// @param[out] valueOut storage of the pop'ed value - /// @param[in] verificator callable of type bool(const ValueType& peekValue) - /// which takes the value which would be pop'ed as argument and returns - /// true if it should be pop'ed, otherwise false - /// @code - /// int limit = 7128; - /// mysofi.popIf(value, [=](const ValueType & peek) - /// { - /// return peek < limit; // pop only when peek is smaller than limit - /// } - /// ); // pop's a value only if it is smaller than 9012 - /// @endcode - /// @concurrent restricted thread safe: single pop, single push no - /// pop or popIf calls from multiple contexts - /// @return false if SpscSofi is empty or when verificator returns false, otherwise true - template - bool popIf(ValueType& valueOut, const Verificator_T& verificator) noexcept; - /// @brief returns true if SpscSofi is empty, otherwise false /// @note the use of this function is limited in the concurrency case. if you /// call this and in another thread pop is called the result can be out /// of date as soon as you require it - /// @concurrent unrestricted thread safe + /// @concurrent unrestricted thread safe (the result might already be outdated when used). Expected to be called + /// from either the producer or the consumer thread but not from a third thread bool empty() const noexcept; /// @brief resizes SpscSofi @@ -150,15 +160,16 @@ class SpscSofi uint64_t capacity() const noexcept; /// @brief returns the current size of SpscSofi - /// @concurrent unrestricted thread safe + /// @concurrent unrestricted thread safe (the result might already be outdated when used). Expected to be called + /// from either the producer or the consumer thread but not from a third thread uint64_t size() const noexcept; private: - UninitializedArray m_data; - uint64_t m_size = INTERNAL_SPSC_SOFI_SIZE; + std::pair getReadWritePositions() const noexcept; - /// @brief the write/read pointers are "atomic pointers" so that they are not - /// reordered (read or written too late) + private: + UninitializedArray m_data; + uint64_t m_size = INTERNAL_SPSC_SOFI_CAPACITY; Atomic m_readPosition{0}; Atomic m_writePosition{0}; }; diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl index 146ac726dd..9e9a4a8dc7 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl @@ -27,11 +27,11 @@ namespace concurrent template inline uint64_t SpscSofi::capacity() const noexcept { - return m_size - INTERNAL_SIZE_ADD_ON; + return m_size - INTERNAL_CAPACITY_ADDON; } template -inline uint64_t SpscSofi::size() const noexcept +inline std::pair SpscSofi::getReadWritePositions() const noexcept { uint64_t readPosition{0}; uint64_t writePosition{0}; @@ -39,17 +39,44 @@ inline uint64_t SpscSofi::size() const noexcept { readPosition = m_readPosition.load(std::memory_order_relaxed); writePosition = m_writePosition.load(std::memory_order_relaxed); + + // The while loop is needed to avoid the following scenarios: + // 1. Implementation to get the size: size = m_writePosition - m_readPosition; + // - consumer reads m_writePosition + // - consumer thread gets suspended + // - producer pushes 100 times + // - consumer reads m_readPosition + // => m_readPosition will be past m_writePosition and one would get a negative size (or the positive unsigned + // equivalent) + // 2. Implementation to get the size: readPosition = m_readPosition; size = m_writePosition - readPosition; + // - consumer stores m_readPosition in readPosition + // - consumer thread gets suspended + // - producer pushes 100 times + // - consumer reads m_writePosition + // => m_writePosition will be past readPosition + Capacity and one would get a size which is much larger than + // the capacity + // =========================================== + // Note: it is still possible to return a size that is not up-to-date anymore but at least + // the returned size is logically valid } while (m_writePosition.load(std::memory_order_relaxed) != writePosition || m_readPosition.load(std::memory_order_relaxed) != readPosition); + return {readPosition, writePosition}; +} + + +template +inline uint64_t SpscSofi::size() const noexcept +{ + auto [readPosition, writePosition] = getReadWritePositions(); return writePosition - readPosition; } template inline bool SpscSofi::setCapacity(const uint64_t newSize) noexcept { - uint64_t newInternalSize = newSize + INTERNAL_SIZE_ADD_ON; - if (empty() && (newInternalSize <= INTERNAL_SPSC_SOFI_SIZE)) + uint64_t newInternalSize = newSize + INTERNAL_CAPACITY_ADDON; + if (empty() && (newInternalSize <= INTERNAL_SPSC_SOFI_CAPACITY)) { m_size = newInternalSize; @@ -65,78 +92,58 @@ inline bool SpscSofi::setCapacity(const uint64_t newSi template inline bool SpscSofi::empty() const noexcept { - uint64_t currentReadPosition{0}; - bool isEmpty{false}; - - do - { - /// @todo iox-#1695 read before write since the writer increments the aba counter!!! - /// @todo iox-#1695 write doc with example!!! - currentReadPosition = m_readPosition.load(std::memory_order_acquire); - uint64_t currentWritePosition = m_writePosition.load(std::memory_order_acquire); - - isEmpty = (currentWritePosition == currentReadPosition); - // we need compare without exchange - } while (!(currentReadPosition == m_readPosition.load(std::memory_order_acquire))); - - return isEmpty; + auto [readPost, writePos] = getReadWritePositions(); + return readPost == writePos; } template inline bool SpscSofi::pop(ValueType& valueOut) noexcept { - return popIf(valueOut, [](ValueType) { return true; }); -} - -template -template -inline bool SpscSofi::popIf(ValueType& valueOut, const Verificator_T& verificator) noexcept -{ - uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); - uint64_t nextReadPosition{0}; + // Memory synchronization is not needed but we need to prevent operation reordering to avoid the following scenario + // where the CPU reorder the load of m_readPosition and m_writePosition: + // 0. Initial situation (the queue is full) + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 1. The consumer thread loads m_writePosition => 3 + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 2. The producer thread pushes two times + // |--D--|--E--|-----| + // ^ ^ + // r=3 w=5 + // 3. The consumer thread loads m_readPosition => 3. The pop method returns false + // => Whereas the queue was full, pop returned false giving the impression that the queue if empty + // TODO: To which release/store statement does it correspond? + uint64_t currentReadPos = m_readPosition.load(std::memory_order_acquire); - bool popWasSuccessful{true}; do { - if (currentReadPosition == m_writePosition.load(std::memory_order_acquire)) + // SYNC POINT READ: m_data + // See explanation of the corresponding synchronization point in push() + if (currentReadPos == m_writePosition.load(std::memory_order_acquire)) { - nextReadPosition = currentReadPosition; - popWasSuccessful = false; + return false; + // We don't need to check if read has changed, as it is enough to know that the empty state + // was valid in the past. The same race can also happen after the while loop and before the + // return operation } - else - { - // we use memcpy here, since the copy assignment is not thread safe in general (we might have an overflow in - // the push thread and invalidates the object while the copy is running and therefore works on an - // invalid object); memcpy is also not thread safe, but we discard the object anyway and read it - // again if its overwritten in between; this is only relevant for types larger than pointer size - // assign the user data - std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); - - /// @brief first we need to peak valueOut if it is fitting the condition and then we have to verify - /// if valueOut is not am invalid object, this could be the case if the read position has - /// changed - if (m_readPosition.load(std::memory_order_relaxed) == currentReadPosition && !verificator(valueOut)) - { - popWasSuccessful = false; - nextReadPosition = currentReadPosition; - } - else - { - nextReadPosition = currentReadPosition + 1U; - popWasSuccessful = true; - } - } - - // compare and swap - // if(m_readPosition == currentReadPosition) - // m_readPosition = l_next_aba_read_pos - // else - // currentReadPosition = m_readPosition - // Assign m_aba_read_p to next readable location + // we use memcpy here, to ensure that there is no logic in copying the data + std::memcpy(&valueOut, &m_data[currentReadPos % m_size], sizeof(ValueType)); + + // We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted + // Memory order relaxed is enough as: + // - synchronization is not needed with m_readPosition + // - there is no operation reordering possible + // ============================================= + // ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz + // while this thread is blocked, we would still need more than 500 years to overflow + // m_readPosition and encounter the ABA problem } while (!m_readPosition.compare_exchange_weak( - currentReadPosition, nextReadPosition, std::memory_order_acq_rel, std::memory_order_acquire)); + currentReadPos, currentReadPos + 1U, std::memory_order_relaxed, std::memory_order_relaxed)); - return popWasSuccessful; + return true; } template @@ -144,43 +151,87 @@ inline bool SpscSofi::push(const ValueType& valueIn, V { constexpr bool SOFI_OVERFLOW{false}; - uint64_t currentWritePosition = m_writePosition.load(std::memory_order_relaxed); - uint64_t nextWritePosition = currentWritePosition + 1U; - - m_data[currentWritePosition % m_size] = valueIn; - m_writePosition.store(nextWritePosition, std::memory_order_release); - - uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); - - // check if there is a free position for the next push - if (nextWritePosition < currentReadPosition + m_size) + // SYNC POINT READ: m_data + // We need to synchronize data to avoid the following scenario: + // 1. A thread calls push() and updates data with a new value + // 2. Another thread calls push() and enters the overflow case. The data could be read before + // any synchronization + uint64_t currentWritePos = m_writePosition.load(std::memory_order_acquire); + uint64_t nextWritePos = currentWritePos + 1U; + + m_data[currentWritePos % m_size] = valueIn; + // SYNC POINT WRITE: m_data + // We need to make sure that writing the value happens before incrementing the + // m_writePosition otherwise the following scenario can happen: + // 1. m_writePosition is increased (but the value has not been written yet) + // 2. Another thread calls pop(): we check if the queue is empty => no (e.g. m_writePosition == 1 + // and m_readPosition == 0) + // 3. In pop(), a data race can occur + // With memory_order_release, this cannot happen as it is guaranteed that writing the data + // happens before incrementing m_writePosition + // ======================================= + // Note that the following situation can still happen (but is not a problem): + // 1. A value is written (m_writePosition hasn't been incremented yet) + // 2. Another thread calls pop(): we check if the queue is empty => yes (e.g. m_writePosition == + // m_readPosition == 0 ) + // 3. An element was already stored so we could have popped the element + m_writePosition.store(nextWritePos, std::memory_order_release); + + // Memory order relaxed is enough since: + // - synchronization is not needed with m_readPosition + // - operation reordering: + // - cannot move below if statement otherwise, the code won't compile + // - if it moves above, we might get an outdated read position that will be caught by the + // compare_exchange check + uint64_t currentReadPos = m_readPosition.load(std::memory_order_relaxed); + + // Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to + // check if there is a free position for the *next* write position + if (nextWritePos < currentReadPos + m_size) { return !SOFI_OVERFLOW; } - // this is an overflow situation, which means that the next push has no free position, therefore the oldest value - // needs to be passed back to the caller - - uint64_t nextReadPosition = currentReadPosition + 1U; - - // we need to update the read position - // a) it works, then we need to pass the overflow value back - // b) it doesn't work, which means that the pop thread already took the value in the meantime an no further action - // is required - // memory order success is memory_order_acq_rel - // - this is to prevent the reordering of m_writePosition.store(...) after the increment of the m_readPosition - // - in case of an overflow, this might result in the pop thread getting one element less than the capacity of - // the SoFi if the push thread is suspended in between this two statements - // - it's still possible to get more elements than the capacity, but this is an inherent issue with concurrent - // queues and cannot be prevented since there can always be a push during a pop operation - // - another issue might be that two consecutive pushes (not concurrent) happen on different CPU cores without - // synchronization, then the memory also needs to be synchronized for the overflow case - // memory order failure is memory_order_relaxed since there is no further synchronization needed if there is no - // overflow + // This is an overflow situation so we will need to read the overwritten value + // however, it could be that pop() was called in the meantime, i.e. m_readPosition was increased. + // Memory order relaxed is enough for both success and failure cases since: + // - synchronization is not needed with m_readPosition + // - operation reordering cannot happen + // ====================================== + // ABA problem: m_readPosition is an uint64_t. Assuming a thread is popping at a rate of 1 GHz while + // this thread is blocked, we would still need more than 500 years to overflow m_readPosition and + // encounter the ABA problem if (m_readPosition.compare_exchange_strong( - currentReadPosition, nextReadPosition, std::memory_order_acq_rel, std::memory_order_relaxed)) + currentReadPos, currentReadPos + 1U, std::memory_order_relaxed, std::memory_order_relaxed)) { - std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); + // Since INTERNAL_SOFI_CAPACITY = CapacityValue + 1, it can happen that we return more + // elements than the CapacityValue by calling push and pop concurrently (in case of an + // overflow). This is an inherent behavior with concurrent queues. Scenario example + // (CapacityValue = 2): + // 0. Initial situation (before the call to push) + // |--A--|--B--|----| + // ^ ^ + // r=0 w=2 + // 1. Thread 1, pushes a new value and increases m_readPosition (overflow situation) + // |--A--|--B--|--C--| + // ^ ^ + // w=3, r=1 + // 2. Now, thread 1 is interrupted and another thread pops as many elements as possible + // 3. pop() -> returns B (First value returned by pop) + // |--A--|-(B)-|--C--| + // ^ ^ + // w=3 r=2 + // 4. pop() -> returns C (Second value returned by pop) + // |--A--|-(B)-|-(C)-| + // ^ + // w=3, r=3 + // 5. pop() -> nothing to return + // 6. Finally, thread 1 resumes and returns A (Third value [additional value] returned by + // push) + // |-(A)-|-(B)-|-(C)-| + // ^ + // w=3, r=3 + std::memcpy(&valueOut, &m_data[currentReadPos % m_size], sizeof(ValueType)); return SOFI_OVERFLOW; } diff --git a/iceoryx_hoofs/test/moduletests/test_concurrent_spsc_sofi.cpp b/iceoryx_hoofs/test/moduletests/test_concurrent_spsc_sofi.cpp index dd48bf5a6c..2d724fb00c 100644 --- a/iceoryx_hoofs/test/moduletests/test_concurrent_spsc_sofi.cpp +++ b/iceoryx_hoofs/test/moduletests/test_concurrent_spsc_sofi.cpp @@ -388,7 +388,7 @@ TEST_F(SpscSofiTest, ResizeAndSizeFillUp) } } -TEST_F(SpscSofiTest, PopIfWithValidCondition) +TEST_F(SpscSofiTest, Pop) { ::testing::Test::RecordProperty("TEST_ID", "f149035c-21cc-4f7d-ba4d-564a645e933b"); sofi.push(10, returnVal); @@ -396,33 +396,22 @@ TEST_F(SpscSofiTest, PopIfWithValidCondition) sofi.push(12, returnVal); int output{-1}; - bool result = sofi.popIf(output, [](const int& peek) { return peek < 20; }); + bool result = sofi.pop(output); EXPECT_EQ(result, true); EXPECT_EQ(output, 10); } -TEST_F(SpscSofiTest, PopIfWithInvalidCondition) -{ - ::testing::Test::RecordProperty("TEST_ID", "1a494c28-928f-48f4-8b01-e68dfbd7563e"); - sofi.push(15, returnVal); - sofi.push(16, returnVal); - sofi.push(17, returnVal); - - bool result = sofi.popIf(returnVal, [](const int& peek) { return peek < 5; }); - - EXPECT_EQ(result, false); -} -TEST_F(SpscSofiTest, PopIfOnEmpty) +TEST_F(SpscSofiTest, PopOnEmpty) { ::testing::Test::RecordProperty("TEST_ID", "960ad78f-cb9b-4c34-a077-6adb343a841c"); - bool result = sofi.popIf(returnVal, [](const int& peek) { return peek < 7; }); + bool result = sofi.pop(returnVal); EXPECT_EQ(result, false); } -TEST_F(SpscSofiTest, PopIfFullWithValidCondition) +TEST_F(SpscSofiTest, PopFull) { ::testing::Test::RecordProperty("TEST_ID", "167f2f01-f926-4442-bc4f-ff5e7cfe9fe0"); constexpr int INITIAL_VALUE = 100; @@ -432,42 +421,21 @@ TEST_F(SpscSofiTest, PopIfFullWithValidCondition) sofi.push(i + INITIAL_VALUE, returnVal); } - bool result = sofi.popIf(returnVal, [](const int& peek) { return peek < 150; }); + bool result = sofi.pop(returnVal); EXPECT_EQ(result, true); EXPECT_EQ(returnVal, INITIAL_VALUE + OFFSET); } -TEST_F(SpscSofiTest, PopIfFullWithInvalidCondition) -{ - ::testing::Test::RecordProperty("TEST_ID", "672881b9-eebd-471d-9d62-e792a8b8013f"); - for (int i = 0; i < static_cast(sofi.capacity()) + 2; i++) - { - sofi.push(i + 100, returnVal); - } - - bool result = sofi.popIf(returnVal, [](const int& peek) { return peek < 50; }); - - EXPECT_EQ(result, false); -} - -TEST_F(SpscSofiTest, PopIfValidEmptyAfter) +TEST_F(SpscSofiTest, PopEmptyAfter) { ::testing::Test::RecordProperty("TEST_ID", "19444dcd-7746-4e6b-a3b3-398c9d62317d"); + sofi.push(2, returnVal); - sofi.popIf(returnVal, [](const int& peek) { return peek < 50; }); + sofi.pop(returnVal); EXPECT_EQ(sofi.empty(), true); } -TEST_F(SpscSofiTest, PopIfInvalidNotEmptyAfter) -{ - ::testing::Test::RecordProperty("TEST_ID", "cadd7f02-6fe5-49a5-bd5d-837f5fcb2a71"); - sofi.push(200, returnVal); - - sofi.popIf(returnVal, [](const int& peek) { return peek < 50; }); - - EXPECT_EQ(sofi.empty(), false); -} } // namespace From 1dc20ab7d49a88164b38202eb19fb2dc69387b45 Mon Sep 17 00:00:00 2001 From: albtam Date: Thu, 21 Mar 2024 00:53:43 +0100 Subject: [PATCH 2/8] iox-#2177 Address reviewer's comments --- .../buffer/include/iox/detail/spsc_sofi.hpp | 54 ++++++++----------- .../buffer/include/iox/detail/spsc_sofi.inl | 12 ++--- 2 files changed, 28 insertions(+), 38 deletions(-) diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp index 2e06a8e35e..507a251daa 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp @@ -38,17 +38,8 @@ namespace concurrent /// a full SpscSoFi. /// SpscSoFi is especially designed to provide fixed capacity storage. /// SpscSoFi only allocates memory when created, capacity can be adjusted explicitly. -/// @example /// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as /// many elements as specified with 'CapacityValue' can be removed -/// 0: Initial situation: -/// |--A--|--B--| -/// 1. Thread 1 pushes a new element. Since it is an overflowing situation, the overwritten value is -/// removed and returned to the caller -/// |--A--|--B--| -/// 2. Right before push() returns, pop() detects that an element is about to be removed, and remove -/// the next element -/// |--C--|----| /// @param[in] ValueType DataType to be stored, must be trivially copyable /// @param[in] CapacityValue Capacity of the SpscSofi template @@ -71,42 +62,43 @@ class SpscSofi // |--A--|--B--| // ^ // w=2, r=0 - // 2. We want to push a new element - // 3. Advance the read position (this effectively reduces the capacity and is the reason the internal capacity - // needs to be larger; the consumer cannot pop out CAPACITY amount of samples even though the queue is full if - // the push thread is suspended right after this operation) + // 2. The producer thread pushes a new element + // 3. Increment the read position (this effectively reduces the capacity and is the reason the internal capacity + // needs to be larger; // |--A--|--B--| // ^ ^ // w=2 r=1 - // 4. Take overflow data - // |-----|--B--| - // ^ ^ - // w=2 r=1 - // 5. Write new data - // |--C--|--B--| - // ^ ^ - // w=2 r=1 - // 6. Advance next write position - // |--C--|--B--| - // ^ - // w=3, r=1 + // 4. The producer thread is suspended, the consumer thread pops a value + // |--A--|-----| + // ^ + // w=2, r=2 + // 5. The consumer tries to pop another value but the queue looks empty as + // write position == read position: the consumer cannot pop + // out CAPACITY amount of samples even though the queue was full // ======================================================================== // With "capacity add-on" // 1. CapacityValue = 2, InternalCapacity = 3 // |--A--|--B--|----| // ^ ^ // r=0 w=2 - // 2. We want to push a new element - // 3. We first write at index 2 % capacity + // 2. The producer threads pushes a new element + // 3. First write the element at index 2 % capacity and increment the write index // |--A--|--B--|--C--| // ^ // w=3, r=0, - // 2. We want to push a new element: - // 4. We detect that the queue if full so we retrieve the value pointed by the read pointer: the value A is - // returned - // |-(A)-|--B--|--C--| + // 4. Then increment the read position + // |--A--|--B--|--C--| // ^ ^ // w=3 r=1 + // 5. The producer thread is suspended, the consumer thread pops a value + // |--A--|-----|--C--| + // ^ ^ + // w=3 r=2 + // 6. The consumer thread pops another value + // |--A--|-----|-----| + // ^ + // w=3, r=3 + // 7. Now, write position == read position so we cannot pop another element: the queue looks empty. We managed to pop CapacityValue elements // ======================================================================== static constexpr uint32_t INTERNAL_CAPACITY_ADDON = 1; diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl index 9e9a4a8dc7..ddfc4d94bd 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl @@ -115,7 +115,7 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep // r=3 w=5 // 3. The consumer thread loads m_readPosition => 3. The pop method returns false // => Whereas the queue was full, pop returned false giving the impression that the queue if empty - // TODO: To which release/store statement does it correspond? + // TODO(@albtam): Explain yo which release/store statement it corresponds uint64_t currentReadPos = m_readPosition.load(std::memory_order_acquire); do @@ -151,12 +151,10 @@ inline bool SpscSofi::push(const ValueType& valueIn, V { constexpr bool SOFI_OVERFLOW{false}; - // SYNC POINT READ: m_data - // We need to synchronize data to avoid the following scenario: - // 1. A thread calls push() and updates data with a new value - // 2. Another thread calls push() and enters the overflow case. The data could be read before - // any synchronization - uint64_t currentWritePos = m_writePosition.load(std::memory_order_acquire); + // Memory order relaxed is enough since: + // - no synchronization needed as we are loading a value only modified in this method and this method cannot be accessed concurrently + // - the operation cannot move below without observable changes + uint64_t currentWritePos = m_writePosition.load(std::memory_order_relaxed); uint64_t nextWritePos = currentWritePos + 1U; m_data[currentWritePos % m_size] = valueIn; From f34606b607cc75f6e310029629a8a75523130abc Mon Sep 17 00:00:00 2001 From: albtam Date: Wed, 27 Mar 2024 23:50:45 +0100 Subject: [PATCH 3/8] iox-#2177 Address reviewer's comments --- .../buffer/include/iox/detail/spsc_sofi.hpp | 13 +-- .../buffer/include/iox/detail/spsc_sofi.inl | 98 +++++++++++-------- 2 files changed, 63 insertions(+), 48 deletions(-) diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp index 507a251daa..b201cfafd7 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp @@ -98,7 +98,8 @@ class SpscSofi // |--A--|-----|-----| // ^ // w=3, r=3 - // 7. Now, write position == read position so we cannot pop another element: the queue looks empty. We managed to pop CapacityValue elements + // 7. Now, write position == read position so we cannot pop another element: the queue looks empty. We managed to + // pop CapacityValue elements // ======================================================================== static constexpr uint32_t INTERNAL_CAPACITY_ADDON = 1; @@ -106,22 +107,22 @@ class SpscSofi static constexpr uint32_t INTERNAL_SPSC_SOFI_CAPACITY = CapacityValue + INTERNAL_CAPACITY_ADDON; public: - /// @brief default constructor which constructs an empty sofi + /// @brief default constructor which constructs an empty SpscSofi SpscSofi() noexcept = default; - /// @brief push an element into sofi. if sofi is full the oldest data will be + /// @brief push an element into SpscSofi. if SpscSofi is full the oldest data will be /// returned and the pushed element is stored in its place instead. /// @param[in] value_in value which should be stored - /// @param[out] value_out if sofi is overflowing the value of the overridden value + /// @param[out] value_out if SpscSofi is overflowing the value of the overridden value /// is stored here /// @note restricted thread safe: can only be called from one thread. The authorization to push into the /// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used. /// @return return true if push was successful else false. /// @code - /// 1. sofi is empty |-----|-----| + /// 1. SpscSofi is empty |-----|-----| /// 2. push an element |--A--|-----| /// 3. push an element |--A--|--B--| - /// 5. sofi is full + /// 5. SpscSofi is full /// 6. push an element |--C--|--B--| -> value_out is set to 'A' bool push(const ValueType& valueIn, ValueType& valueOut) noexcept; diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl index ddfc4d94bd..29015fa076 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl @@ -92,8 +92,8 @@ inline bool SpscSofi::setCapacity(const uint64_t newSi template inline bool SpscSofi::empty() const noexcept { - auto [readPost, writePos] = getReadWritePositions(); - return readPost == writePos; + auto [readPosition, writePosition] = getReadWritePositions(); + return readPosition == writePosition; } template @@ -115,35 +115,40 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep // r=3 w=5 // 3. The consumer thread loads m_readPosition => 3. The pop method returns false // => Whereas the queue was full, pop returned false giving the impression that the queue if empty - // TODO(@albtam): Explain yo which release/store statement it corresponds - uint64_t currentReadPos = m_readPosition.load(std::memory_order_acquire); + uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); + uint64_t nextReadPosition{0}; + + bool popWasSuccessful{true}; do { // SYNC POINT READ: m_data // See explanation of the corresponding synchronization point in push() - if (currentReadPos == m_writePosition.load(std::memory_order_acquire)) + if (currentReadPosition == m_writePosition.load(std::memory_order_acquire)) { - return false; - // We don't need to check if read has changed, as it is enough to know that the empty state - // was valid in the past. The same race can also happen after the while loop and before the - // return operation + nextReadPosition = currentReadPosition; + popWasSuccessful = false; + } + else + { + // we use memcpy here, to ensure that there is no logic in copying the data + std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); + nextReadPosition = currentReadPosition + 1U; + popWasSuccessful = true; } - // we use memcpy here, to ensure that there is no logic in copying the data - std::memcpy(&valueOut, &m_data[currentReadPos % m_size], sizeof(ValueType)); // We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted - // Memory order relaxed is enough as: - // - synchronization is not needed with m_readPosition - // - there is no operation reordering possible + // While memory synchronization is not needed for m_readPosition we need to have a + // corresponding m_readPosition.store(release) to the m_readPosition.load(acquire) in the + // push method // ============================================= // ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz // while this thread is blocked, we would still need more than 500 years to overflow // m_readPosition and encounter the ABA problem } while (!m_readPosition.compare_exchange_weak( - currentReadPos, currentReadPos + 1U, std::memory_order_relaxed, std::memory_order_relaxed)); + currentReadPosition, nextReadPosition, std::memory_order_acq_rel, std::memory_order_acquire)); - return true; + return popWasSuccessful; } template @@ -152,55 +157,64 @@ inline bool SpscSofi::push(const ValueType& valueIn, V constexpr bool SOFI_OVERFLOW{false}; // Memory order relaxed is enough since: - // - no synchronization needed as we are loading a value only modified in this method and this method cannot be accessed concurrently + // - no synchronization needed as we are loading a value only modified in this method and this method cannot be + // accessed concurrently // - the operation cannot move below without observable changes - uint64_t currentWritePos = m_writePosition.load(std::memory_order_relaxed); - uint64_t nextWritePos = currentWritePos + 1U; + uint64_t currentWritePosition = m_writePosition.load(std::memory_order_relaxed); + uint64_t nextWritePosition = currentWritePosition + 1U; - m_data[currentWritePos % m_size] = valueIn; + m_data[currentWritePosition % m_size] = valueIn; // SYNC POINT WRITE: m_data // We need to make sure that writing the value happens before incrementing the // m_writePosition otherwise the following scenario can happen: // 1. m_writePosition is increased (but the value has not been written yet) - // 2. Another thread calls pop(): we check if the queue is empty => no (e.g. m_writePosition == 1 - // and m_readPosition == 0) - // 3. In pop(), a data race can occur + // 2. The consumer thread calls pop(): we check if the queue is empty => no + // 3. In pop(), when we read a value a data race can occur // With memory_order_release, this cannot happen as it is guaranteed that writing the data // happens before incrementing m_writePosition // ======================================= - // Note that the following situation can still happen (but is not a problem): - // 1. A value is written (m_writePosition hasn't been incremented yet) - // 2. Another thread calls pop(): we check if the queue is empty => yes (e.g. m_writePosition == - // m_readPosition == 0 ) - // 3. An element was already stored so we could have popped the element - m_writePosition.store(nextWritePos, std::memory_order_release); + // Note that the following situation can still happen (but, although it is an inherent race with + // concurrent algorithms, it is not a data race and therefore not a problem): + // 1. There is an empty queue + // 2. A push operation is in progress, the value has been written but 'm_writePosition' was not + // yet advanced + // 3. The consumer thread performs a pop operation and the check for an empty queue is true + // resulting in a failed pop + // 4. The push operation is finished by advancing m_writePos and synchronizing the memory + // 5. The consumer thread missed the chance to pop the element in the blink of an eye + m_writePosition.store(nextWritePosition, std::memory_order_release); - // Memory order relaxed is enough since: - // - synchronization is not needed with m_readPosition - // - operation reordering: - // - cannot move below if statement otherwise, the code won't compile - // - if it moves above, we might get an outdated read position that will be caught by the - // compare_exchange check - uint64_t currentReadPos = m_readPosition.load(std::memory_order_relaxed); + // While memory synchronization is not needed with m_readPosition, we need + // memory_order_acquire to avoid the reordering of the operation + uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); // Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to // check if there is a free position for the *next* write position - if (nextWritePos < currentReadPos + m_size) + if (nextWritePosition < currentReadPosition + m_size) { return !SOFI_OVERFLOW; } // This is an overflow situation so we will need to read the overwritten value // however, it could be that pop() was called in the meantime, i.e. m_readPosition was increased. - // Memory order relaxed is enough for both success and failure cases since: - // - synchronization is not needed with m_readPosition - // - operation reordering cannot happen + // Memory order success needs to be memory_order_acq_rel to prevent the reordering of + // m_writePosition.store(...) after the increment of the m_readPosition. Otherwise, in case of + // an overflow, this might result in the pop thread getting one element less than the capacity + // of the SoFi if the push thread is suspended in between this two statements. + // It's still possible to get more elements than the capacity, but this is an inherent issue + // with concurrent queues and cannot be prevented since there can always be a push during a pop + // operation. + // Another issue might be that two consecutive pushes (not concurrent) happen on different CPU + // cores without synchronization, then the memory also needs to be synchronized for the overflow + // case. + // Memory order failure is memory_order_relaxed since there is no further synchronization //// + // needed if there is no overflow // ====================================== // ABA problem: m_readPosition is an uint64_t. Assuming a thread is popping at a rate of 1 GHz while // this thread is blocked, we would still need more than 500 years to overflow m_readPosition and // encounter the ABA problem if (m_readPosition.compare_exchange_strong( - currentReadPos, currentReadPos + 1U, std::memory_order_relaxed, std::memory_order_relaxed)) + currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_relaxed)) { // Since INTERNAL_SOFI_CAPACITY = CapacityValue + 1, it can happen that we return more // elements than the CapacityValue by calling push and pop concurrently (in case of an @@ -229,7 +243,7 @@ inline bool SpscSofi::push(const ValueType& valueIn, V // |-(A)-|-(B)-|-(C)-| // ^ // w=3, r=3 - std::memcpy(&valueOut, &m_data[currentReadPos % m_size], sizeof(ValueType)); + std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); return SOFI_OVERFLOW; } From 9cdee173c23f56b55fce8200b77052e81dd36c71 Mon Sep 17 00:00:00 2001 From: albtam Date: Wed, 10 Apr 2024 21:37:21 +0200 Subject: [PATCH 4/8] iox-#2177 Address reviewer's comments --- .../buffer/include/iox/detail/spsc_sofi.hpp | 12 +++--- .../buffer/include/iox/detail/spsc_sofi.inl | 37 ++++++++++--------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp index b201cfafd7..e104c44046 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp @@ -31,13 +31,12 @@ namespace iox { namespace concurrent { -/// @brief Thread safe (without locks) single producer and single consumer queue with a safe +/// @brief Thread safe lock-free single producer and single consumer queue with a safe /// overflowing behavior /// @note When SpscSoFi is full and a sender tries to push, the data at the current read pos will be /// returned. This behavior mimics a FiFo queue but prevents resource leaks when pushing into /// a full SpscSoFi. /// SpscSoFi is especially designed to provide fixed capacity storage. -/// SpscSoFi only allocates memory when created, capacity can be adjusted explicitly. /// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as /// many elements as specified with 'CapacityValue' can be removed /// @param[in] ValueType DataType to be stored, must be trivially copyable @@ -45,9 +44,8 @@ namespace concurrent template class SpscSofi { - // We need to make sure that the copy operation doesn't have any logic static_assert(std::is_trivially_copyable::value, - "SpscSofi can handle only trivially copyable data types"); + "SpscSofi can only handle trivially copyable data types since 'memcpy' is used internally"); /// @brief Check if Atomic integer is lockfree on platform /// ATOMIC_INT_LOCK_FREE = 2 - is always lockfree /// ATOMIC_INT_LOCK_FREE = 1 - is sometimes lockfree @@ -86,8 +84,8 @@ class SpscSofi // |--A--|--B--|--C--| // ^ // w=3, r=0, - // 4. Then increment the read position - // |--A--|--B--|--C--| + // 4. Then increment the read position and return the overflowing 'A' + // |-----|--B--|--C--| // ^ ^ // w=3 r=1 // 5. The producer thread is suspended, the consumer thread pops a value @@ -118,7 +116,7 @@ class SpscSofi /// @note restricted thread safe: can only be called from one thread. The authorization to push into the /// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used. /// @return return true if push was successful else false. - /// @code + /// @remarks /// 1. SpscSofi is empty |-----|-----| /// 2. push an element |--A--|-----| /// 3. push an element |--A--|--B--| diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl index 29015fa076..99d31f7087 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl @@ -101,20 +101,6 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep { // Memory synchronization is not needed but we need to prevent operation reordering to avoid the following scenario // where the CPU reorder the load of m_readPosition and m_writePosition: - // 0. Initial situation (the queue is full) - // |----|--B--|--C--| - // ^ ^ - // w=3 r=1 - // 1. The consumer thread loads m_writePosition => 3 - // |----|--B--|--C--| - // ^ ^ - // w=3 r=1 - // 2. The producer thread pushes two times - // |--D--|--E--|-----| - // ^ ^ - // r=3 w=5 - // 3. The consumer thread loads m_readPosition => 3. The pop method returns false - // => Whereas the queue was full, pop returned false giving the impression that the queue if empty uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); uint64_t nextReadPosition{0}; @@ -128,6 +114,21 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep { nextReadPosition = currentReadPosition; popWasSuccessful = false; + // We cannot just return false (i.e. we need to continue the loop) to avoid the following situation: + // 0. Initial situation (the queue is full) + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 1. The consumer thread loads m_writePosition => 3 + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 2. The producer thread pushes two times + // |--D--|--E--|-----| + // ^ ^ + // r=3 w=5 + // 3. The consumer thread loads m_readPosition => 3 The pop method returns false + // => Whereas the queue was full, pop returned false giving the impression that the queue if empty } else { @@ -138,9 +139,9 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep } // We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted - // While memory synchronization is not needed for m_readPosition we need to have a - // corresponding m_readPosition.store(release) to the m_readPosition.load(acquire) in the - // push method + // While memory synchronization is not needed for m_readPosition, we need to ensure that the + // 'memcpy' happens before the CAS operation. + // Corresponding m_readPosition.load(acquire) is in the push method // ============================================= // ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz // while this thread is blocked, we would still need more than 500 years to overflow @@ -185,7 +186,7 @@ inline bool SpscSofi::push(const ValueType& valueIn, V m_writePosition.store(nextWritePosition, std::memory_order_release); // While memory synchronization is not needed with m_readPosition, we need - // memory_order_acquire to avoid the reordering of the operation + // memory_order_acquire to avoid the reordering of the operation. uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); // Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to From 24f2f9782a62df30eb807f6befcce4db6906d0b8 Mon Sep 17 00:00:00 2001 From: albtam Date: Fri, 19 Apr 2024 08:05:07 +0200 Subject: [PATCH 5/8] iox-#2177 Add early return to pop method --- .../buffer/include/iox/detail/spsc_sofi.inl | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl index 99d31f7087..79383e04be 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl @@ -99,13 +99,26 @@ inline bool SpscSofi::empty() const noexcept template inline bool SpscSofi::pop(ValueType& valueOut) noexcept { - // Memory synchronization is not needed but we need to prevent operation reordering to avoid the following scenario - // where the CPU reorder the load of m_readPosition and m_writePosition: + // We need 'm_readPosition.load(std::memory_order_acquire)' to happen before + // 'm_writePosition.load(std::memory_order_acquire)' to avoid the following scenario where the + // CPU reorders the load of m_readPosition and m_writePosition: + // 0. Initial situation (the queue is full) + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 1. The consumer thread loads m_writePosition => 3 + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 2. The producer thread pushes two times + // |--D--|--E--|--C--| + // ^ ^ + // r=3 w=5 + // 3. The consumer thread loads m_readPosition => 3. The pop method returns false + // => Whereas the queue was full, pop returned false giving the impression that the queue if empty uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); uint64_t nextReadPosition{0}; - bool popWasSuccessful{true}; - do { // SYNC POINT READ: m_data @@ -113,29 +126,15 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep if (currentReadPosition == m_writePosition.load(std::memory_order_acquire)) { nextReadPosition = currentReadPosition; - popWasSuccessful = false; - // We cannot just return false (i.e. we need to continue the loop) to avoid the following situation: - // 0. Initial situation (the queue is full) - // |----|--B--|--C--| - // ^ ^ - // w=3 r=1 - // 1. The consumer thread loads m_writePosition => 3 - // |----|--B--|--C--| - // ^ ^ - // w=3 r=1 - // 2. The producer thread pushes two times - // |--D--|--E--|-----| - // ^ ^ - // r=3 w=5 - // 3. The consumer thread loads m_readPosition => 3 The pop method returns false - // => Whereas the queue was full, pop returned false giving the impression that the queue if empty + return false; + // We don't need to check if read has changed, as it is enough to know that the empty + // state was valid in the past. The same race can also happen after the while loop and + // before the return operation } else { // we use memcpy here, to ensure that there is no logic in copying the data std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); - nextReadPosition = currentReadPosition + 1U; - popWasSuccessful = true; } // We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted @@ -147,9 +146,9 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep // while this thread is blocked, we would still need more than 500 years to overflow // m_readPosition and encounter the ABA problem } while (!m_readPosition.compare_exchange_weak( - currentReadPosition, nextReadPosition, std::memory_order_acq_rel, std::memory_order_acquire)); + currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_acquire)); - return popWasSuccessful; + return true; } template @@ -185,8 +184,7 @@ inline bool SpscSofi::push(const ValueType& valueIn, V // 5. The consumer thread missed the chance to pop the element in the blink of an eye m_writePosition.store(nextWritePosition, std::memory_order_release); - // While memory synchronization is not needed with m_readPosition, we need - // memory_order_acquire to avoid the reordering of the operation. + // We need to establish an happens-before relationship with 'm_writePosition.load(std::memory_order_relaxed)' uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); // Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to From 421a0ee3113cd3654a8368eef25b4dae9ddb55e3 Mon Sep 17 00:00:00 2001 From: albtam Date: Tue, 23 Apr 2024 21:19:22 +0200 Subject: [PATCH 6/8] iox-#2177 Address reviewer's comments --- .../buffer/include/iox/detail/spsc_sofi.inl | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl index 79383e04be..e6f1c5464f 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl @@ -117,7 +117,6 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep // 3. The consumer thread loads m_readPosition => 3. The pop method returns false // => Whereas the queue was full, pop returned false giving the impression that the queue if empty uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); - uint64_t nextReadPosition{0}; do { @@ -125,22 +124,21 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep // See explanation of the corresponding synchronization point in push() if (currentReadPosition == m_writePosition.load(std::memory_order_acquire)) { - nextReadPosition = currentReadPosition; return false; // We don't need to check if read has changed, as it is enough to know that the empty // state was valid in the past. The same race can also happen after the while loop and // before the return operation } - else - { - // we use memcpy here, to ensure that there is no logic in copying the data - std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); - } + + // we use memcpy here, to ensure that there is no logic in copying the data + std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); + // We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted + // ============================================= // While memory synchronization is not needed for m_readPosition, we need to ensure that the - // 'memcpy' happens before the CAS operation. - // Corresponding m_readPosition.load(acquire) is in the push method + // 'memcpy' happens before updating m_readPosition. + // Corresponding m_readPosition load/acquire is in the push method // ============================================= // ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz // while this thread is blocked, we would still need more than 500 years to overflow @@ -184,8 +182,10 @@ inline bool SpscSofi::push(const ValueType& valueIn, V // 5. The consumer thread missed the chance to pop the element in the blink of an eye m_writePosition.store(nextWritePosition, std::memory_order_release); - // We need to establish an happens-before relationship with 'm_writePosition.load(std::memory_order_relaxed)' - uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); + // Memory order relaxed is enough since: + // - no synchronization needed when loading + // - the operation cannot move below without observable changes + uint64_t currentReadPosition = m_readPosition.load(std::memory_order_relaxed); // Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to // check if there is a free position for the *next* write position @@ -206,14 +206,14 @@ inline bool SpscSofi::push(const ValueType& valueIn, V // Another issue might be that two consecutive pushes (not concurrent) happen on different CPU // cores without synchronization, then the memory also needs to be synchronized for the overflow // case. - // Memory order failure is memory_order_relaxed since there is no further synchronization //// + // Memory order failure is memory_order_relaxed since there is no further synchronization // needed if there is no overflow // ====================================== // ABA problem: m_readPosition is an uint64_t. Assuming a thread is popping at a rate of 1 GHz while // this thread is blocked, we would still need more than 500 years to overflow m_readPosition and // encounter the ABA problem if (m_readPosition.compare_exchange_strong( - currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_relaxed)) + currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_acquire)) { // Since INTERNAL_SOFI_CAPACITY = CapacityValue + 1, it can happen that we return more // elements than the CapacityValue by calling push and pop concurrently (in case of an From 081d5f2c3b6b06594aa90200e900f722d04b0c84 Mon Sep 17 00:00:00 2001 From: albtam Date: Wed, 1 May 2024 18:01:48 +0200 Subject: [PATCH 7/8] iox-#2177 Reverse early return and relaxed memory order --- .../buffer/include/iox/detail/spsc_sofi.inl | 83 ++++++++++--------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl index e6f1c5464f..3d935d2362 100644 --- a/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl +++ b/iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl @@ -99,24 +99,13 @@ inline bool SpscSofi::empty() const noexcept template inline bool SpscSofi::pop(ValueType& valueOut) noexcept { - // We need 'm_readPosition.load(std::memory_order_acquire)' to happen before - // 'm_writePosition.load(std::memory_order_acquire)' to avoid the following scenario where the - // CPU reorders the load of m_readPosition and m_writePosition: - // 0. Initial situation (the queue is full) - // |----|--B--|--C--| - // ^ ^ - // w=3 r=1 - // 1. The consumer thread loads m_writePosition => 3 - // |----|--B--|--C--| - // ^ ^ - // w=3 r=1 - // 2. The producer thread pushes two times - // |--D--|--E--|--C--| - // ^ ^ - // r=3 w=5 - // 3. The consumer thread loads m_readPosition => 3. The pop method returns false - // => Whereas the queue was full, pop returned false giving the impression that the queue if empty - uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire); + uint64_t nextReadPosition{0}; + bool popWasSuccessful{true}; + // Memory order relaxed is enough since: + // - no synchronization needed for m_readPosition + // - if m_writePosition is loaded before m_readPosition and m_readPosition changed, it will be detected by the + // compare_exchange loop + uint64_t currentReadPosition = m_readPosition.load(std::memory_order_relaxed); do { @@ -124,29 +113,45 @@ inline bool SpscSofi::pop(ValueType& valueOut) noexcep // See explanation of the corresponding synchronization point in push() if (currentReadPosition == m_writePosition.load(std::memory_order_acquire)) { - return false; - // We don't need to check if read has changed, as it is enough to know that the empty - // state was valid in the past. The same race can also happen after the while loop and - // before the return operation + nextReadPosition = currentReadPosition; + popWasSuccessful = false; + // We cannot just return false (i.e. we need to continue the loop) to avoid the following situation: + // 0. Initial situation (the queue is full) + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 1. The consumer thread loads m_writePosition => 3 + // |----|--B--|--C--| + // ^ ^ + // w=3 r=1 + // 2. The producer thread pushes two times + // |--D--|--E--|-----| + // ^ ^ + // r=3 w=5 + // 3. The consumer thread loads m_readPosition => 3 The pop method returns false + // => Whereas the queue was full, pop returned false giving the impression that the queue if empty } + else + { + // we use memcpy here, to ensure that there is no logic in copying the data + std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); + nextReadPosition = currentReadPosition + 1U; + popWasSuccessful = true; - // we use memcpy here, to ensure that there is no logic in copying the data - std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType)); - - - // We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted - // ============================================= - // While memory synchronization is not needed for m_readPosition, we need to ensure that the - // 'memcpy' happens before updating m_readPosition. - // Corresponding m_readPosition load/acquire is in the push method - // ============================================= - // ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz - // while this thread is blocked, we would still need more than 500 years to overflow - // m_readPosition and encounter the ABA problem + // We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted + // ============================================= + // While memory synchronization is not needed for m_readPosition, we need to ensure that the + // 'memcpy' happens before updating m_readPosition. + // Corresponding m_readPosition load/acquire is in the CAS loop of push method + // ============================================= + // ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz + // while this thread is blocked, we would still need more than 500 years to overflow + // m_readPosition and encounter the ABA problem + } } while (!m_readPosition.compare_exchange_weak( - currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_acquire)); + currentReadPosition, nextReadPosition, std::memory_order_acq_rel, std::memory_order_acquire)); - return true; + return popWasSuccessful; } template @@ -206,8 +211,8 @@ inline bool SpscSofi::push(const ValueType& valueIn, V // Another issue might be that two consecutive pushes (not concurrent) happen on different CPU // cores without synchronization, then the memory also needs to be synchronized for the overflow // case. - // Memory order failure is memory_order_relaxed since there is no further synchronization - // needed if there is no overflow + // Memory order failure needs to be memory_order_acquire to match the corresponding m_readPosition store/release in + // the CAS loop of the pop method // ====================================== // ABA problem: m_readPosition is an uint64_t. Assuming a thread is popping at a rate of 1 GHz while // this thread is blocked, we would still need more than 500 years to overflow m_readPosition and From 8e0af98014d8c125eb304f120ffdbd21a0359e64 Mon Sep 17 00:00:00 2001 From: albtam Date: Wed, 28 Aug 2024 23:34:37 +0200 Subject: [PATCH 8/8] iox-#2177 Update changelog --- doc/website/release-notes/iceoryx-unreleased.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/website/release-notes/iceoryx-unreleased.md b/doc/website/release-notes/iceoryx-unreleased.md index c37f60d2bb..b82c0a936a 100644 --- a/doc/website/release-notes/iceoryx-unreleased.md +++ b/doc/website/release-notes/iceoryx-unreleased.md @@ -137,6 +137,8 @@ - Listener examples need to take all samples in the callback [#2251](https://github.com/eclipse-iceoryx/iceoryx/issues/2251) - 'iox::string' tests can exceed the translation unit compilation timeout [#2278](https://github.com/eclipse-iceoryx/iceoryx/issues/2278) - Building iceoryx with bazel on Windows is broken [#2320](https://github.com/eclipse-iceoryx/iceoryx/issues/2320) +- Fix wrong memory orders in SpscSoFi [#2177](https://github.com/eclipse-iceoryx/iceoryx/issues/2177) +- **Refactoring:**