Skip to content

Commit

Permalink
DepLibUring improvements (#1440)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc authored Aug 9, 2024
1 parent 28a1483 commit d5facb5
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 109 deletions.
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-rust.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
ci:
- os: ubuntu-20.04
- os: ubuntu-22.04
- os: ubuntu-24.04
- os: macos-12
- os: macos-14
- os: windows-2022
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/mediasoup-worker-fuzzer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ jobs:
strategy:
matrix:
build:
- os: ubuntu-22.04
- os: ubuntu-24.04
cc: clang
cxx: clang++
pip-break-system-packages: true
build-type:
- Release
- Debug
Expand All @@ -34,12 +35,12 @@ jobs:
uses: actions/checkout@v4

# We need to install pip invoke manually.
- if: runner.os != 'macOS'
- if: ${{ !matrix.build.pip-break-system-packages }}
name: pip3 install invoke
run: pip3 install invoke

# In macOS we need to specify this option.
- if: runner.os == 'macOS'
# In modern OSs we need to run pip with this option.
- if: ${{ matrix.build.pip-break-system-packages }}
name: pip3 install --break-system-packages invoke
run: pip3 install --break-system-packages invoke

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/mediasoup-worker-prebuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ jobs:
cc: gcc
cxx: g++
# Worker prebuild for Linux with kernel version 6 Ubuntu (22.04).
# Let's not use Ubutu 24.04 to avoid same potential problem as described
# above.
- os: ubuntu-22.04
cc: gcc
cxx: g++
Expand Down
16 changes: 13 additions & 3 deletions .github/workflows/mediasoup-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,22 @@ jobs:
- os: ubuntu-22.04
cc: clang
cxx: clang++
- os: ubuntu-24.04
cc: gcc
cxx: g++
pip-break-system-packages: true
- os: ubuntu-24.04
cc: clang
cxx: clang++
pip-break-system-packages: true
- os: macos-12
cc: gcc
cxx: g++
pip-break-system-packages: true
- os: macos-14
cc: clang
cxx: clang++
pip-break-system-packages: true
- os: windows-2022
cc: cl
cxx: cl
Expand Down Expand Up @@ -75,12 +85,12 @@ jobs:
${{ matrix.build.os }}-node-${{matrix.build.cc}}-
# We need to install pip invoke manually.
- if: runner.os != 'macOS'
- if: ${{ !matrix.build.pip-break-system-packages }}
name: pip3 install invoke
run: pip3 install invoke

# In macOS we need to specify this option.
- if: runner.os == 'macOS'
# In modern OSs we need to run pip with this option.
- if: ${{ matrix.build.pip-break-system-packages }}
name: pip3 install --break-system-packages invoke
run: pip3 install --break-system-packages invoke

Expand Down
10 changes: 9 additions & 1 deletion worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ class DepLibUring

using SendBuffer = uint8_t[SendBufferSize];

static bool IsRuntimeSupported();
static void ClassInit();
static void ClassDestroy();
static void CheckRuntimeSupport();
static bool IsEnabled();
static flatbuffers::Offset<FBS::LibUring::Dump> FillBuffer(flatbuffers::FlatBufferBuilder& builder);
static void StartPollingCQEs();
static void StopPollingCQEs();
Expand All @@ -50,9 +51,12 @@ class DepLibUring

class LibUring;

// Whether liburing is enabled or not after runtime checks.
static bool enabled;
thread_local static LibUring* liburing;

public:
// Singleton.
class LibUring
{
public:
Expand Down Expand Up @@ -98,6 +102,10 @@ class DepLibUring
}

private:
void SetInactive()
{
this->active = false;
}
UserData* GetUserData();
bool IsDataInSendBuffers(const uint8_t* data) const
{
Expand Down
98 changes: 44 additions & 54 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <sys/utsname.h>

/* Static variables. */

bool DepLibUring::enabled{ false };
/* liburing instance per thread. */
thread_local DepLibUring::LibUring* DepLibUring::liburing{ nullptr };
/* Completion queue entry array used to retrieve processes tasks. */
Expand Down Expand Up @@ -114,7 +114,36 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events)

/* Static class methods */

bool DepLibUring::IsRuntimeSupported()
void DepLibUring::ClassInit()
{
const auto mayor = io_uring_major_version();
const auto minor = io_uring_minor_version();

MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor);

// This must be called first.
DepLibUring::CheckRuntimeSupport();

if (DepLibUring::IsEnabled())
{
DepLibUring::liburing = new LibUring();

MS_DEBUG_TAG(info, "liburing enabled");
}
else
{
MS_DEBUG_TAG(info, "liburing not enabled");
}
}

void DepLibUring::ClassDestroy()
{
MS_TRACE();

delete DepLibUring::liburing;
}

void DepLibUring::CheckRuntimeSupport()
{
// clang-format off
struct utsname buffer{};
Expand All @@ -134,43 +163,19 @@ bool DepLibUring::IsRuntimeSupported()

// liburing `sento` capabilities are supported for kernel versions greather
// than or equal to 6.
return kernelMayorLong >= 6;
}

void DepLibUring::ClassInit()
{
const auto mayor = io_uring_major_version();
const auto minor = io_uring_minor_version();

MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor);

if (DepLibUring::IsRuntimeSupported())
{
DepLibUring::liburing = new LibUring();

MS_DEBUG_TAG(info, "liburing supported, enabled");
}
else
{
MS_DEBUG_TAG(info, "liburing not supported, not enabled");
}
DepLibUring::enabled = kernelMayorLong >= 6;
}

void DepLibUring::ClassDestroy()
bool DepLibUring::IsEnabled()
{
MS_TRACE();

delete DepLibUring::liburing;
return DepLibUring::enabled;
}

flatbuffers::Offset<FBS::LibUring::Dump> DepLibUring::FillBuffer(flatbuffers::FlatBufferBuilder& builder)
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return 0;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->FillBuffer(builder);
}
Expand All @@ -179,10 +184,7 @@ void DepLibUring::StartPollingCQEs()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

DepLibUring::liburing->StartPollingCQEs();
}
Expand All @@ -191,10 +193,7 @@ void DepLibUring::StopPollingCQEs()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

DepLibUring::liburing->StopPollingCQEs();
}
Expand All @@ -203,7 +202,7 @@ uint8_t* DepLibUring::GetSendBuffer()
{
MS_TRACE();

MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->GetSendBuffer();
}
Expand All @@ -213,7 +212,7 @@ bool DepLibUring::PrepareSend(
{
MS_TRACE();

MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->PrepareSend(sockfd, data, len, addr, cb);
}
Expand All @@ -223,7 +222,7 @@ bool DepLibUring::PrepareWrite(
{
MS_TRACE();

MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->PrepareWrite(sockfd, data1, len1, data2, len2, cb);
}
Expand All @@ -232,10 +231,7 @@ void DepLibUring::Submit()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

DepLibUring::liburing->Submit();
}
Expand All @@ -244,10 +240,7 @@ void DepLibUring::SetActive()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

DepLibUring::liburing->SetActive();
}
Expand All @@ -256,10 +249,7 @@ bool DepLibUring::IsActive()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return false;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->IsActive();
}
Expand Down Expand Up @@ -580,7 +570,7 @@ void DepLibUring::LibUring::Submit()
MS_TRACE();

// Unset active flag.
this->active = false;
SetInactive();

auto err = io_uring_submit(std::addressof(this->ring));

Expand Down
22 changes: 14 additions & 8 deletions worker/src/DepUsrSCTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,25 @@ void DepUsrSCTP::Checker::OnTimer(TimerHandle* /*timer*/)
const int elapsedMs = this->lastCalledAtMs ? static_cast<int>(nowMs - this->lastCalledAtMs) : 0;

#ifdef MS_LIBURING_SUPPORTED
// Activate liburing usage.
// 'usrsctp_handle_timers()' will synchronously call the send/recv
// callbacks for the pending data. If there are multiple messages to be
// sent over the network then we will send those messages within a single
// system call.
DepLibUring::SetActive();
if (DepLibUring::IsEnabled())
{
// Activate liburing usage.
// 'usrsctp_handle_timers()' will synchronously call the send/recv
// callbacks for the pending data. If there are multiple messages to be
// sent over the network then we will send those messages within a single
// system call.
DepLibUring::SetActive();
}
#endif

usrsctp_handle_timers(elapsedMs);

#ifdef MS_LIBURING_SUPPORTED
// Submit all prepared submission entries.
DepLibUring::Submit();
if (DepLibUring::IsEnabled())
{
// Submit all prepared submission entries.
DepLibUring::Submit();
}
#endif

this->lastCalledAtMs = nowMs;
Expand Down
Loading

0 comments on commit d5facb5

Please sign in to comment.