Skip to content

Commit

Permalink
Address reviewer's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
albtam committed Apr 10, 2024
1 parent 8b3c167 commit f15ca92
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
12 changes: 5 additions & 7 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,21 @@ namespace iox
{
namespace concurrent
{
/// @brief Thread safe (without locks) single producer and single consumer queue with a safe
/// @brief Thread safe lock-free single producer and single consumer queue with a safe
/// overflowing behavior
/// @note When SpscSoFi is full and a sender tries to push, the data at the current read pos will be
/// returned. This behavior mimics a FiFo queue but prevents resource leaks when pushing into
/// a full SpscSoFi.
/// SpscSoFi is especially designed to provide fixed capacity storage.
/// SpscSoFi only allocates memory when created, capacity can be adjusted explicitly.
/// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as
/// many elements as specified with 'CapacityValue' can be removed
/// @param[in] ValueType DataType to be stored, must be trivially copyable
/// @param[in] CapacityValue Capacity of the SpscSofi
template <class ValueType, uint64_t CapacityValue>
class SpscSofi
{
// We need to make sure that the copy operation doesn't have any logic
static_assert(std::is_trivially_copyable<ValueType>::value,
"SpscSofi can handle only trivially copyable data types");
"SpscSofi can only handle trivially copyable data types since 'memcpy' is used internally");
/// @brief Check if Atomic integer is lockfree on platform
/// ATOMIC_INT_LOCK_FREE = 2 - is always lockfree
/// ATOMIC_INT_LOCK_FREE = 1 - is sometimes lockfree
Expand Down Expand Up @@ -86,8 +84,8 @@ class SpscSofi
// |--A--|--B--|--C--|
// ^
// w=3, r=0,
// 4. Then increment the read position
// |--A--|--B--|--C--|
// 4. Then increment the read position and return the overflowing 'A'
// |-----|--B--|--C--|
// ^ ^
// w=3 r=1
// 5. The producer thread is suspended, the consumer thread pops a value
Expand Down Expand Up @@ -118,7 +116,7 @@ class SpscSofi
/// @note restricted thread safe: can only be called from one thread. The authorization to push into the
/// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used.
/// @return return true if push was successful else false.
/// @code
/// @remarks
/// 1. SpscSofi is empty |-----|-----|
/// 2. push an element |--A--|-----|
/// 3. push an element |--A--|--B--|
Expand Down
37 changes: 19 additions & 18 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,6 @@ inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcep
{
// Memory synchronization is not needed but we need to prevent operation reordering to avoid the following scenario
// where the CPU reorder the load of m_readPosition and m_writePosition:
// 0. Initial situation (the queue is full)
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 1. The consumer thread loads m_writePosition => 3
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 2. The producer thread pushes two times
// |--D--|--E--|-----|
// ^ ^
// r=3 w=5
// 3. The consumer thread loads m_readPosition => 3. The pop method returns false
// => Whereas the queue was full, pop returned false giving the impression that the queue if empty
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);
uint64_t nextReadPosition{0};

Expand All @@ -128,6 +114,21 @@ inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcep
{
nextReadPosition = currentReadPosition;
popWasSuccessful = false;
// We cannot just return false (i.e. we need to continue the loop) to avoid the following situation:
// 0. Initial situation (the queue is full)
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 1. The consumer thread loads m_writePosition => 3
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 2. The producer thread pushes two times
// |--D--|--E--|-----|
// ^ ^
// r=3 w=5
// 3. The consumer thread loads m_readPosition => 3 The pop method returns false
// => Whereas the queue was full, pop returned false giving the impression that the queue if empty
}
else
{
Expand All @@ -138,9 +139,9 @@ inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcep
}

// We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted
// While memory synchronization is not needed for m_readPosition we need to have a
// corresponding m_readPosition.store(release) to the m_readPosition.load(acquire) in the
// push method
// While memory synchronization is not needed for m_readPosition, we need to ensure that the
// 'memcpy' happens before the CAS operation.
// Corresponding m_readPosition.load(acquire) is in the push method
// =============================================
// ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz
// while this thread is blocked, we would still need more than 500 years to overflow
Expand Down Expand Up @@ -185,7 +186,7 @@ inline bool SpscSofi<ValueType, CapacityValue>::push(const ValueType& valueIn, V
m_writePosition.store(nextWritePosition, std::memory_order_release);

// While memory synchronization is not needed with m_readPosition, we need
// memory_order_acquire to avoid the reordering of the operation
// memory_order_acquire to avoid the reordering of the operation.
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);

// Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to
Expand Down

0 comments on commit f15ca92

Please sign in to comment.