-
Notifications
You must be signed in to change notification settings - Fork 403
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
iox-#2047 Fix 'array-bounds' warning in chunk distributor
- Loading branch information
1 parent
097ec19
commit 20e77a9
Showing
2 changed files
with
23 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. | ||
// Copyright (c) 2021 - 2022 by Apex.AI Inc. All rights reserved. | ||
// Copyright (c) 2023 by Mathias Kraus <[email protected]>. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
|
@@ -140,7 +141,8 @@ template <typename ChunkDistributorDataType> | |
inline uint64_t ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept | ||
{ | ||
uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U}; | ||
typename ChunkDistributorDataType::QueueContainer_t remainingQueues; | ||
using QueueContainer = decltype(getMembers()->m_queues); | ||
QueueContainer fullQueuesAwaitingDelivery; | ||
{ | ||
typename MemberType_t::LockGuard_t lock(*getMembers()); | ||
|
||
|
@@ -158,7 +160,7 @@ inline uint64_t ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQu | |
{ | ||
if (isBlockingQueue) | ||
{ | ||
remainingQueues.emplace_back(queue); | ||
fullQueuesAwaitingDelivery.emplace_back(queue); | ||
} | ||
else | ||
{ | ||
|
@@ -171,43 +173,40 @@ inline uint64_t ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQu | |
|
||
// busy waiting until every queue is served | ||
iox::detail::adaptive_wait adaptiveWait; | ||
while (!remainingQueues.empty()) | ||
while (!fullQueuesAwaitingDelivery.empty()) | ||
{ | ||
adaptiveWait.wait(); | ||
{ | ||
// create intersection of current queues and remainingQueues | ||
// create intersection of current queues and fullQueuesAwaitingDelivery | ||
// reason: it is possible that since the last iteration some subscriber have already unsubscribed | ||
// and without this intersection we would deliver to dead queues | ||
typename MemberType_t::LockGuard_t lock(*getMembers()); | ||
typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size()); | ||
auto greaterThan = [](RelativePointer<ChunkQueueData_t>& a, RelativePointer<ChunkQueueData_t>& b) -> bool { | ||
QueueContainer remainingQueues; | ||
using QueueContainerValue = typename QueueContainer::value_type; | ||
auto greaterThan = [](QueueContainerValue& a, QueueContainerValue& b) -> bool { | ||
return reinterpret_cast<uint64_t>(a.get()) > reinterpret_cast<uint64_t>(b.get()); | ||
}; | ||
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan); | ||
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan); | ||
std::sort(fullQueuesAwaitingDelivery.begin(), fullQueuesAwaitingDelivery.end(), greaterThan); | ||
|
||
auto iter = std::set_intersection(getMembers()->m_queues.begin(), | ||
getMembers()->m_queues.end(), | ||
remainingQueues.begin(), | ||
remainingQueues.end(), | ||
queueIntersection.begin(), | ||
greaterThan); | ||
queueIntersection.resize(static_cast<uint64_t>(iter - queueIntersection.begin())); | ||
remainingQueues = queueIntersection; | ||
std::set_intersection(getMembers()->m_queues.begin(), | ||
getMembers()->m_queues.end(), | ||
fullQueuesAwaitingDelivery.begin(), | ||
fullQueuesAwaitingDelivery.end(), | ||
std::back_inserter(remainingQueues), | ||
greaterThan); | ||
fullQueuesAwaitingDelivery.clear(); | ||
|
||
// deliver to remaining queues | ||
for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i) | ||
for (auto& queue : remainingQueues) | ||
{ | ||
if (pushToQueue(remainingQueues[i].get(), chunk)) | ||
if (pushToQueue(queue.get(), chunk)) | ||
{ | ||
remainingQueues.erase(remainingQueues.begin() + i); | ||
++numberOfQueuesTheChunkWasDeliveredTo; | ||
} | ||
|
||
// don't move this up since the for loop counts downwards and the algorithm would break | ||
if (i == 0U) | ||
else | ||
{ | ||
break; | ||
fullQueuesAwaitingDelivery.push_back(queue); | ||
} | ||
} | ||
} | ||
|