Skip to content

Commit

Permalink
liburing zero copy
Browse files Browse the repository at this point in the history
By pre-registering the send buffers we can make use of zero copy in
order to avoid the kernel memcpy-ing the buffers.

Send buffers are allocated by us and will be set as available once
liburing notifies us about it.
  • Loading branch information
jmillan committed Dec 19, 2023
1 parent c2c4323 commit 1277f5c
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 16 deletions.
4 changes: 3 additions & 1 deletion worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class DepLibUring
private:
// io_uring instance.
io_uring ring;
// Event file descriptor to watch for completions.
// Event file descriptor to watch for io_uring completions.
int efd;
// libuv handle used to poll io_uring completions.
uv_poll_t* uvHandle{ nullptr };
Expand All @@ -115,6 +115,8 @@ class DepLibUring
std::queue<size_t> availableUserDataEntries;
// Pre-allocated SendBuffer's.
SendBuffer sendBuffers[QueueDepth];
// iovec structs to be registered for Zero Copy.
struct iovec iovecs[QueueDepth];
// Submission queue entry process count.
uint64_t sqeProcessCount{ 0u };
// Submission queue entry miss count.
Expand Down
12 changes: 12 additions & 0 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ namespace Utils

static void GetAddressInfo(const struct sockaddr* addr, int& family, std::string& ip, uint16_t& port);

// Returns the size in bytes of the concrete socket address struct that
// `addr` points to, chosen from its `sa_family` field:
// - AF_INET: sizeof(struct sockaddr_in).
// - Any other family: sizeof(struct sockaddr_in6).
// `addr` is dereferenced unconditionally, so it must not be null.
static size_t GetAddressLen(const struct sockaddr* addr)
{
	const bool isIPv4 = (addr->sa_family == AF_INET);

	return isIPv4 ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
}

static bool CompareAddresses(const struct sockaddr* addr1, const struct sockaddr* addr2)
{
// Compare family.
Expand Down
84 changes: 69 additions & 15 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "DepLibUring.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include <sys/eventfd.h>
#include <sys/utsname.h>

Expand Down Expand Up @@ -40,27 +41,66 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events)
struct io_uring_cqe* cqe = cqes[i];
auto* userData = static_cast<DepLibUring::UserData*>(io_uring_cqe_get_data(cqe));

if (cqe->res < 0)
// CQE notification for a zero-copy submission.
if (cqe->flags & IORING_CQE_F_NOTIF)
{
MS_ERROR("sending failed: %s", std::strerror(-cqe->res));
// The send buffer is now in the network card, run the send callback.
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}

liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// CQE for a zero-copy submission, a CQE notification will follow.
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

// NOTE: Do not release the user data as it will be done upon reception
// of CQE notification.
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// Failed SQE.
if (cqe->res < 0)
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}
// Successful SQE.
else
{
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}
}

io_uring_cqe_seen(liburing->GetRing(), cqe);
liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);
}
}

Expand Down Expand Up @@ -258,6 +298,20 @@ DepLibUring::LibUring::LibUring()
this->userDatas[i].store = this->sendBuffers[i];
this->availableUserDataEntries.push(i);
}

// Initialize iovecs.
for (size_t i{ 0 }; i < DepLibUring::QueueDepth; ++i)
{
this->iovecs[i].iov_base = this->sendBuffers[i];
this->iovecs[i].iov_len = DepLibUring::SendBufferSize;
}

err = io_uring_register_buffers(std::addressof(this->ring), this->iovecs, DepLibUring::QueueDepth);

if (err < 0)
{
MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(-err));
}
}

DepLibUring::LibUring::~LibUring()
Expand Down Expand Up @@ -372,8 +426,6 @@ bool DepLibUring::LibUring::PrepareSend(
return false;
}

userData->cb = cb;

// The send data buffer belongs to us, no need to memcpy.
if (this->IsDataInSendBuffers(data))
{
Expand All @@ -384,20 +436,22 @@ bool DepLibUring::LibUring::PrepareSend(
std::memcpy(userData->store, data, len);
}

userData->cb = cb;

io_uring_sqe_set_data(sqe, userData);

socklen_t addrlen = 0;
socklen_t addrlen = Utils::IP::GetAddressLen(addr);

if (addr->sa_family == AF_INET)
{
addrlen = sizeof(struct sockaddr_in);
}
else if (addr->sa_family == AF_INET6)
{
addrlen = sizeof(struct sockaddr_in6);
}
auto iovec = this->iovecs[userData->idx];
iovec.iov_len = len;

io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
io_uring_prep_send_set_addr(sqe, addr, addrlen);

io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);
// Tell io_uring that we are providing the already registered send buffer
// for zero copy.
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = userData->idx;

this->sqeProcessCount++;

Expand Down

0 comments on commit 1277f5c

Please sign in to comment.