Skip to content

Commit

Permalink
iox-#2177 Address reviewer's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
albtam committed Mar 20, 2024
1 parent db57a3e commit 8cc9277
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 38 deletions.
54 changes: 23 additions & 31 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,8 @@ namespace concurrent
/// a full SpscSoFi.
/// SpscSoFi is especially designed to provide fixed capacity storage.
/// SpscSoFi only allocates memory when created, capacity can be adjusted explicitly.
/// @example
/// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as
/// many elements as specified with 'CapacityValue' can be removed
/// 0: Initial situation:
/// |--A--|--B--|
/// 1. Thread 1 pushes a new element. Since it is an overflowing situation, the overwritten value is
/// removed and returned to the caller
/// |--A--|--B--|
/// 2. Right before push() returns, pop() detects that an element is about to be removed, and remove
/// the next element
/// |--C--|----|
/// @param[in] ValueType DataType to be stored, must be trivially copyable
/// @param[in] CapacityValue Capacity of the SpscSofi
template <class ValueType, uint64_t CapacityValue>
Expand All @@ -71,42 +62,43 @@ class SpscSofi
// |--A--|--B--|
// ^
// w=2, r=0
// 2. We want to push a new element
// 3. Advance the read position (this effectively reduces the capacity and is the reason the internal capacity
// needs to be larger; the consumer cannot pop out CAPACITY amount of samples even though the queue is full if
// the push thread is suspended right after this operation)
// 2. The producer thread pushes a new element
// 3. Increment the read position (this effectively reduces the capacity and is the reason the internal capacity
// needs to be larger;
// |--A--|--B--|
// ^ ^
// w=2 r=1
// 4. Take overflow data
// |-----|--B--|
// ^ ^
// w=2 r=1
// 5. Write new data
// |--C--|--B--|
// ^ ^
// w=2 r=1
// 6. Advance next write position
// |--C--|--B--|
// ^
// w=3, r=1
// 4. The producer thread is suspended, the consumer thread pops a value
// |--A--|-----|
// ^
// w=2, r=2
// 5. The consumer tries to pop another value but the queue looks empty as
// write position == read position: the consumer cannot pop
// out CAPACITY amount of samples even though the queue was full
// ========================================================================
// With "capacity add-on"
// 1. CapacityValue = 2, InternalCapacity = 3
// |--A--|--B--|----|
// ^ ^
// r=0 w=2
// 2. We want to push a new element
// 3. We first write at index 2 % capacity
// 2. The producer threads pushes a new element
// 3. First write the element at index 2 % capacity and increment the write index
// |--A--|--B--|--C--|
// ^
// w=3, r=0,
// 2. We want to push a new element:
// 4. We detect that the queue if full so we retrieve the value pointed by the read pointer: the value A is
// returned
// |-(A)-|--B--|--C--|
// 4. Then increment the read position
// |--A--|--B--|--C--|
// ^ ^
// w=3 r=1
// 5. The producer thread is suspended, the consumer thread pops a value
// |--A--|-----|--C--|
// ^ ^
// w=3 r=2
// 6. The consumer thread pops another value
// |--A--|-----|-----|
// ^
// w=3, r=3
// 7. Now, write position == read position so we cannot pop another element: the queue looks empty. We managed to pop CapacityValue elements
// ========================================================================
static constexpr uint32_t INTERNAL_CAPACITY_ADDON = 1;

Expand Down
12 changes: 5 additions & 7 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcep
// r=3 w=5
// 3. The consumer thread loads m_readPosition => 3. The pop method returns false
// => Whereas the queue was full, pop returned false giving the impression that the queue if empty
// TODO: To which release/store statement does it correspond?
// TODO(@albtam): Explain yo which release/store statement it corresponds
uint64_t currentReadPos = m_readPosition.load(std::memory_order_acquire);

do
Expand Down Expand Up @@ -151,12 +151,10 @@ inline bool SpscSofi<ValueType, CapacityValue>::push(const ValueType& valueIn, V
{
constexpr bool SOFI_OVERFLOW{false};

// SYNC POINT READ: m_data
// We need to synchronize data to avoid the following scenario:
// 1. A thread calls push() and updates data with a new value
// 2. Another thread calls push() and enters the overflow case. The data could be read before
// any synchronization
uint64_t currentWritePos = m_writePosition.load(std::memory_order_acquire);
// Memory order relaxed is enough since:
// - no synchronization needed as we are loading a value only modified in this method and this method cannot be accessed concurrently
// - the operation cannot move below without observable changes
uint64_t currentWritePos = m_writePosition.load(std::memory_order_relaxed);
uint64_t nextWritePos = currentWritePos + 1U;

m_data[currentWritePos % m_size] = valueIn;
Expand Down

0 comments on commit 8cc9277

Please sign in to comment.