diff --git a/doc/website/release-notes/iceoryx-unreleased.md b/doc/website/release-notes/iceoryx-unreleased.md index 14e0f9fa2b..4e3e1944f0 100644 --- a/doc/website/release-notes/iceoryx-unreleased.md +++ b/doc/website/release-notes/iceoryx-unreleased.md @@ -58,6 +58,7 @@ - Fast POD data in `iox::vector` [#2082](https://github.com/eclipse-iceoryx/iceoryx/issues/2082) - MinGW support for Windows [#2150](https://github.com/eclipse-iceoryx/iceoryx/issues/2150) - Add support for `iox::string` in `UnixDomainSocket` and created `unix_domain_socket.inl` [#2040](https://github.com/eclipse-iceoryx/iceoryx/issues/2040) +- Add support for `iox::string` in `MessageQueue` and created `message_queue.inl` [#1963](https://github.com/eclipse-iceoryx/iceoryx/issues/1963) **Bugfixes:** diff --git a/iceoryx_hoofs/posix/ipc/include/iox/detail/message_queue.inl b/iceoryx_hoofs/posix/ipc/include/iox/detail/message_queue.inl new file mode 100644 index 0000000000..d626049228 --- /dev/null +++ b/iceoryx_hoofs/posix/ipc/include/iox/detail/message_queue.inl @@ -0,0 +1,213 @@ +// Copyright 2024, Eclipse Foundation and the iceoryx contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +#ifndef IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_INL +#define IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_INL + +#include "iox/duration.hpp" +#include "iox/message_queue.hpp" +#include "iox/not_null.hpp" +#include "iox/posix_call.hpp" + +namespace iox +{ +template +expected +MessageQueue::timedSendImpl(not_null msg, uint64_t msgSize, const units::Duration& timeout) const noexcept +{ + uint64_t msgSizeToSend = msgSize; + if constexpr (Terminator == Termination::NULL_TERMINATOR) + { + msgSizeToSend += NULL_TERMINATOR_SIZE; + } + + if (msgSizeToSend > static_cast(m_attributes.mq_msgsize)) + { + IOX_LOG(ERROR, "the message which should be sent to the message queue '" << m_name << "' is too long"); + return err(PosixIpcChannelError::MESSAGE_TOO_LONG); + } + + timespec timeOut = timeout.timespec(units::TimeSpecReference::Epoch); + auto mqCall = IOX_POSIX_CALL(mq_timedsend)(m_mqDescriptor, msg, msgSizeToSend, 1U, &timeOut) + .failureReturnValue(ERROR_CODE) + // don't use the suppressErrorMessagesForErrnos method since QNX used EINTR instead of ETIMEDOUT + .ignoreErrnos(TIMEOUT_ERRNO) + .evaluate(); + + if (mqCall.has_error()) + { + return err(errnoToEnum(mqCall.error().errnum)); + } + + if (mqCall->errnum == TIMEOUT_ERRNO) + { + return err(errnoToEnum(ETIMEDOUT)); + } + + return ok(); +} + +template +expected MessageQueue::sendImpl(not_null msg, uint64_t msgSize) const noexcept +{ + uint64_t msgSizeToSend = msgSize; + if constexpr (Terminator == Termination::NULL_TERMINATOR) + { + msgSizeToSend += NULL_TERMINATOR_SIZE; + } + + if (msgSizeToSend > static_cast(m_attributes.mq_msgsize)) + { + IOX_LOG(ERROR, "the message which should be sent to the message queue '" << m_name << "' is too long"); + return err(PosixIpcChannelError::MESSAGE_TOO_LONG); + } + + auto mqCall = + IOX_POSIX_CALL(mq_send)(m_mqDescriptor, msg, msgSizeToSend, 1U).failureReturnValue(ERROR_CODE).evaluate(); + + if (mqCall.has_error()) + { + return err(errnoToEnum(mqCall.error().errnum)); + } + + if (mqCall->errnum == TIMEOUT_ERRNO) + { + return err(errnoToEnum(ETIMEDOUT)); + } + + return ok(); +} + +template +expected +MessageQueue::timedReceiveImpl(not_null msg, uint64_t maxMsgSize, const units::Duration& timeout) const noexcept +{ + timespec timeOut = timeout.timespec(units::TimeSpecReference::Epoch); + auto mqCall = IOX_POSIX_CALL(mq_timedreceive)(m_mqDescriptor, msg, maxMsgSize, nullptr, &timeOut) + .failureReturnValue(ERROR_CODE) + // don't use the suppressErrorMessagesForErrnos method since QNX used EINTR instead of ETIMEDOUT + .ignoreErrnos(TIMEOUT_ERRNO) + .evaluate(); + + if (mqCall.has_error()) + { + return err(errnoToEnum(mqCall.error().errnum)); + } + + if (mqCall->errnum == TIMEOUT_ERRNO) + { + return err(errnoToEnum(ETIMEDOUT)); + } + + return receiveVerification(msg, static_cast(mqCall->value)); +} + +template +expected MessageQueue::receiveImpl(not_null msg, + uint64_t maxMsgSize) const noexcept +{ + auto mqCall = + IOX_POSIX_CALL(mq_receive)(m_mqDescriptor, msg, maxMsgSize, nullptr).failureReturnValue(ERROR_CODE).evaluate(); + + if (mqCall.has_error()) + { + return err(errnoToEnum(mqCall.error().errnum)); + } + + if (mqCall->errnum == TIMEOUT_ERRNO) + { + return err(errnoToEnum(ETIMEDOUT)); + } + + return receiveVerification(msg, static_cast(mqCall->value)); +} + +template +expected MessageQueue::send(const iox::string& buf) const noexcept +{ + return sendImpl(buf.c_str(), buf.size()); +} + +template +expected MessageQueue::timedSend(const iox::string& buf, + const units::Duration& timeout) const noexcept +{ + return timedSendImpl(buf.c_str(), buf.size(), timeout); +} + +template +expected MessageQueue::receive(iox::string& buf) const noexcept +{ + static_assert(N <= MAX_MESSAGE_SIZE, "Size exceeds transmission limit!"); + + auto result = expected(in_place, uint64_t(0)); + buf.unsafe_raw_access([&](auto* str, const auto info) -> uint64_t { + result = receiveImpl(str, info.total_size); + if (result.has_error()) + { + return 0; + } + return result.value(); + }); + if (result.has_error()) + { + return err(result.error()); + } + return ok(); +} + +template +expected MessageQueue::timedReceive(iox::string& buf, + const units::Duration& timeout) const noexcept +{ + static_assert(N <= MAX_MESSAGE_SIZE, "Size exceeds transmission limit!"); + + auto result = expected(in_place, uint64_t(0)); + buf.unsafe_raw_access([&](auto* str, const auto info) -> uint64_t { + result = timedReceiveImpl(str, info.total_size, timeout); + if (result.has_error()) + { + return 0; + } + return result.value(); + }); + if (result.has_error()) + { + return err(result.error()); + } + return ok(); +} + +template +expected MessageQueue::receiveVerification(not_null msg, + uint64_t msgLenght) const noexcept +{ + if constexpr (Terminator == Termination::NULL_TERMINATOR) + { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + if (msg[msgLenght - NULL_TERMINATOR_SIZE] != 0) + { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + msg[0] = 0; + return err(PosixIpcChannelError::INTERNAL_LOGIC_ERROR); + } + return ok(msgLenght - NULL_TERMINATOR_SIZE); + } + + return ok(msgLenght); +} +} // namespace iox + +#endif \ No newline at end of file diff --git a/iceoryx_hoofs/posix/ipc/include/iox/message_queue.hpp b/iceoryx_hoofs/posix/ipc/include/iox/message_queue.hpp index 6717e7978b..318846c2a3 100644 --- a/iceoryx_hoofs/posix/ipc/include/iox/message_queue.hpp +++ b/iceoryx_hoofs/posix/ipc/include/iox/message_queue.hpp @@ -25,6 +25,7 @@ #include "iox/builder.hpp" #include "iox/duration.hpp" #include "iox/expected.hpp" +#include "iox/not_null.hpp" #include "iox/optional.hpp" #include "iox/posix_ipc_channel.hpp" @@ -58,6 +59,7 @@ class MessageQueue static constexpr uint64_t MAX_NUMBER_OF_MESSAGES = 10; using Builder_t = MessageQueueBuilder; + using Message_t = iox::string; MessageQueue() noexcept = delete; MessageQueue(const MessageQueue& other) = delete; @@ -88,6 +90,38 @@ class MessageQueue expected timedSend(const std::string& msg, const units::Duration& timeout) const noexcept; + /// @brief send a message using iox::string + /// @tparam N capacity of the iox::string + /// @param[in] buf data to send + /// @return PosixIpcChannelError if error occured + template + expected send(const iox::string& buf) const noexcept; + + /// @brief try to send a message for a given timeout duration using iox::string + /// @tparam N capacity of the iox::string + /// @param[in] buf data to send + /// @param[in] timeout for the send operation + /// @return PosixIpcChannelError if error occured + template + expected timedSend(const iox::string& buf, + const units::Duration& timeout) const noexcept; + + /// @brief receive message using iox::string + /// @tparam N capacity of the iox::string + /// @param[in] buf data to receive + /// @return PosixIpcChannelError if error occured + template + expected receive(iox::string& buf) const noexcept; + + /// @brief try to receive message for a given timeout duration using iox::string + /// @tparam N capacity of the iox::string + /// @param[in] buf data to receive + /// @param[in] timeout for the send operation + /// @return PosixIpcChannelError if error occured + template + expected timedReceive(iox::string& buf, + const units::Duration& timeout) const noexcept; + static expected isOutdated() noexcept; private: @@ -109,6 +143,26 @@ class MessageQueue sanitizeIpcChannelName(const PosixIpcChannelName_t& name) noexcept; expected destroy() noexcept; + enum class Termination + { + NONE, + NULL_TERMINATOR + }; + + template + expected + timedSendImpl(not_null msg, uint64_t msgSize, const units::Duration& timeout) const noexcept; + template + expected + timedReceiveImpl(not_null msg, uint64_t maxMsgSize, const units::Duration& timeout) const noexcept; + template + expected sendImpl(not_null msg, uint64_t msgSize) const noexcept; + template + expected receiveImpl(not_null msg, uint64_t maxMsgSize) const noexcept; + template + expected receiveVerification(not_null msg, + uint64_t msgLenght) const noexcept; + private: PosixIpcChannelName_t m_name; mq_attr m_attributes{}; @@ -149,4 +203,6 @@ class MessageQueueBuilder } // namespace iox +#include "detail/message_queue.inl" + #endif // IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_HPP diff --git a/iceoryx_hoofs/posix/ipc/source/message_queue.cpp b/iceoryx_hoofs/posix/ipc/source/message_queue.cpp index c4f3fe9056..3fce5d785d 100644 --- a/iceoryx_hoofs/posix/ipc/source/message_queue.cpp +++ b/iceoryx_hoofs/posix/ipc/source/message_queue.cpp @@ -169,39 +169,26 @@ expected MessageQueue::destroy() noexcept expected MessageQueue::send(const std::string& msg) const noexcept { - const uint64_t messageSize = msg.size() + NULL_TERMINATOR_SIZE; - if (messageSize > static_cast(m_attributes.mq_msgsize)) - { - return err(PosixIpcChannelError::MESSAGE_TOO_LONG); - } - - auto mqCall = - IOX_POSIX_CALL(mq_send)(m_mqDescriptor, msg.c_str(), messageSize, 1U).failureReturnValue(ERROR_CODE).evaluate(); - - if (mqCall.has_error()) - { - return err(errnoToEnum(mqCall.error().errnum)); - } - - return ok(); + return sendImpl(msg.c_str(), msg.size()); } expected MessageQueue::receive() const noexcept { - /// NOLINTJUSTIFICATION required as raw memory buffer for mq_receive - /// NOLINTNEXTLINE(hicpp-avoid-c-arrays,cppcoreguidelines-avoid-c-arrays) - char message[MAX_MESSAGE_SIZE]; - - auto mqCall = IOX_POSIX_CALL(mq_receive)(m_mqDescriptor, &message[0], MAX_MESSAGE_SIZE, nullptr) - .failureReturnValue(ERROR_CODE) - .evaluate(); - - if (mqCall.has_error()) + auto result = expected(in_place, uint64_t(0)); + Message_t msg; + msg.unsafe_raw_access([&](auto* str, const auto info) -> uint64_t { + result = this->receiveImpl(str, info.total_size); + if (result.has_error()) + { + return 0; + } + return result.value(); + }); + if (result.has_error()) { - return err(errnoToEnum(mqCall.error().errnum)); + return err(result.error()); } - - return ok(std::string(&(message[0]))); + return ok(msg.c_str()); } expected MessageQueue::open(const PosixIpcChannelName_t& name, @@ -274,60 +261,27 @@ expected MessageQueue::unlink() noexcept expected MessageQueue::timedReceive(const units::Duration& timeout) const noexcept { - timespec timeOut = timeout.timespec(units::TimeSpecReference::Epoch); - /// NOLINTJUSTIFICATION required as internal buffer for receive - /// NOLINTNEXTLINE(hicpp-avoid-c-arrays,cppcoreguidelines-avoid-c-arrays) - char message[MAX_MESSAGE_SIZE]; - - auto mqCall = IOX_POSIX_CALL(mq_timedreceive)(m_mqDescriptor, &message[0], MAX_MESSAGE_SIZE, nullptr, &timeOut) - .failureReturnValue(ERROR_CODE) - // don't use the suppressErrorMessagesForErrnos method since QNX used EINTR instead of ETIMEDOUT - .ignoreErrnos(TIMEOUT_ERRNO) - .evaluate(); - - if (mqCall.has_error()) - { - return err(errnoToEnum(mqCall.error().errnum)); - } - - if (mqCall->errnum == TIMEOUT_ERRNO) + auto result = expected(in_place, uint64_t(0)); + Message_t msg; + msg.unsafe_raw_access([&](auto* str, const auto info) -> uint64_t { + result = this->timedReceiveImpl(str, info.total_size, timeout); + if (result.has_error()) + { + return 0; + } + return result.value(); + }); + if (result.has_error()) { - return err(errnoToEnum(ETIMEDOUT)); + return err(result.error()); } - - return ok(std::string(&(message[0]))); + return ok(msg.c_str()); } expected MessageQueue::timedSend(const std::string& msg, const units::Duration& timeout) const noexcept { - const uint64_t messageSize = msg.size() + NULL_TERMINATOR_SIZE; - if (messageSize > static_cast(m_attributes.mq_msgsize)) - { - IOX_LOG(ERROR, - "the message '" << msg << "' which should be sent to the message queue '" << m_name << "' is too long"); - return err(PosixIpcChannelError::MESSAGE_TOO_LONG); - } - - timespec timeOut = timeout.timespec(units::TimeSpecReference::Epoch); - - auto mqCall = IOX_POSIX_CALL(mq_timedsend)(m_mqDescriptor, msg.c_str(), messageSize, 1U, &timeOut) - .failureReturnValue(ERROR_CODE) - // don't use the suppressErrorMessagesForErrnos method since QNX used EINTR instead of ETIMEDOUT - .ignoreErrnos(TIMEOUT_ERRNO) - .evaluate(); - - if (mqCall.has_error()) - { - return err(errnoToEnum(mqCall.error().errnum)); - } - - if (mqCall->errnum == TIMEOUT_ERRNO) - { - return err(errnoToEnum(ETIMEDOUT)); - } - - return ok(); + return timedSendImpl(msg.c_str(), msg.size(), timeout); } expected MessageQueue::isOutdated() noexcept