Skip to content

Commit

Permalink
Merge pull request #1939 from ApexAI/iox-1938-fix-unfair-iceperf
Browse files Browse the repository at this point in the history
iox-#1938 fix unfair iceperf
  • Loading branch information
elfenpiff authored Mar 15, 2023
2 parents dc9e1a8 + c8ccd0e commit f42c003
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 41 deletions.
2 changes: 1 addition & 1 deletion doc/design/chunk_header.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ auto pub = iox::popo::Publisher<MyPayload>(serviceDescription, userHeaderHook);
- part of the specified chunk-payload might also be used as padding for the user-payload alignment
- the user will continue to specify the chunk-payload; if a user-header or custom user-payload alignment is used, the user needs to take this into account
- is it necessary to store a flag in `ChunkHeader` if a user extension is used?
- we could maintain a list of known user-header IDs or ranges of IDs similar to `IANA` https://tools.ietf.org/id/draft-cotton-tsvwg-iana-ports-00.html#privateports
- we could maintain a list of known user-header IDs or ranges of IDs similar to `IANA` https://datatracker.ietf.org/doc/id/draft-cotton-tsvwg-iana-ports-00.html#privateports
- the IDs could be stored in the `ChunkHeader` and everything with an ID larger than `0xC000` is free to use
- to make this somewhat save, the `ChunkHeaderHook` must be mandatory with e.g. a `virtual uint16_t getId() = 0;` method which will be called in the `ChunkSender`
- alternatively, the user-header struct must have a `constexpr uint16_t USER_HEADER_ID`; if it's not present, we could set the ID to `0xC000` which is the first ID free to use
Expand Down
44 changes: 28 additions & 16 deletions iceoryx_examples/iceperf/mq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ PerfTopic MQ::receivePerfTopic() noexcept

void MQ::open(const std::string& name, const iox::posix::IpcChannelSide channelSide) noexcept
{
int32_t openFlags = O_RDWR;
int32_t openFlags = O_RDWR | O_NONBLOCK;
if (channelSide == iox::posix::IpcChannelSide::SERVER)
{
openFlags |= O_CREAT;
Expand Down Expand Up @@ -208,23 +208,35 @@ void MQ::open(const std::string& name, const iox::posix::IpcChannelSide channelS

void MQ::send(const char* buffer, uint32_t length) noexcept
{
iox::posix::posixCall(mq_send)(m_mqDescriptorPublisher, buffer, length, 1U)
.failureReturnValue(ERROR_CODE)
.evaluate()
.or_else([&](auto& r) {
std::cout << std::endl
<< "send error for " << m_publisherMqName << ", " << r.getHumanReadableErrnum() << std::endl;
exit(1);
});
while (iox::posix::posixCall(mq_send)(m_mqDescriptorPublisher, buffer, length, 1U)
.failureReturnValue(ERROR_CODE)
.ignoreErrnos(EAGAIN)
.evaluate()
.or_else([&](auto& r) {
std::cout << std::endl
<< "send error for " << m_publisherMqName << ", " << r.getHumanReadableErrnum()
<< std::endl;
exit(1);
})
->errnum
== EAGAIN)
{
}
}

void MQ::receive(char* buffer) noexcept
{
iox::posix::posixCall(mq_receive)(m_mqDescriptorSubscriber, buffer, MAX_MESSAGE_SIZE, nullptr)
.failureReturnValue(ERROR_CODE)
.evaluate()
.or_else([&](auto& r) {
std::cout << "receive error for " << m_subscriberMqName << ", " << r.getHumanReadableErrnum() << std::endl;
exit(1);
});
while (iox::posix::posixCall(mq_receive)(m_mqDescriptorSubscriber, buffer, MAX_MESSAGE_SIZE, nullptr)
.failureReturnValue(ERROR_CODE)
.ignoreErrnos(EAGAIN)
.evaluate()
.or_else([&](auto& r) {
std::cout << "receive error for " << m_subscriberMqName << ", " << r.getHumanReadableErrnum()
<< std::endl;
exit(1);
})
->errnum
== EAGAIN)
{
}
}
56 changes: 32 additions & 24 deletions iceoryx_examples/iceperf/uds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void UDS::initFollower() noexcept
void UDS::init() noexcept
{
// init subscriber
iox::posix::posixCall(iox_socket)(AF_LOCAL, SOCK_DGRAM, 0)
iox::posix::posixCall(iox_socket)(AF_LOCAL, SOCK_DGRAM | SOCK_NONBLOCK, 0)
.failureReturnValue(ERROR_CODE)
.evaluate()
.and_then([this](auto& r) { m_sockfdSubscriber = r.value; })
Expand All @@ -112,7 +112,7 @@ void UDS::init() noexcept
});

// init publisher
iox::posix::posixCall(iox_socket)(AF_LOCAL, SOCK_DGRAM, 0)
iox::posix::posixCall(iox_socket)(AF_LOCAL, SOCK_DGRAM | SOCK_NONBLOCK, 0)
.failureReturnValue(ERROR_CODE)
.evaluate()
.and_then([this](auto& r) { m_sockfdPublisher = r.value; })
Expand All @@ -135,15 +135,15 @@ void UDS::waitForLeader() noexcept
reinterpret_cast<struct sockaddr*>(&m_sockAddrPublisher),
sizeof(m_sockAddrPublisher))
.failureReturnValue(ERROR_CODE)
.ignoreErrnos(ENOENT)
.ignoreErrnos(ENOENT, ECONNREFUSED)
.evaluate()
.or_else([](auto& r) {
std::cout << "send error " << r.getHumanReadableErrnum() << std::endl;
exit(1);
})
.value();

if (sendCall.errnum == ENOENT)
if (sendCall.errnum == ENOENT || sendCall.errnum == ECONNREFUSED)
{
constexpr std::chrono::milliseconds RETRY_INTERVAL{10};
std::this_thread::sleep_for(RETRY_INTERVAL);
Expand Down Expand Up @@ -239,32 +239,40 @@ void UDS::send(const char* buffer, uint32_t length) noexcept
// only return from this loop when the message could be send successfully
// if the OS socket message buffer if full, retry until it is free'd by
// the OS and the message could be send
while (iox::posix::posixCall(iox_sendto)(m_sockfdPublisher,
buffer,
length,
0,
reinterpret_cast<struct sockaddr*>(&m_sockAddrPublisher),
sizeof(m_sockAddrPublisher))
while (true)
{
auto result = iox::posix::posixCall(iox_sendto)(m_sockfdPublisher,
buffer,
length,
0,
reinterpret_cast<struct sockaddr*>(&m_sockAddrPublisher),
sizeof(m_sockAddrPublisher))
.failureReturnValue(ERROR_CODE)
.ignoreErrnos(ENOBUFS, EAGAIN)
.evaluate()
.or_else([](auto& r) {
std::cout << std::endl << "send error " << r.getHumanReadableErrnum() << std::endl;
exit(1);
});
if (result->errnum != ENOBUFS && result->errnum != EAGAIN)
{
break;
}
}
}

void UDS::receive(char* buffer) noexcept
{
while (iox::posix::posixCall(iox_recvfrom)(m_sockfdSubscriber, buffer, MAX_MESSAGE_SIZE, 0, nullptr, nullptr)
.failureReturnValue(ERROR_CODE)
.ignoreErrnos(ENOBUFS)
.ignoreErrnos(EAGAIN)
.evaluate()
.or_else([](auto& r) {
std::cout << std::endl << "send error " << r.getHumanReadableErrnum() << std::endl;
std::cout << "receive error " << r.getHumanReadableErrnum() << std::endl;
exit(1);
})
->errnum
== ENOBUFS)
== EAGAIN)
{
}
}

void UDS::receive(char* buffer) noexcept
{
iox::posix::posixCall(iox_recvfrom)(m_sockfdSubscriber, buffer, MAX_MESSAGE_SIZE, 0, nullptr, nullptr)
.failureReturnValue(ERROR_CODE)
.evaluate()
.or_else([](auto& r) {
std::cout << "receive error " << r.getHumanReadableErrnum() << std::endl;
exit(1);
});
}
2 changes: 2 additions & 0 deletions iceoryx_platform/mac/include/iceoryx_platform/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <cstdint>
#include <sys/socket.h>

#define SOCK_NONBLOCK 0

int iox_bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen);
int iox_socket(int domain, int type, int protocol);
int iox_setsockopt(int sockfd, int level, int optname, const void* optval, socklen_t optlen);
Expand Down
1 change: 1 addition & 0 deletions iceoryx_platform/win/include/iceoryx_platform/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cstdint>

#define AF_LOCAL AF_INET
#define SOCK_NONBLOCK 0
using sa_family_t = int;

int iox_bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen);
Expand Down

0 comments on commit f42c003

Please sign in to comment.