From d5facb5b55222bc9eeaddd36662db6590d1fa628 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Fri, 9 Aug 2024 17:45:26 +0200 Subject: [PATCH] DepLibUring improvements (#1440) --- .github/workflows/mediasoup-rust.yaml | 1 + .../workflows/mediasoup-worker-fuzzer.yaml | 9 +- .../workflows/mediasoup-worker-prebuild.yaml | 2 + .github/workflows/mediasoup-worker.yaml | 16 ++- worker/include/DepLibUring.hpp | 10 +- worker/src/DepLibUring.cpp | 98 +++++++++---------- worker/src/DepUsrSCTP.cpp | 22 +++-- worker/src/RTC/Router.cpp | 32 ++++-- worker/src/RTC/RtpStreamSend.cpp | 14 ++- worker/src/RTC/SrtpSession.cpp | 1 + worker/src/RTC/Transport.cpp | 14 ++- worker/src/Worker.cpp | 41 +++++--- worker/src/handles/TcpConnectionHandle.cpp | 12 ++- worker/src/handles/UdpSocketHandle.cpp | 12 ++- 14 files changed, 175 insertions(+), 109 deletions(-) diff --git a/.github/workflows/mediasoup-rust.yaml b/.github/workflows/mediasoup-rust.yaml index 9b76ffcb22..812cbae213 100644 --- a/.github/workflows/mediasoup-rust.yaml +++ b/.github/workflows/mediasoup-rust.yaml @@ -18,6 +18,7 @@ jobs: ci: - os: ubuntu-20.04 - os: ubuntu-22.04 + - os: ubuntu-24.04 - os: macos-12 - os: macos-14 - os: windows-2022 diff --git a/.github/workflows/mediasoup-worker-fuzzer.yaml b/.github/workflows/mediasoup-worker-fuzzer.yaml index 8f47946802..c748ce6e67 100644 --- a/.github/workflows/mediasoup-worker-fuzzer.yaml +++ b/.github/workflows/mediasoup-worker-fuzzer.yaml @@ -13,9 +13,10 @@ jobs: strategy: matrix: build: - - os: ubuntu-22.04 + - os: ubuntu-24.04 cc: clang cxx: clang++ + pip-break-system-packages: true build-type: - Release - Debug @@ -34,12 +35,12 @@ jobs: uses: actions/checkout@v4 # We need to install pip invoke manually. - - if: runner.os != 'macOS' + - if: ${{ !matrix.build.pip-break-system-packages }} name: pip3 install invoke run: pip3 install invoke - # In macOS we need to specify this option. - - if: runner.os == 'macOS' + # In modern OSs we need to run pip with this option. + - if: ${{ matrix.build.pip-break-system-packages }} name: pip3 install --break-system-packages invoke run: pip3 install --break-system-packages invoke diff --git a/.github/workflows/mediasoup-worker-prebuild.yaml b/.github/workflows/mediasoup-worker-prebuild.yaml index e0c458a903..5c51a72be8 100644 --- a/.github/workflows/mediasoup-worker-prebuild.yaml +++ b/.github/workflows/mediasoup-worker-prebuild.yaml @@ -19,6 +19,8 @@ jobs: cc: gcc cxx: g++ # Worker prebuild for Linux with kernel version 6 Ubuntu (22.04). + # Let's not use Ubutu 24.04 to avoid same potential problem as described + # above. - os: ubuntu-22.04 cc: gcc cxx: g++ diff --git a/.github/workflows/mediasoup-worker.yaml b/.github/workflows/mediasoup-worker.yaml index b9522d2215..cfa1498022 100644 --- a/.github/workflows/mediasoup-worker.yaml +++ b/.github/workflows/mediasoup-worker.yaml @@ -31,12 +31,22 @@ jobs: - os: ubuntu-22.04 cc: clang cxx: clang++ + - os: ubuntu-24.04 + cc: gcc + cxx: g++ + pip-break-system-packages: true + - os: ubuntu-24.04 + cc: clang + cxx: clang++ + pip-break-system-packages: true - os: macos-12 cc: gcc cxx: g++ + pip-break-system-packages: true - os: macos-14 cc: clang cxx: clang++ + pip-break-system-packages: true - os: windows-2022 cc: cl cxx: cl @@ -75,12 +85,12 @@ jobs: ${{ matrix.build.os }}-node-${{matrix.build.cc}}- # We need to install pip invoke manually. - - if: runner.os != 'macOS' + - if: ${{ !matrix.build.pip-break-system-packages }} name: pip3 install invoke run: pip3 install invoke - # In macOS we need to specify this option. - - if: runner.os == 'macOS' + # In modern OSs we need to run pip with this option. + - if: ${{ matrix.build.pip-break-system-packages }} name: pip3 install --break-system-packages invoke run: pip3 install --break-system-packages invoke diff --git a/worker/include/DepLibUring.hpp b/worker/include/DepLibUring.hpp index 973acaff42..a0b041c899 100644 --- a/worker/include/DepLibUring.hpp +++ b/worker/include/DepLibUring.hpp @@ -33,9 +33,10 @@ class DepLibUring using SendBuffer = uint8_t[SendBufferSize]; - static bool IsRuntimeSupported(); static void ClassInit(); static void ClassDestroy(); + static void CheckRuntimeSupport(); + static bool IsEnabled(); static flatbuffers::Offset FillBuffer(flatbuffers::FlatBufferBuilder& builder); static void StartPollingCQEs(); static void StopPollingCQEs(); @@ -50,9 +51,12 @@ class DepLibUring class LibUring; + // Whether liburing is enabled or not after runtime checks. + static bool enabled; thread_local static LibUring* liburing; public: + // Singleton. class LibUring { public: @@ -98,6 +102,10 @@ class DepLibUring } private: + void SetInactive() + { + this->active = false; + } UserData* GetUserData(); bool IsDataInSendBuffers(const uint8_t* data) const { diff --git a/worker/src/DepLibUring.cpp b/worker/src/DepLibUring.cpp index 95d2b6631e..c5e8aed778 100644 --- a/worker/src/DepLibUring.cpp +++ b/worker/src/DepLibUring.cpp @@ -10,7 +10,7 @@ #include /* Static variables. */ - +bool DepLibUring::enabled{ false }; /* liburing instance per thread. */ thread_local DepLibUring::LibUring* DepLibUring::liburing{ nullptr }; /* Completion queue entry array used to retrieve processes tasks. */ @@ -114,7 +114,36 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events) /* Static class methods */ -bool DepLibUring::IsRuntimeSupported() +void DepLibUring::ClassInit() +{ + const auto mayor = io_uring_major_version(); + const auto minor = io_uring_minor_version(); + + MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor); + + // This must be called first. + DepLibUring::CheckRuntimeSupport(); + + if (DepLibUring::IsEnabled()) + { + DepLibUring::liburing = new LibUring(); + + MS_DEBUG_TAG(info, "liburing enabled"); + } + else + { + MS_DEBUG_TAG(info, "liburing not enabled"); + } +} + +void DepLibUring::ClassDestroy() +{ + MS_TRACE(); + + delete DepLibUring::liburing; +} + +void DepLibUring::CheckRuntimeSupport() { // clang-format off struct utsname buffer{}; @@ -134,43 +163,19 @@ bool DepLibUring::IsRuntimeSupported() // liburing `sento` capabilities are supported for kernel versions greather // than or equal to 6. - return kernelMayorLong >= 6; -} - -void DepLibUring::ClassInit() -{ - const auto mayor = io_uring_major_version(); - const auto minor = io_uring_minor_version(); - - MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor); - - if (DepLibUring::IsRuntimeSupported()) - { - DepLibUring::liburing = new LibUring(); - - MS_DEBUG_TAG(info, "liburing supported, enabled"); - } - else - { - MS_DEBUG_TAG(info, "liburing not supported, not enabled"); - } + DepLibUring::enabled = kernelMayorLong >= 6; } -void DepLibUring::ClassDestroy() +bool DepLibUring::IsEnabled() { - MS_TRACE(); - - delete DepLibUring::liburing; + return DepLibUring::enabled; } flatbuffers::Offset DepLibUring::FillBuffer(flatbuffers::FlatBufferBuilder& builder) { MS_TRACE(); - if (!DepLibUring::liburing) - { - return 0; - } + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); return DepLibUring::liburing->FillBuffer(builder); } @@ -179,10 +184,7 @@ void DepLibUring::StartPollingCQEs() { MS_TRACE(); - if (!DepLibUring::liburing) - { - return; - } + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); DepLibUring::liburing->StartPollingCQEs(); } @@ -191,10 +193,7 @@ void DepLibUring::StopPollingCQEs() { MS_TRACE(); - if (!DepLibUring::liburing) - { - return; - } + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); DepLibUring::liburing->StopPollingCQEs(); } @@ -203,7 +202,7 @@ uint8_t* DepLibUring::GetSendBuffer() { MS_TRACE(); - MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set"); + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); return DepLibUring::liburing->GetSendBuffer(); } @@ -213,7 +212,7 @@ bool DepLibUring::PrepareSend( { MS_TRACE(); - MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set"); + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); return DepLibUring::liburing->PrepareSend(sockfd, data, len, addr, cb); } @@ -223,7 +222,7 @@ bool DepLibUring::PrepareWrite( { MS_TRACE(); - MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set"); + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); return DepLibUring::liburing->PrepareWrite(sockfd, data1, len1, data2, len2, cb); } @@ -232,10 +231,7 @@ void DepLibUring::Submit() { MS_TRACE(); - if (!DepLibUring::liburing) - { - return; - } + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); DepLibUring::liburing->Submit(); } @@ -244,10 +240,7 @@ void DepLibUring::SetActive() { MS_TRACE(); - if (!DepLibUring::liburing) - { - return; - } + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); DepLibUring::liburing->SetActive(); } @@ -256,10 +249,7 @@ bool DepLibUring::IsActive() { MS_TRACE(); - if (!DepLibUring::liburing) - { - return false; - } + MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled"); return DepLibUring::liburing->IsActive(); } @@ -580,7 +570,7 @@ void DepLibUring::LibUring::Submit() MS_TRACE(); // Unset active flag. - this->active = false; + SetInactive(); auto err = io_uring_submit(std::addressof(this->ring)); diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 700bac3348..a833a5aefb 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -251,19 +251,25 @@ void DepUsrSCTP::Checker::OnTimer(TimerHandle* /*timer*/) const int elapsedMs = this->lastCalledAtMs ? static_cast(nowMs - this->lastCalledAtMs) : 0; #ifdef MS_LIBURING_SUPPORTED - // Activate liburing usage. - // 'usrsctp_handle_timers()' will synchronously call the send/recv - // callbacks for the pending data. If there are multiple messages to be - // sent over the network then we will send those messages within a single - // system call. - DepLibUring::SetActive(); + if (DepLibUring::IsEnabled()) + { + // Activate liburing usage. + // 'usrsctp_handle_timers()' will synchronously call the send/recv + // callbacks for the pending data. If there are multiple messages to be + // sent over the network then we will send those messages within a single + // system call. + DepLibUring::SetActive(); + } #endif usrsctp_handle_timers(elapsedMs); #ifdef MS_LIBURING_SUPPORTED - // Submit all prepared submission entries. - DepLibUring::Submit(); + if (DepLibUring::IsEnabled()) + { + // Submit all prepared submission entries. + DepLibUring::Submit(); + } #endif this->lastCalledAtMs = nowMs; diff --git a/worker/src/RTC/Router.cpp b/worker/src/RTC/Router.cpp index fab0173952..b6e8ff513c 100644 --- a/worker/src/RTC/Router.cpp +++ b/worker/src/RTC/Router.cpp @@ -665,8 +665,11 @@ namespace RTC std::shared_ptr sharedPacket; #ifdef MS_LIBURING_SUPPORTED - // Activate liburing usage. - DepLibUring::SetActive(); + if (DepLibUring::IsEnabled()) + { + // Activate liburing usage. + DepLibUring::SetActive(); + } #endif for (auto* consumer : consumers) @@ -683,8 +686,11 @@ namespace RTC } #ifdef MS_LIBURING_SUPPORTED - // Submit all prepared submission entries. - DepLibUring::Submit(); + if (DepLibUring::IsEnabled()) + { + // Submit all prepared submission entries. + DepLibUring::Submit(); + } #endif } @@ -925,10 +931,13 @@ namespace RTC if (!dataConsumers.empty()) { #ifdef MS_LIBURING_SUPPORTED - // Activate liburing usage. - // The effective sending could be synchronous, thus we would send those - // messages within a single system call. - DepLibUring::SetActive(); + if (DepLibUring::IsEnabled()) + { + // Activate liburing usage. + // The effective sending could be synchronous, thus we would send those + // messages within a single system call. + DepLibUring::SetActive(); + } #endif for (auto* dataConsumer : dataConsumers) @@ -937,8 +946,11 @@ namespace RTC } #ifdef MS_LIBURING_SUPPORTED - // Submit all prepared submission entries. - DepLibUring::Submit(); + if (DepLibUring::IsEnabled()) + { + // Submit all prepared submission entries. + DepLibUring::Submit(); + } #endif } } diff --git a/worker/src/RTC/RtpStreamSend.cpp b/worker/src/RTC/RtpStreamSend.cpp index c0c6f6ce2a..d508bce83e 100644 --- a/worker/src/RTC/RtpStreamSend.cpp +++ b/worker/src/RTC/RtpStreamSend.cpp @@ -128,8 +128,11 @@ namespace RTC this->nackCount++; #ifdef MS_LIBURING_SUPPORTED - // Activate liburing usage. - DepLibUring::SetActive(); + if (DepLibUring::IsEnabled()) + { + // Activate liburing usage. + DepLibUring::SetActive(); + } #endif for (auto it = nackPacket->Begin(); it != nackPacket->End(); ++it) @@ -173,8 +176,11 @@ namespace RTC } #ifdef MS_LIBURING_SUPPORTED - // Submit all prepared submission entries. - DepLibUring::Submit(); + if (DepLibUring::IsEnabled()) + { + // Submit all prepared submission entries. + DepLibUring::Submit(); + } #endif } diff --git a/worker/src/RTC/SrtpSession.cpp b/worker/src/RTC/SrtpSession.cpp index 20755dc656..43eb597c15 100644 --- a/worker/src/RTC/SrtpSession.cpp +++ b/worker/src/RTC/SrtpSession.cpp @@ -204,6 +204,7 @@ namespace RTC uint8_t* encryptBuffer = EncryptBuffer; #ifdef MS_LIBURING_SUPPORTED + if (DepLibUring::IsEnabled()) { if (!DepLibUring::IsActive()) { diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index 89f5800ea3..eb469e70fb 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -2158,8 +2158,11 @@ namespace RTC std::unique_ptr packet{ new RTC::RTCP::CompoundPacket() }; #ifdef MS_LIBURING_SUPPORTED - // Activate liburing usage. - DepLibUring::SetActive(); + if (DepLibUring::IsEnabled()) + { + // Activate liburing usage. + DepLibUring::SetActive(); + } #endif for (auto& kv : this->mapConsumers) @@ -2207,8 +2210,11 @@ namespace RTC } #ifdef MS_LIBURING_SUPPORTED - // Submit all prepared submission entries. - DepLibUring::Submit(); + if (DepLibUring::IsEnabled()) + { + // Submit all prepared submission entries. + DepLibUring::Submit(); + } #endif } diff --git a/worker/src/Worker.cpp b/worker/src/Worker.cpp index 5a3e7f4670..2785301357 100644 --- a/worker/src/Worker.cpp +++ b/worker/src/Worker.cpp @@ -44,8 +44,11 @@ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel) DepUsrSCTP::CreateChecker(); #ifdef MS_LIBURING_SUPPORTED - // Start polling CQEs, which will create a uv_pool_t handle. - DepLibUring::StartPollingCQEs(); + if (DepLibUring::IsEnabled()) + { + // Start polling CQEs, which will create a uv_pool_t handle. + DepLibUring::StartPollingCQEs(); + } #endif // Tell the Node process that we are running. @@ -106,8 +109,11 @@ void Worker::Close() DepUsrSCTP::CloseChecker(); #ifdef MS_LIBURING_SUPPORTED - // Stop polling CQEs, which will close the uv_pool_t handle. - DepLibUring::StopPollingCQEs(); + if (DepLibUring::IsEnabled()) + { + // Stop polling CQEs, which will close the uv_pool_t handle. + DepLibUring::StopPollingCQEs(); + } #endif // Close the Channel. @@ -142,17 +148,26 @@ flatbuffers::Offset Worker::FillBuffer( // Add channelMessageHandlers. auto channelMessageHandlers = this->shared->channelMessageRegistrator->FillBuffer(builder); - return FBS::Worker::CreateDumpResponseDirect( - builder, - Logger::Pid, - &webRtcServerIds, - &routerIds, - channelMessageHandlers #ifdef MS_LIBURING_SUPPORTED - , - DepLibUring::FillBuffer(builder) + if (DepLibUring::IsEnabled()) + { + return FBS::Worker::CreateDumpResponseDirect( + builder, + Logger::Pid, + &webRtcServerIds, + &routerIds, + channelMessageHandlers, + DepLibUring::FillBuffer(builder)); + } + else + { + return FBS::Worker::CreateDumpResponseDirect( + builder, Logger::Pid, &webRtcServerIds, &routerIds, channelMessageHandlers); + } +#else + return FBS::Worker::CreateDumpResponseDirect( + builder, Logger::Pid, &webRtcServerIds, &routerIds, channelMessageHandlers); #endif - ); } flatbuffers::Offset Worker::FillBufferResourceUsage( diff --git a/worker/src/handles/TcpConnectionHandle.cpp b/worker/src/handles/TcpConnectionHandle.cpp index 475a078ad0..ea53cb8ff9 100644 --- a/worker/src/handles/TcpConnectionHandle.cpp +++ b/worker/src/handles/TcpConnectionHandle.cpp @@ -168,11 +168,14 @@ void TcpConnectionHandle::Start() } #ifdef MS_LIBURING_SUPPORTED - err = uv_fileno(reinterpret_cast(this->uvHandle), std::addressof(this->fd)); - - if (err != 0) + if (DepLibUring::IsEnabled()) { - MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err)); + err = uv_fileno(reinterpret_cast(this->uvHandle), std::addressof(this->fd)); + + if (err != 0) + { + MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err)); + } } #endif } @@ -209,6 +212,7 @@ void TcpConnectionHandle::Write( } #ifdef MS_LIBURING_SUPPORTED + if (DepLibUring::IsEnabled()) { if (!DepLibUring::IsActive()) { diff --git a/worker/src/handles/UdpSocketHandle.cpp b/worker/src/handles/UdpSocketHandle.cpp index 2cb42e40c5..82da018c73 100644 --- a/worker/src/handles/UdpSocketHandle.cpp +++ b/worker/src/handles/UdpSocketHandle.cpp @@ -88,11 +88,14 @@ UdpSocketHandle::UdpSocketHandle(uv_udp_t* uvHandle) : uvHandle(uvHandle) } #ifdef MS_LIBURING_SUPPORTED - err = uv_fileno(reinterpret_cast(this->uvHandle), std::addressof(this->fd)); - - if (err != 0) + if (DepLibUring::IsEnabled()) { - MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err)); + err = uv_fileno(reinterpret_cast(this->uvHandle), std::addressof(this->fd)); + + if (err != 0) + { + MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err)); + } } #endif } @@ -144,6 +147,7 @@ void UdpSocketHandle::Send( } #ifdef MS_LIBURING_SUPPORTED + if (DepLibUring::IsEnabled()) { if (!DepLibUring::IsActive()) {