diff --git a/CHANGELOG.md b/CHANGELOG.md index 24a5e5405b..2c0acaefc2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### NEXT * worker: Do not use references for async callbacks ([PR #1274](https://github.com/versatica/mediasoup/pull/1274)). +* liburing: Enable zero copy ([PR #1273](https://github.com/versatica/mediasoup/pull/1273)). ### 3.13.12 diff --git a/worker/include/DepLibUring.hpp b/worker/include/DepLibUring.hpp index 7d3d3848e4..973acaff42 100644 --- a/worker/include/DepLibUring.hpp +++ b/worker/include/DepLibUring.hpp @@ -80,6 +80,10 @@ class DepLibUring { return this->active; } + bool IsZeroCopyEnabled() const + { + return this->zeroCopyEnabled; + } io_uring* GetRing() { return std::addressof(this->ring); @@ -103,18 +107,22 @@ class DepLibUring private: // io_uring instance. io_uring ring; - // Event file descriptor to watch for completions. + // Event file descriptor to watch for io_uring completions. int efd; // libuv handle used to poll io_uring completions. uv_poll_t* uvHandle{ nullptr }; // Whether we are currently sending RTP over io_uring. bool active{ false }; + // Whether Zero Copy feature is enabled. + bool zeroCopyEnabled{ true }; // Pre-allocated UserData's. UserData userDatas[QueueDepth]{}; // Indexes of available UserData entries. std::queue availableUserDataEntries; // Pre-allocated SendBuffer's. SendBuffer sendBuffers[QueueDepth]; + // iovec structs to be registered for Zero Copy. + struct iovec iovecs[QueueDepth]; // Submission queue entry process count. uint64_t sqeProcessCount{ 0u }; // Submission queue entry miss count. 
diff --git a/worker/include/Utils.hpp b/worker/include/Utils.hpp index b11b9a18fc..120afb0b62 100644 --- a/worker/include/Utils.hpp +++ b/worker/include/Utils.hpp @@ -23,6 +23,8 @@ namespace Utils static void GetAddressInfo(const struct sockaddr* addr, int& family, std::string& ip, uint16_t& port); + static size_t GetAddressLen(const struct sockaddr* addr); + static bool CompareAddresses(const struct sockaddr* addr1, const struct sockaddr* addr2) { // Compare family. diff --git a/worker/src/DepLibUring.cpp b/worker/src/DepLibUring.cpp index a2c9226769..bfd44aba4d 100644 --- a/worker/src/DepLibUring.cpp +++ b/worker/src/DepLibUring.cpp @@ -4,6 +4,7 @@ #include "DepLibUring.hpp" #include "Logger.hpp" #include "MediaSoupErrors.hpp" +#include "Utils.hpp" #include #include @@ -30,9 +31,13 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events) // the counter in order to avoid libuv calling this callback indefinitely. eventfd_t v; int err = eventfd_read(liburing->GetEventFd(), std::addressof(v)); + if (err < 0) { - MS_ABORT("eventfd_read() failed: %s", std::strerror(-err)); + // Get positive errno. + int error = -err; + + MS_ABORT("eventfd_read() failed: %s", std::strerror(error)); }; for (unsigned int i{ 0 }; i < count; ++i) @@ -40,27 +45,69 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events) struct io_uring_cqe* cqe = cqes[i]; auto* userData = static_cast(io_uring_cqe_get_data(cqe)); - if (cqe->res < 0) + if (liburing->IsZeroCopyEnabled()) { - MS_ERROR("sending failed: %s", std::strerror(-cqe->res)); + // CQE notification for a zero-copy submission. + if (cqe->flags & IORING_CQE_F_NOTIF) + { + // The send buffer is now in the network card, run the send callback. 
+ if (userData->cb) + { + (*userData->cb)(true); + delete userData->cb; + userData->cb = nullptr; + } + + liburing->ReleaseUserDataEntry(userData->idx); + io_uring_cqe_seen(liburing->GetRing(), cqe); + + continue; + } + // CQE for a zero-copy submission, a CQE notification will follow. + if (cqe->flags & IORING_CQE_F_MORE) + { + if (cqe->res < 0) + { + if (userData->cb) + { + (*userData->cb)(false); + delete userData->cb; + userData->cb = nullptr; + } + } + + // NOTE: Do not release the user data as it will be done upon reception + // of CQE notification. + io_uring_cqe_seen(liburing->GetRing(), cqe); + + continue; + } + } + + // Successful SQE. + if (cqe->res >= 0) + { if (userData->cb) { - (*userData->cb)(false); + (*userData->cb)(true); delete userData->cb; + userData->cb = nullptr; } } + // Failed SQE. else { if (userData->cb) { - (*userData->cb)(true); + (*userData->cb)(false); delete userData->cb; + userData->cb = nullptr; } } - io_uring_cqe_seen(liburing->GetRing(), cqe); liburing->ReleaseUserDataEntry(userData->idx); + io_uring_cqe_seen(liburing->GetRing(), cqe); } } @@ -234,7 +281,10 @@ DepLibUring::LibUring::LibUring() if (err < 0) { - MS_THROW_ERROR("io_uring_queue_init() failed: %s", std::strerror(-err)); + // Get positive errno. + int error = -err; + + MS_THROW_ERROR("io_uring_queue_init() failed: %s", std::strerror(error)); } // Create an eventfd instance. @@ -249,7 +299,10 @@ DepLibUring::LibUring::LibUring() if (err < 0) { - MS_THROW_ERROR("io_uring_register_eventfd() failed: %s", std::strerror(-err)); + // Get positive errno. + int error = -err; + + MS_THROW_ERROR("io_uring_register_eventfd() failed: %s", std::strerror(error)); } // Initialize available UserData entries. @@ -258,6 +311,35 @@ DepLibUring::LibUring::LibUring() this->userDatas[i].store = this->sendBuffers[i]; this->availableUserDataEntries.push(i); } + + // Initialize iovecs. 
+ for (size_t i{ 0 }; i < DepLibUring::QueueDepth; ++i) + { + this->iovecs[i].iov_base = this->sendBuffers[i]; + this->iovecs[i].iov_len = DepLibUring::SendBufferSize; + } + + err = io_uring_register_buffers(std::addressof(this->ring), this->iovecs, DepLibUring::QueueDepth); + + if (err < 0) + { + // Get positive errno. + int error = -err; + + if (error == ENOMEM) + { + this->zeroCopyEnabled = false; + + MS_WARN_TAG( + info, + "io_uring_register_buffers() failed due to low memlock limit (ulimit -l), disabling zero copy: %s", + std::strerror(error)); + } + else + { + MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(error)); + } + } } DepLibUring::LibUring::~LibUring() @@ -269,7 +351,10 @@ DepLibUring::LibUring::~LibUring() if (err != 0) { - MS_ABORT("close() failed: %s", std::strerror(-err)); + // Get positive errno. + int error = -err; + + MS_ABORT("close() failed: %s", std::strerror(error)); } // Close the ring. @@ -372,8 +457,6 @@ bool DepLibUring::LibUring::PrepareSend( return false; } - userData->cb = cb; - // The send data buffer belongs to us, no need to memcpy. if (this->IsDataInSendBuffers(data)) { @@ -384,21 +467,30 @@ bool DepLibUring::LibUring::PrepareSend( std::memcpy(userData->store, data, len); } + userData->cb = cb; + io_uring_sqe_set_data(sqe, userData); - socklen_t addrlen = 0; + socklen_t addrlen = Utils::IP::GetAddressLen(addr); - if (addr->sa_family == AF_INET) + if (this->zeroCopyEnabled) { - addrlen = sizeof(struct sockaddr_in); + auto iovec = this->iovecs[userData->idx]; + iovec.iov_len = len; + + io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0); + io_uring_prep_send_set_addr(sqe, addr, addrlen); + + // Tell io_uring that we are providing the already registered send buffer + // for zero copy. 
+ sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = userData->idx; } - else if (addr->sa_family == AF_INET6) + else { - addrlen = sizeof(struct sockaddr_in6); + io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen); } - io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen); - this->sqeProcessCount++; return true; @@ -482,7 +574,10 @@ void DepLibUring::LibUring::Submit() } else { - MS_ERROR("io_uring_submit() failed: %s", std::strerror(-err)); + // Get positive errno. + int error = -err; + + MS_ERROR("io_uring_submit() failed: %s", std::strerror(error)); } } diff --git a/worker/src/Utils/IP.cpp b/worker/src/Utils/IP.cpp index 81a5c7e093..9460fc2488 100644 --- a/worker/src/Utils/IP.cpp +++ b/worker/src/Utils/IP.cpp @@ -91,6 +91,29 @@ namespace Utils ip.assign(ipBuffer); } + size_t IP::GetAddressLen(const struct sockaddr* addr) + { + MS_TRACE(); + + switch (addr->sa_family) + { + case AF_INET: + { + return sizeof(struct sockaddr_in); + } + + case AF_INET6: + { + return sizeof(struct sockaddr_in6); + } + + default: + { + MS_ABORT("unknown network family: %d", static_cast(addr->sa_family)); + } + } + } + void IP::NormalizeIp(std::string& ip) { MS_TRACE();