diff --git a/worker/include/DepLibUring.hpp b/worker/include/DepLibUring.hpp index 5c2481736c..7d3d3848e4 100644 --- a/worker/include/DepLibUring.hpp +++ b/worker/include/DepLibUring.hpp @@ -15,13 +15,23 @@ class DepLibUring /* Struct for the user data field of SQE and CQE. */ struct UserData { - uint8_t store[1500]{}; + // Pointer to send buffer. + uint8_t* store{ nullptr }; + // Frame len buffer for TCP. + uint8_t frameLen[2] = { 0 }; + // iovec for TCP, first item for framing, second item for payload. + struct iovec iov[2]; + // Send callback. onSendCallback* cb{ nullptr }; + // Index in userDatas array. size_t idx{ 0 }; }; /* Number of submission queue entries (SQE). */ static constexpr size_t QueueDepth{ 1024 * 4 }; + static constexpr size_t SendBufferSize{ 1500 }; + + using SendBuffer = uint8_t[SendBufferSize]; static bool IsRuntimeSupported(); static void ClassInit(); @@ -29,10 +39,11 @@ class DepLibUring static flatbuffers::Offset FillBuffer(flatbuffers::FlatBufferBuilder& builder); static void StartPollingCQEs(); static void StopPollingCQEs(); + static uint8_t* GetSendBuffer(); static bool PrepareSend( - int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb); + int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb); static bool PrepareWrite( - int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb); + int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb); static void Submit(); static void SetActive(); static bool IsActive(); @@ -50,10 +61,16 @@ class DepLibUring flatbuffers::Offset FillBuffer(flatbuffers::FlatBufferBuilder& builder) const; void StartPollingCQEs(); void StopPollingCQEs(); + uint8_t* GetSendBuffer(); bool PrepareSend( - int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb); + int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb); bool PrepareWrite( - int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb); + int sockfd, + const uint8_t* data1, + size_t len1, + const uint8_t* data2, + size_t len2, + onSendCallback* cb); void Submit(); void SetActive() { @@ -78,6 +95,10 @@ class DepLibUring private: UserData* GetUserData(); + bool IsDataInSendBuffers(const uint8_t* data) const + { + return data >= this->sendBuffers[0] && data <= this->sendBuffers[DepLibUring::QueueDepth - 1]; + } private: // io_uring instance. @@ -88,10 +109,12 @@ class DepLibUring uv_poll_t* uvHandle{ nullptr }; // Whether we are currently sending RTP over io_uring. bool active{ false }; - // Pre-allocated UserData entries. - UserData userDataBuffer[QueueDepth]{}; + // Pre-allocated UserData's. + UserData userDatas[QueueDepth]{}; // Indexes of available UserData entries. std::queue availableUserDataEntries; + // Pre-allocated SendBuffer's. + SendBuffer sendBuffers[QueueDepth]; // Submission queue entry process count. uint64_t sqeProcessCount{ 0u }; // Submission queue entry miss count. diff --git a/worker/src/DepLibUring.cpp b/worker/src/DepLibUring.cpp index 627da3ecf0..4ce469bc17 100644 --- a/worker/src/DepLibUring.cpp +++ b/worker/src/DepLibUring.cpp @@ -151,8 +151,17 @@ void DepLibUring::StopPollingCQEs() DepLibUring::liburing->StopPollingCQEs(); } +uint8_t* DepLibUring::GetSendBuffer() +{ + MS_TRACE(); + + MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set"); + + return DepLibUring::liburing->GetSendBuffer(); +} + bool DepLibUring::PrepareSend( - int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb) + int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb) { MS_TRACE(); @@ -162,7 +171,7 @@ bool DepLibUring::PrepareSend( } bool DepLibUring::PrepareWrite( - int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb) + int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb) { MS_TRACE(); @@ -246,6 +255,7 @@ DepLibUring::LibUring::LibUring() // Initialize available UserData entries. for (size_t i{ 0 }; i < DepLibUring::QueueDepth; ++i) { + this->userDatas[i].store = this->sendBuffers[i]; this->availableUserDataEntries.push(i); } } @@ -319,8 +329,24 @@ void DepLibUring::LibUring::StopPollingCQEs() uv_close(reinterpret_cast(this->uvHandle), static_cast(onCloseFd)); } +uint8_t* DepLibUring::LibUring::GetSendBuffer() +{ + MS_TRACE(); + + if (this->availableUserDataEntries.empty()) + { + MS_WARN_DEV("no user data entry available"); + + return nullptr; + } + + auto idx = this->availableUserDataEntries.front(); + + return this->userDatas[idx].store; +} + bool DepLibUring::LibUring::PrepareSend( - int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb) + int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb) { MS_TRACE(); @@ -346,9 +372,18 @@ bool DepLibUring::LibUring::PrepareSend( return false; } - std::memcpy(userData->store, data, len); userData->cb = cb; + // The send data buffer belongs to us, no need to memcpy. + if (this->IsDataInSendBuffers(data)) + { + MS_ASSERT(data == userData->store, "send buffer does not match userData store"); + } + else + { + std::memcpy(userData->store, data, len); + } + io_uring_sqe_set_data(sqe, userData); socklen_t addrlen = 0; @@ -370,7 +405,7 @@ bool DepLibUring::LibUring::PrepareSend( } bool DepLibUring::LibUring::PrepareWrite( - int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb) + int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb) { MS_TRACE(); @@ -396,12 +431,36 @@ bool DepLibUring::LibUring::PrepareWrite( return false; } - std::memcpy(userData->store, data1, len1); - std::memcpy(userData->store + len1, data2, len2); + // The send data buffer belongs to us, no need to memcpy. + // NOTE: data1 contains the TCP framing buffer and data2 the actual payload. + if (this->IsDataInSendBuffers(data2)) + { + MS_ASSERT(data2 == userData->store, "send buffer does not match userData store"); + + // Always memcpy the frame len as it resides in the stack memory. + std::memcpy(userData->frameLen, data1, len1); + + userData->iov[0].iov_base = userData->frameLen; + userData->iov[0].iov_len = len1; + userData->iov[1].iov_base = userData->store; + userData->iov[1].iov_len = len2; + } + else + { + std::memcpy(userData->store, data1, len1); + std::memcpy(userData->store + len1, data2, len2); + + userData->iov[0].iov_base = userData->store; + userData->iov[0].iov_len = len1; + userData->iov[1].iov_base = userData->store + len1; + userData->iov[1].iov_len = len2; + } + userData->cb = cb; io_uring_sqe_set_data(sqe, userData); - io_uring_prep_write(sqe, sockfd, userData->store, len1 + len2, 0); + + io_uring_prep_writev(sqe, sockfd, userData->iov, 2, 0); this->sqeProcessCount++; @@ -433,8 +492,6 @@ DepLibUring::UserData* DepLibUring::LibUring::GetUserData() if (this->availableUserDataEntries.empty()) { - MS_WARN_DEV("no user data entry available"); - return nullptr; } @@ -442,7 +499,7 @@ DepLibUring::UserData* DepLibUring::LibUring::GetUserData() this->availableUserDataEntries.pop(); - auto* userData = std::addressof(this->userDataBuffer[idx]); + auto* userData = std::addressof(this->userDatas[idx]); userData->idx = idx; return userData; diff --git a/worker/src/RTC/SrtpSession.cpp b/worker/src/RTC/SrtpSession.cpp index 0a136d4e31..5c2c810e0f 100644 --- a/worker/src/RTC/SrtpSession.cpp +++ b/worker/src/RTC/SrtpSession.cpp @@ -3,6 +3,9 @@ #include "RTC/SrtpSession.hpp" #include "DepLibSRTP.hpp" +#ifdef MS_LIBURING_SUPPORTED +#include "DepLibUring.hpp" +#endif #include "Logger.hpp" #include "MediaSoupErrors.hpp" #include // std::memset(), std::memcpy() @@ -198,9 +201,30 @@ namespace RTC return false; } - std::memcpy(EncryptBuffer, *data, *len); + uint8_t* encryptBuffer = EncryptBuffer; - const srtp_err_status_t err = srtp_protect(this->session, static_cast(EncryptBuffer), len); +#ifdef MS_LIBURING_SUPPORTED + { + if (!DepLibUring::IsActive()) + { + goto protect; + } + + // Use a preallocated buffer, if available. + auto* sendBuffer = DepLibUring::GetSendBuffer(); + + if (sendBuffer) + { + encryptBuffer = sendBuffer; + } + } + + protect: +#endif + + std::memcpy(encryptBuffer, *data, *len); + + const srtp_err_status_t err = srtp_protect(this->session, static_cast(encryptBuffer), len); if (DepLibSRTP::IsError(err)) { @@ -210,7 +234,7 @@ namespace RTC } // Update the given data pointer. - *data = (const uint8_t*)EncryptBuffer; + *data = const_cast(encryptBuffer); return true; }