Skip to content

Commit

Permalink
iox-#2047 Fix 'array-bounds' warning in chunk distributor
Browse files Browse the repository at this point in the history
  • Loading branch information
elBoberido committed Nov 11, 2023
1 parent 3a6ab58 commit 2bb0363
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "iox/detail/unique_id.hpp"
#include "iox/not_null.hpp"

#include <algorithm>
#include <iterator>
#include <thread>

namespace iox
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ template <typename ChunkDistributorDataType>
inline uint64_t ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
typename ChunkDistributorDataType::QueueContainer_t remainingQueues;
using QueueContainer = decltype(getMembers()->m_queues);
QueueContainer fullQueuesAwaitingDelivery;
{
typename MemberType_t::LockGuard_t lock(*getMembers());

Expand All @@ -158,7 +159,7 @@ inline uint64_t ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQu
{
if (isBlockingQueue)
{
remainingQueues.emplace_back(queue);
fullQueuesAwaitingDelivery.emplace_back(queue);
}
else
{
Expand All @@ -171,43 +172,40 @@ inline uint64_t ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQu

// busy waiting until every queue is served
iox::detail::adaptive_wait adaptiveWait;
while (!remainingQueues.empty())
while (!fullQueuesAwaitingDelivery.empty())
{
adaptiveWait.wait();
{
// create intersection of current queues and remainingQueues
// create intersection of current queues and fullQueuesAwaitingDelivery
// reason: it is possible that since the last iteration some subscriber have already unsubscribed
// and without this intersection we would deliver to dead queues
typename MemberType_t::LockGuard_t lock(*getMembers());
typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
auto greaterThan = [](RelativePointer<ChunkQueueData_t>& a, RelativePointer<ChunkQueueData_t>& b) -> bool {
QueueContainer remainingQueues;
using QueueContainerValue = typename QueueContainer::value_type;
auto greaterThan = [](QueueContainerValue& a, QueueContainerValue& b) -> bool {
return reinterpret_cast<uint64_t>(a.get()) > reinterpret_cast<uint64_t>(b.get());
};
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);
std::sort(fullQueuesAwaitingDelivery.begin(), fullQueuesAwaitingDelivery.end(), greaterThan);

auto iter = std::set_intersection(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
remainingQueues.begin(),
remainingQueues.end(),
queueIntersection.begin(),
greaterThan);
queueIntersection.resize(static_cast<uint64_t>(iter - queueIntersection.begin()));
remainingQueues = queueIntersection;
std::set_intersection(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
fullQueuesAwaitingDelivery.begin(),
fullQueuesAwaitingDelivery.end(),
std::back_inserter(remainingQueues),
greaterThan);
fullQueuesAwaitingDelivery.clear();

// deliver to remaining queues
for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
for (auto& queue : remainingQueues)
{
if (pushToQueue(remainingQueues[i].get(), chunk))
if (pushToQueue(queue.get(), chunk))
{
remainingQueues.erase(remainingQueues.begin() + i);
++numberOfQueuesTheChunkWasDeliveredTo;
}

// don't move this up since the for loop counts downwards and the algorithm would break
if (i == 0U)
else
{
break;
fullQueuesAwaitingDelivery.push_back(queue);
}
}
}
Expand Down

0 comments on commit 2bb0363

Please sign in to comment.