Skip to content

Commit

Permalink
Merge pull request #2169 from lucabart97/iox-1693-iox-string-messageq…
Browse files Browse the repository at this point in the history
…ueue

iox-eclipse-iceoryx#1963 support for iox::string in MessageQueue
  • Loading branch information
elBoberido authored Jan 29, 2024
2 parents 2c612f5 + 890848f commit 38e9b6b
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 74 deletions.
1 change: 1 addition & 0 deletions doc/website/release-notes/iceoryx-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
- Fast POD data in `iox::vector` [#2082](https://github.com/eclipse-iceoryx/iceoryx/issues/2082)
- MinGW support for Windows [#2150](https://github.com/eclipse-iceoryx/iceoryx/issues/2150)
- Add support for `iox::string` in `UnixDomainSocket` and created `unix_domain_socket.inl` [#2040](https://github.com/eclipse-iceoryx/iceoryx/issues/2040)
- Add support for `iox::string` in `MessageQueue` and created `message_queue.inl` [#1963](https://github.com/eclipse-iceoryx/iceoryx/issues/1963)

**Bugfixes:**

Expand Down
213 changes: 213 additions & 0 deletions iceoryx_hoofs/posix/ipc/include/iox/detail/message_queue.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2024, Eclipse Foundation and the iceoryx contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#ifndef IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_INL
#define IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_INL

#include "iox/duration.hpp"
#include "iox/message_queue.hpp"
#include "iox/not_null.hpp"
#include "iox/posix_call.hpp"

namespace iox
{
template <typename Type, MessageQueue::Termination Terminator>
expected<void, PosixIpcChannelError>
MessageQueue::timedSendImpl(not_null<const Type*> msg, uint64_t msgSize, const units::Duration& timeout) const noexcept
{
uint64_t msgSizeToSend = msgSize;
if constexpr (Terminator == Termination::NULL_TERMINATOR)
{
msgSizeToSend += NULL_TERMINATOR_SIZE;
}

if (msgSizeToSend > static_cast<uint64_t>(m_attributes.mq_msgsize))
{
IOX_LOG(ERROR, "the message which should be sent to the message queue '" << m_name << "' is too long");
return err(PosixIpcChannelError::MESSAGE_TOO_LONG);
}

timespec timeOut = timeout.timespec(units::TimeSpecReference::Epoch);
auto mqCall = IOX_POSIX_CALL(mq_timedsend)(m_mqDescriptor, msg, msgSizeToSend, 1U, &timeOut)
.failureReturnValue(ERROR_CODE)
// don't use the suppressErrorMessagesForErrnos method since QNX used EINTR instead of ETIMEDOUT
.ignoreErrnos(TIMEOUT_ERRNO)
.evaluate();

if (mqCall.has_error())
{
return err(errnoToEnum(mqCall.error().errnum));
}

if (mqCall->errnum == TIMEOUT_ERRNO)
{
return err(errnoToEnum(ETIMEDOUT));
}

return ok();
}

template <typename Type, MessageQueue::Termination Terminator>
expected<void, PosixIpcChannelError> MessageQueue::sendImpl(not_null<const Type*> msg, uint64_t msgSize) const noexcept
{
uint64_t msgSizeToSend = msgSize;
if constexpr (Terminator == Termination::NULL_TERMINATOR)
{
msgSizeToSend += NULL_TERMINATOR_SIZE;
}

if (msgSizeToSend > static_cast<uint64_t>(m_attributes.mq_msgsize))
{
IOX_LOG(ERROR, "the message which should be sent to the message queue '" << m_name << "' is too long");
return err(PosixIpcChannelError::MESSAGE_TOO_LONG);
}

auto mqCall =
IOX_POSIX_CALL(mq_send)(m_mqDescriptor, msg, msgSizeToSend, 1U).failureReturnValue(ERROR_CODE).evaluate();

if (mqCall.has_error())
{
return err(errnoToEnum(mqCall.error().errnum));
}

if (mqCall->errnum == TIMEOUT_ERRNO)
{
return err(errnoToEnum(ETIMEDOUT));
}

return ok();
}

template <typename Type, MessageQueue::Termination Terminator>
expected<uint64_t, PosixIpcChannelError>
MessageQueue::timedReceiveImpl(not_null<Type*> msg, uint64_t maxMsgSize, const units::Duration& timeout) const noexcept
{
timespec timeOut = timeout.timespec(units::TimeSpecReference::Epoch);
auto mqCall = IOX_POSIX_CALL(mq_timedreceive)(m_mqDescriptor, msg, maxMsgSize, nullptr, &timeOut)
.failureReturnValue(ERROR_CODE)
// don't use the suppressErrorMessagesForErrnos method since QNX used EINTR instead of ETIMEDOUT
.ignoreErrnos(TIMEOUT_ERRNO)
.evaluate();

if (mqCall.has_error())
{
return err(errnoToEnum(mqCall.error().errnum));
}

if (mqCall->errnum == TIMEOUT_ERRNO)
{
return err(errnoToEnum(ETIMEDOUT));
}

return receiveVerification<Type, Terminator>(msg, static_cast<uint64_t>(mqCall->value));
}

template <typename Type, MessageQueue::Termination Terminator>
expected<uint64_t, PosixIpcChannelError> MessageQueue::receiveImpl(not_null<Type*> msg,
uint64_t maxMsgSize) const noexcept
{
auto mqCall =
IOX_POSIX_CALL(mq_receive)(m_mqDescriptor, msg, maxMsgSize, nullptr).failureReturnValue(ERROR_CODE).evaluate();

if (mqCall.has_error())
{
return err(errnoToEnum(mqCall.error().errnum));
}

if (mqCall->errnum == TIMEOUT_ERRNO)
{
return err(errnoToEnum(ETIMEDOUT));
}

return receiveVerification<Type, Terminator>(msg, static_cast<uint64_t>(mqCall->value));
}

template <uint64_t N>
expected<void, PosixIpcChannelError> MessageQueue::send(const iox::string<N>& buf) const noexcept
{
return sendImpl<char, Termination::NULL_TERMINATOR>(buf.c_str(), buf.size());
}

template <uint64_t N>
expected<void, PosixIpcChannelError> MessageQueue::timedSend(const iox::string<N>& buf,
const units::Duration& timeout) const noexcept
{
return timedSendImpl<char, Termination::NULL_TERMINATOR>(buf.c_str(), buf.size(), timeout);
}

template <uint64_t N>
expected<void, PosixIpcChannelError> MessageQueue::receive(iox::string<N>& buf) const noexcept
{
static_assert(N <= MAX_MESSAGE_SIZE, "Size exceeds transmission limit!");

auto result = expected<uint64_t, PosixIpcChannelError>(in_place, uint64_t(0));
buf.unsafe_raw_access([&](auto* str, const auto info) -> uint64_t {
result = receiveImpl<char, Termination::NULL_TERMINATOR>(str, info.total_size);
if (result.has_error())
{
return 0;
}
return result.value();
});
if (result.has_error())
{
return err(result.error());
}
return ok();
}

template <uint64_t N>
expected<void, PosixIpcChannelError> MessageQueue::timedReceive(iox::string<N>& buf,
const units::Duration& timeout) const noexcept
{
static_assert(N <= MAX_MESSAGE_SIZE, "Size exceeds transmission limit!");

auto result = expected<uint64_t, PosixIpcChannelError>(in_place, uint64_t(0));
buf.unsafe_raw_access([&](auto* str, const auto info) -> uint64_t {
result = timedReceiveImpl<char, Termination::NULL_TERMINATOR>(str, info.total_size, timeout);
if (result.has_error())
{
return 0;
}
return result.value();
});
if (result.has_error())
{
return err(result.error());
}
return ok();
}

template <typename Type, MessageQueue::Termination Terminator>
expected<uint64_t, PosixIpcChannelError> MessageQueue::receiveVerification(not_null<Type*> msg,
uint64_t msgLenght) const noexcept
{
if constexpr (Terminator == Termination::NULL_TERMINATOR)
{
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
if (msg[msgLenght - NULL_TERMINATOR_SIZE] != 0)
{
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
msg[0] = 0;
return err(PosixIpcChannelError::INTERNAL_LOGIC_ERROR);
}
return ok<uint64_t>(msgLenght - NULL_TERMINATOR_SIZE);
}

return ok<uint64_t>(msgLenght);
}
} // namespace iox

#endif
56 changes: 56 additions & 0 deletions iceoryx_hoofs/posix/ipc/include/iox/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "iox/builder.hpp"
#include "iox/duration.hpp"
#include "iox/expected.hpp"
#include "iox/not_null.hpp"
#include "iox/optional.hpp"
#include "iox/posix_ipc_channel.hpp"

Expand Down Expand Up @@ -58,6 +59,7 @@ class MessageQueue
static constexpr uint64_t MAX_NUMBER_OF_MESSAGES = 10;

using Builder_t = MessageQueueBuilder;
using Message_t = iox::string<MAX_MESSAGE_SIZE>;

MessageQueue() noexcept = delete;
MessageQueue(const MessageQueue& other) = delete;
Expand Down Expand Up @@ -88,6 +90,38 @@ class MessageQueue
expected<void, PosixIpcChannelError> timedSend(const std::string& msg,
const units::Duration& timeout) const noexcept;

/// @brief send a message using iox::string
/// @tparam N capacity of the iox::string
/// @param[in] buf data to send
/// @return PosixIpcChannelError if error occured
template <uint64_t N>
expected<void, PosixIpcChannelError> send(const iox::string<N>& buf) const noexcept;

/// @brief try to send a message for a given timeout duration using iox::string
/// @tparam N capacity of the iox::string
/// @param[in] buf data to send
/// @param[in] timeout for the send operation
/// @return PosixIpcChannelError if error occured
template <uint64_t N>
expected<void, PosixIpcChannelError> timedSend(const iox::string<N>& buf,
const units::Duration& timeout) const noexcept;

/// @brief receive message using iox::string
/// @tparam N capacity of the iox::string
/// @param[in] buf data to receive
/// @return PosixIpcChannelError if error occured
template <uint64_t N>
expected<void, PosixIpcChannelError> receive(iox::string<N>& buf) const noexcept;

/// @brief try to receive message for a given timeout duration using iox::string
/// @tparam N capacity of the iox::string
/// @param[in] buf data to receive
/// @param[in] timeout for the send operation
/// @return PosixIpcChannelError if error occured
template <uint64_t N>
expected<void, PosixIpcChannelError> timedReceive(iox::string<N>& buf,
const units::Duration& timeout) const noexcept;

static expected<bool, PosixIpcChannelError> isOutdated() noexcept;

private:
Expand All @@ -109,6 +143,26 @@ class MessageQueue
sanitizeIpcChannelName(const PosixIpcChannelName_t& name) noexcept;
expected<void, PosixIpcChannelError> destroy() noexcept;

enum class Termination
{
NONE,
NULL_TERMINATOR
};

template <typename Type, Termination Terminator>
expected<void, PosixIpcChannelError>
timedSendImpl(not_null<const Type*> msg, uint64_t msgSize, const units::Duration& timeout) const noexcept;
template <typename Type, Termination Terminator>
expected<uint64_t, PosixIpcChannelError>
timedReceiveImpl(not_null<Type*> msg, uint64_t maxMsgSize, const units::Duration& timeout) const noexcept;
template <typename Type, Termination Terminator>
expected<void, PosixIpcChannelError> sendImpl(not_null<const Type*> msg, uint64_t msgSize) const noexcept;
template <typename Type, Termination Terminator>
expected<uint64_t, PosixIpcChannelError> receiveImpl(not_null<Type*> msg, uint64_t maxMsgSize) const noexcept;
template <typename Type, Termination Terminator>
expected<uint64_t, PosixIpcChannelError> receiveVerification(not_null<Type*> msg,
uint64_t msgLenght) const noexcept;

private:
PosixIpcChannelName_t m_name;
mq_attr m_attributes{};
Expand Down Expand Up @@ -149,4 +203,6 @@ class MessageQueueBuilder

} // namespace iox

#include "detail/message_queue.inl"

#endif // IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_HPP
Loading

0 comments on commit 38e9b6b

Please sign in to comment.