Skip to content

Commit

Permalink
liburing: avoid extra memcpy on RTP
Browse files Browse the repository at this point in the history
Use an already allocated send buffer to directly encrypt the RTP rather
than using the static EncryptBuffer that needs to be memcpy-ed before
sending.

If there are no send buffers available, the EncryptBuffer will be used.
  • Loading branch information
jmillan committed Dec 11, 2023
1 parent 858b27a commit dc1689e
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 21 deletions.
37 changes: 30 additions & 7 deletions worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,35 @@ class DepLibUring
/* Struct for the user data field of SQE and CQE. */
struct UserData
{
uint8_t store[1500]{};
// Pointer to send buffer.
uint8_t* store{ nullptr };
// Frame len buffer for TCP.
uint8_t frameLen[2] = { 0 };
// iovec for TCP, first item for framing, second item for payload.
struct iovec iov[2];
// Send callback.
onSendCallback* cb{ nullptr };
// Index in userDatas array.
size_t idx{ 0 };
};

/* Number of submission queue entries (SQE). */
static constexpr size_t QueueDepth{ 1024 * 4 };
static constexpr size_t SendBufferSize{ 1500 };

using SendBuffer = uint8_t[SendBufferSize];

static bool IsRuntimeSupported();
static void ClassInit();
static void ClassDestroy();
static flatbuffers::Offset<FBS::LibUring::Dump> FillBuffer(flatbuffers::FlatBufferBuilder& builder);
static void StartPollingCQEs();
static void StopPollingCQEs();
static uint8_t* GetSendBuffer();
static bool PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
static bool PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb);
int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb);
static void Submit();
static void SetActive();
static bool IsActive();
Expand All @@ -50,10 +61,16 @@ class DepLibUring
flatbuffers::Offset<FBS::LibUring::Dump> FillBuffer(flatbuffers::FlatBufferBuilder& builder) const;
void StartPollingCQEs();
void StopPollingCQEs();
uint8_t* GetSendBuffer();
bool PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
bool PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb);
int sockfd,
const uint8_t* data1,
size_t len1,
const uint8_t* data2,
size_t len2,
onSendCallback* cb);
void Submit();
void SetActive()
{
Expand All @@ -78,6 +95,10 @@ class DepLibUring

private:
UserData* GetUserData();
/**
 * Tell whether the given pointer lies inside the pre-allocated send buffers.
 *
 * The accepted range is [first byte of sendBuffers[0], one past the last
 * byte of sendBuffers[QueueDepth - 1]), so every byte of every send buffer
 * (including the bytes of the last one) is recognized as belonging to us.
 *
 * @param data Pointer to test.
 * @return true if data points into the sendBuffers storage, false otherwise.
 */
bool IsDataInSendBuffers(const uint8_t* data) const
{
	const uint8_t* const poolBegin = this->sendBuffers[0];
	// One past the end of the last buffer. Comparing against the start of the
	// last buffer would wrongly exclude any pointer past its first byte.
	const uint8_t* const poolEnd =
	  this->sendBuffers[DepLibUring::QueueDepth - 1] + DepLibUring::SendBufferSize;

	return data >= poolBegin && data < poolEnd;
}

private:
// io_uring instance.
Expand All @@ -88,10 +109,12 @@ class DepLibUring
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Pre-allocated UserData entries.
UserData userDataBuffer[QueueDepth]{};
// Pre-allocated UserData's.
UserData userDatas[QueueDepth]{};
// Indexes of available UserData entries.
std::queue<size_t> availableUserDataEntries;
// Pre-allocated send buffers (sendBuffers[i] backs userDatas[i].store).
SendBuffer sendBuffers[QueueDepth];
// Submission queue entry process count.
uint64_t sqeProcessCount{ 0u };
// Submission queue entry miss count.
Expand Down
79 changes: 68 additions & 11 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,17 @@ void DepLibUring::StopPollingCQEs()
DepLibUring::liburing->StopPollingCQEs();
}

/**
 * Static entry point: forwards to GetSendBuffer() of the instance pointed to
 * by DepLibUring::liburing.
 *
 * @return Whatever the instance returns (a send buffer pointer, or nullptr
 *         when none is available).
 */
uint8_t* DepLibUring::GetSendBuffer()
{
	MS_TRACE();

	// The instance must have been created before asking for a buffer.
	MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");

	auto* sendBuffer = DepLibUring::liburing->GetSendBuffer();

	return sendBuffer;
}

bool DepLibUring::PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb)
int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb)
{
MS_TRACE();

Expand All @@ -162,7 +171,7 @@ bool DepLibUring::PrepareSend(
}

bool DepLibUring::PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb)
int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb)
{
MS_TRACE();

Expand Down Expand Up @@ -246,6 +255,7 @@ DepLibUring::LibUring::LibUring()
// Initialize available UserData entries.
for (size_t i{ 0 }; i < DepLibUring::QueueDepth; ++i)
{
this->userDatas[i].store = this->sendBuffers[i];
this->availableUserDataEntries.push(i);
}
}
Expand Down Expand Up @@ -319,8 +329,24 @@ void DepLibUring::LibUring::StopPollingCQEs()
uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onCloseFd));
}

/**
 * Return the send buffer attached to the UserData entry at the front of the
 * available-entries queue.
 *
 * The entry is only peeked, not removed from the queue, so the caller can
 * fill the buffer before the entry is actually taken.
 *
 * @return Pointer to the send buffer, or nullptr if no UserData entry is
 *         available.
 */
uint8_t* DepLibUring::LibUring::GetSendBuffer()
{
	MS_TRACE();

	auto& freeEntries = this->availableUserDataEntries;

	if (!freeEntries.empty())
	{
		// Peek only: the index stays in the queue.
		const size_t nextIdx = freeEntries.front();

		return this->userDatas[nextIdx].store;
	}

	MS_WARN_DEV("no user data entry available");

	return nullptr;
}

bool DepLibUring::LibUring::PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb)
int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb)
{
MS_TRACE();

Expand All @@ -346,9 +372,18 @@ bool DepLibUring::LibUring::PrepareSend(
return false;
}

std::memcpy(userData->store, data, len);
userData->cb = cb;

// The send data buffer belongs to us, no need to memcpy.
if (this->IsDataInSendBuffers(data))
{
MS_ASSERT(data == userData->store, "send buffer does not match userData store");
}
else
{
std::memcpy(userData->store, data, len);
}

io_uring_sqe_set_data(sqe, userData);

socklen_t addrlen = 0;
Expand All @@ -370,7 +405,7 @@ bool DepLibUring::LibUring::PrepareSend(
}

bool DepLibUring::LibUring::PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb)
int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb)
{
MS_TRACE();

Expand All @@ -396,12 +431,36 @@ bool DepLibUring::LibUring::PrepareWrite(
return false;
}

std::memcpy(userData->store, data1, len1);
std::memcpy(userData->store + len1, data2, len2);
// The send data buffer belongs to us, no need to memcpy.
// NOTE: data1 contains the TCP framing buffer and data2 the actual payload.
if (this->IsDataInSendBuffers(data2))
{
MS_ASSERT(data2 == userData->store, "send buffer does not match userData store");

// Always memcpy the frame len as it resides in the stack memory.
std::memcpy(userData->frameLen, data1, len1);

userData->iov[0].iov_base = userData->frameLen;
userData->iov[0].iov_len = len1;
userData->iov[1].iov_base = userData->store;
userData->iov[1].iov_len = len2;
}
else
{
std::memcpy(userData->store, data1, len1);
std::memcpy(userData->store + len1, data2, len2);

userData->iov[0].iov_base = userData->store;
userData->iov[0].iov_len = len1;
userData->iov[1].iov_base = userData->store + len1;
userData->iov[1].iov_len = len2;
}

userData->cb = cb;

io_uring_sqe_set_data(sqe, userData);
io_uring_prep_write(sqe, sockfd, userData->store, len1 + len2, 0);

io_uring_prep_writev(sqe, sockfd, userData->iov, 2, 0);

this->sqeProcessCount++;

Expand Down Expand Up @@ -433,16 +492,14 @@ DepLibUring::UserData* DepLibUring::LibUring::GetUserData()

if (this->availableUserDataEntries.empty())
{
MS_WARN_DEV("no user data entry available");

return nullptr;
}

auto idx = this->availableUserDataEntries.front();

this->availableUserDataEntries.pop();

auto* userData = std::addressof(this->userDataBuffer[idx]);
auto* userData = std::addressof(this->userDatas[idx]);
userData->idx = idx;

return userData;
Expand Down
30 changes: 27 additions & 3 deletions worker/src/RTC/SrtpSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#include "RTC/SrtpSession.hpp"
#include "DepLibSRTP.hpp"
#ifdef MS_LIBURING_SUPPORTED
#include "DepLibUring.hpp"
#endif
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include <cstring> // std::memset(), std::memcpy()
Expand Down Expand Up @@ -198,9 +201,30 @@ namespace RTC
return false;
}

std::memcpy(EncryptBuffer, *data, *len);
uint8_t* encryptBuffer = EncryptBuffer;

const srtp_err_status_t err = srtp_protect(this->session, static_cast<void*>(EncryptBuffer), len);
#ifdef MS_LIBURING_SUPPORTED
{
if (!DepLibUring::IsActive())
{
goto protect;
}

// Use a preallocated buffer, if available.
auto* sendBuffer = DepLibUring::GetSendBuffer();

if (sendBuffer)
{
encryptBuffer = sendBuffer;
}
}

protect:
#endif

std::memcpy(encryptBuffer, *data, *len);

const srtp_err_status_t err = srtp_protect(this->session, static_cast<void*>(encryptBuffer), len);

if (DepLibSRTP::IsError(err))
{
Expand All @@ -210,7 +234,7 @@ namespace RTC
}

// Update the given data pointer.
*data = (const uint8_t*)EncryptBuffer;
*data = const_cast<const uint8_t*>(encryptBuffer);

return true;
}
Expand Down

0 comments on commit dc1689e

Please sign in to comment.