diff --git a/doc/design/chunk_header.md b/doc/design/chunk_header.md index 228b715e70..63564be1e9 100644 --- a/doc/design/chunk_header.md +++ b/doc/design/chunk_header.md @@ -277,7 +277,7 @@ auto pub = iox::popo::Publisher(serviceDescription, userHeaderHook); - part of the specified chunk-payload might also be used as padding for the user-payload alignment - the user will continue to specify the chunk-payload; if a user-header or custom user-payload alignment is used, the user needs to take this into account - is it necessary to store a flag in `ChunkHeader` if a user extension is used? - - we could maintain a list of known user-header IDs or ranges of IDs similar to `IANA` https://tools.ietf.org/id/draft-cotton-tsvwg-iana-ports-00.html#privateports + - we could maintain a list of known user-header IDs or ranges of IDs similar to `IANA` https://datatracker.ietf.org/doc/id/draft-cotton-tsvwg-iana-ports-00.html#privateports - the IDs could be stored in the `ChunkHeader` and everything with an ID larger than `0xC000` is free to use - to make this somewhat save, the `ChunkHeaderHook` must be mandatory with e.g. a `virtual uint16_t getId() = 0;` method which will be called in the `ChunkSender` - alternatively, the user-header struct must have a `constexpr uint16_t USER_HEADER_ID`; if it's not present, we could set the ID to `0xC000` which is the first ID free to use diff --git a/iceoryx_examples/iceperf/mq.cpp b/iceoryx_examples/iceperf/mq.cpp index 400f39053c..cda1b06772 100644 --- a/iceoryx_examples/iceperf/mq.cpp +++ b/iceoryx_examples/iceperf/mq.cpp @@ -163,7 +163,7 @@ PerfTopic MQ::receivePerfTopic() noexcept void MQ::open(const std::string& name, const iox::posix::IpcChannelSide channelSide) noexcept { - int32_t openFlags = O_RDWR; + int32_t openFlags = O_RDWR | O_NONBLOCK; if (channelSide == iox::posix::IpcChannelSide::SERVER) { openFlags |= O_CREAT; @@ -208,23 +208,35 @@ void MQ::open(const std::string& name, const iox::posix::IpcChannelSide channelS void MQ::send(const char* buffer, uint32_t length) noexcept { - iox::posix::posixCall(mq_send)(m_mqDescriptorPublisher, buffer, length, 1U) - .failureReturnValue(ERROR_CODE) - .evaluate() - .or_else([&](auto& r) { - std::cout << std::endl - << "send error for " << m_publisherMqName << ", " << r.getHumanReadableErrnum() << std::endl; - exit(1); - }); + while (iox::posix::posixCall(mq_send)(m_mqDescriptorPublisher, buffer, length, 1U) + .failureReturnValue(ERROR_CODE) + .ignoreErrnos(EAGAIN) + .evaluate() + .or_else([&](auto& r) { + std::cout << std::endl + << "send error for " << m_publisherMqName << ", " << r.getHumanReadableErrnum() + << std::endl; + exit(1); + }) + ->errnum + == EAGAIN) + { + } } void MQ::receive(char* buffer) noexcept { - iox::posix::posixCall(mq_receive)(m_mqDescriptorSubscriber, buffer, MAX_MESSAGE_SIZE, nullptr) - .failureReturnValue(ERROR_CODE) - .evaluate() - .or_else([&](auto& r) { - std::cout << "receive error for " << m_subscriberMqName << ", " << r.getHumanReadableErrnum() << std::endl; - exit(1); - }); + while (iox::posix::posixCall(mq_receive)(m_mqDescriptorSubscriber, buffer, MAX_MESSAGE_SIZE, nullptr) + .failureReturnValue(ERROR_CODE) + .ignoreErrnos(EAGAIN) + .evaluate() + .or_else([&](auto& r) { + std::cout << "receive error for " << m_subscriberMqName << ", " << r.getHumanReadableErrnum() + << std::endl; + exit(1); + }) + ->errnum + == EAGAIN) + { + } } diff --git a/iceoryx_examples/iceperf/uds.cpp b/iceoryx_examples/iceperf/uds.cpp index fa9b6ddb95..65800199b9 100644 --- a/iceoryx_examples/iceperf/uds.cpp +++ b/iceoryx_examples/iceperf/uds.cpp @@ -93,7 +93,7 @@ void UDS::initFollower() noexcept void UDS::init() noexcept { // init subscriber - iox::posix::posixCall(iox_socket)(AF_LOCAL, SOCK_DGRAM, 0) + iox::posix::posixCall(iox_socket)(AF_LOCAL, SOCK_DGRAM | SOCK_NONBLOCK, 0) .failureReturnValue(ERROR_CODE) .evaluate() .and_then([this](auto& r) { m_sockfdSubscriber = r.value; }) @@ -112,7 +112,7 @@ void UDS::init() noexcept }); // init publisher - iox::posix::posixCall(iox_socket)(AF_LOCAL, SOCK_DGRAM, 0) + iox::posix::posixCall(iox_socket)(AF_LOCAL, SOCK_DGRAM | SOCK_NONBLOCK, 0) .failureReturnValue(ERROR_CODE) .evaluate() .and_then([this](auto& r) { m_sockfdPublisher = r.value; }) @@ -135,7 +135,7 @@ void UDS::waitForLeader() noexcept reinterpret_cast(&m_sockAddrPublisher), sizeof(m_sockAddrPublisher)) .failureReturnValue(ERROR_CODE) - .ignoreErrnos(ENOENT) + .ignoreErrnos(ENOENT, ECONNREFUSED) .evaluate() .or_else([](auto& r) { std::cout << "send error " << r.getHumanReadableErrnum() << std::endl; @@ -143,7 +143,7 @@ void UDS::waitForLeader() noexcept }) .value(); - if (sendCall.errnum == ENOENT) + if (sendCall.errnum == ENOENT || sendCall.errnum == ECONNREFUSED) { constexpr std::chrono::milliseconds RETRY_INTERVAL{10}; std::this_thread::sleep_for(RETRY_INTERVAL); @@ -239,32 +239,40 @@ void UDS::send(const char* buffer, uint32_t length) noexcept // only return from this loop when the message could be send successfully // if the OS socket message buffer if full, retry until it is free'd by // the OS and the message could be send - while (iox::posix::posixCall(iox_sendto)(m_sockfdPublisher, - buffer, - length, - 0, - reinterpret_cast(&m_sockAddrPublisher), - sizeof(m_sockAddrPublisher)) + while (true) + { + auto result = iox::posix::posixCall(iox_sendto)(m_sockfdPublisher, + buffer, + length, + 0, + reinterpret_cast(&m_sockAddrPublisher), + sizeof(m_sockAddrPublisher)) + .failureReturnValue(ERROR_CODE) + .ignoreErrnos(ENOBUFS, EAGAIN) + .evaluate() + .or_else([](auto& r) { + std::cout << std::endl << "send error " << r.getHumanReadableErrnum() << std::endl; + exit(1); + }); + if (result->errnum != ENOBUFS && result->errnum != EAGAIN) + { + break; + } + } +} + +void UDS::receive(char* buffer) noexcept +{ + while (iox::posix::posixCall(iox_recvfrom)(m_sockfdSubscriber, buffer, MAX_MESSAGE_SIZE, 0, nullptr, nullptr) .failureReturnValue(ERROR_CODE) - .ignoreErrnos(ENOBUFS) + .ignoreErrnos(EAGAIN) .evaluate() .or_else([](auto& r) { - std::cout << std::endl << "send error " << r.getHumanReadableErrnum() << std::endl; + std::cout << "receive error " << r.getHumanReadableErrnum() << std::endl; exit(1); }) ->errnum - == ENOBUFS) + == EAGAIN) { } } - -void UDS::receive(char* buffer) noexcept -{ - iox::posix::posixCall(iox_recvfrom)(m_sockfdSubscriber, buffer, MAX_MESSAGE_SIZE, 0, nullptr, nullptr) - .failureReturnValue(ERROR_CODE) - .evaluate() - .or_else([](auto& r) { - std::cout << "receive error " << r.getHumanReadableErrnum() << std::endl; - exit(1); - }); -} diff --git a/iceoryx_platform/mac/include/iceoryx_platform/socket.hpp b/iceoryx_platform/mac/include/iceoryx_platform/socket.hpp index 9ffcadc01c..729575a1c5 100644 --- a/iceoryx_platform/mac/include/iceoryx_platform/socket.hpp +++ b/iceoryx_platform/mac/include/iceoryx_platform/socket.hpp @@ -20,6 +20,8 @@ #include #include +#define SOCK_NONBLOCK 0 + int iox_bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen); int iox_socket(int domain, int type, int protocol); int iox_setsockopt(int sockfd, int level, int optname, const void* optval, socklen_t optlen); diff --git a/iceoryx_platform/win/include/iceoryx_platform/socket.hpp b/iceoryx_platform/win/include/iceoryx_platform/socket.hpp index 9668759f7c..e7a3591436 100644 --- a/iceoryx_platform/win/include/iceoryx_platform/socket.hpp +++ b/iceoryx_platform/win/include/iceoryx_platform/socket.hpp @@ -24,6 +24,7 @@ #include #define AF_LOCAL AF_INET +#define SOCK_NONBLOCK 0 using sa_family_t = int; int iox_bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen);