From 46f881921eec21e0ff1864f66f860b9661c1396e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 14:33:00 +0100 Subject: [PATCH 01/16] ListenInfo: Add transport socket flags --- node/src/Router.ts | 19 +++++++- node/src/Transport.ts | 24 ++++++++++ node/src/tests/test-PlainTransport.ts | 37 ++++++++++++++ npm-scripts.mjs | 8 ++-- worker/fbs/transport.fbs | 10 +++- worker/include/Logger.hpp | 13 +++++ worker/include/RTC/PortManager.hpp | 21 ++++---- worker/include/RTC/TcpServer.hpp | 9 +++- worker/include/RTC/Transport.hpp | 1 + worker/include/RTC/UdpSocket.hpp | 4 +- worker/src/RTC/PipeTransport.cpp | 22 +++------ worker/src/RTC/PlainTransport.cpp | 46 ++++++------------ worker/src/RTC/PortManager.cpp | 69 +++++++++++++++++++++++---- worker/src/RTC/TcpServer.cpp | 15 ++++-- worker/src/RTC/UdpSocket.cpp | 8 ++-- worker/src/RTC/WebRtcServer.cpp | 8 ++-- worker/src/RTC/WebRtcTransport.cpp | 8 ++-- 17 files changed, 228 insertions(+), 94 deletions(-) diff --git a/node/src/Router.ts b/node/src/Router.ts index 9b836f67ad..28dad26213 100644 --- a/node/src/Router.ts +++ b/node/src/Router.ts @@ -7,7 +7,8 @@ import { Transport, TransportListenInfo, TransportListenIp, - TransportProtocol + TransportProtocol, + TransportSocketFlag } from './Transport'; import { WebRtcTransport, WebRtcTransportOptions, parseWebRtcTransportDumpResponse } from './WebRtcTransport'; import { PlainTransport, PlainTransportOptions, parsePlainTransportDumpResponse } from './PlainTransport'; @@ -570,6 +571,7 @@ export class Router listenInfo.ip, listenInfo.announcedIp, listenInfo.port, + socketFlagsToInteger(listenInfo.flags), listenInfo.sendBufferSize, listenInfo.recvBufferSize )); @@ -749,6 +751,7 @@ export class Router listenInfo!.ip, listenInfo!.announcedIp, listenInfo!.port, + socketFlagsToInteger(listenInfo!.flags), listenInfo!.sendBufferSize, listenInfo!.recvBufferSize ), @@ -759,6 +762,7 @@ export class Router rtcpListenInfo.ip, rtcpListenInfo.announcedIp, rtcpListenInfo.port, + socketFlagsToInteger(rtcpListenInfo.flags), rtcpListenInfo.sendBufferSize, rtcpListenInfo.recvBufferSize ) : undefined, @@ -897,6 +901,7 @@ export class Router listenInfo!.ip, listenInfo!.announcedIp, listenInfo!.port, + socketFlagsToInteger(listenInfo!.flags), listenInfo!.sendBufferSize, listenInfo!.recvBufferSize ), @@ -1619,3 +1624,15 @@ export function parseRouterDumpResponse( mapDataConsumerIdDataProducerId : parseStringStringVector(binary, 'mapDataConsumerIdDataProducerId') }; } + +function socketFlagsToInteger(flags: TransportSocketFlag[] = []): number +{ + let flagsInteger = 0; + + for (const flag of flags) + { + flagsInteger |= flag; + } + + return flagsInteger; +} diff --git a/node/src/Transport.ts b/node/src/Transport.ts index 7ae79d39df..869d76c2d1 100644 --- a/node/src/Transport.ts +++ b/node/src/Transport.ts @@ -73,6 +73,11 @@ export type TransportListenInfo = */ port?: number; + /** + * Socket flags. + */ + flags?: TransportSocketFlag[]; + /** * Send buffer size (bytes). */ @@ -107,6 +112,25 @@ export type TransportListenIp = */ export type TransportProtocol = 'udp' | 'tcp'; +/** + * UDP/TCP socket flag. + */ +// NOTE: ESLint absurdly complains about "'TransportSocketFlag' is already +// declared in the upper scope". +// eslint-disable-next-line no-shadow +export enum TransportSocketFlag +{ + /** + * Disable dual-stack support so only IPv6 is used (only if ip is IPv6). + */ + IPV6ONLY = FbsTransport.SocketFlag.IPV6ONLY, + /** + * Make different transports bind to the same ip and port (only for UDP). + * Useful for multicast scenarios with plain transport. Use with caution. + */ + UDP_REUSEPORT = FbsTransport.SocketFlag.UDP_REUSEPORT +} + export type TransportTuple = { localIp: string; diff --git a/node/src/tests/test-PlainTransport.ts b/node/src/tests/test-PlainTransport.ts index 3e75c50586..da0986e0f7 100644 --- a/node/src/tests/test-PlainTransport.ts +++ b/node/src/tests/test-PlainTransport.ts @@ -316,6 +316,43 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as .toThrow(Error); }, 2000); +test('two transports listening in same IP:port succeed if UDP_REUSEPORT flag is set', async () => +{ + let transport1: mediasoup.types.PlainTransport | undefined; + let transport2: mediasoup.types.PlainTransport | undefined; + + await expect(async () => + { + const multicastIp = '224.0.0.1'; + const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 }); + + transport1 = await router.createPlainTransport( + { + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ] + } + }); + + transport2 = await router.createPlainTransport( + { + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ] + } + }); + }).not.toThrow(); + + transport1?.close(); + transport2?.close(); +}, 2000); + test('plainTransport.getStats() succeeds', async () => { const data = await transport.getStats(); diff --git a/npm-scripts.mjs b/npm-scripts.mjs index 6dab36a258..a734c35e33 100644 --- a/npm-scripts.mjs +++ b/npm-scripts.mjs @@ -1,7 +1,7 @@ -import process from 'node:process'; -import os from 'node:os'; -import fs from 'node:fs'; -import path from 'node:path'; +import * as process from 'node:process'; +import * as os from 'node:os'; +import * as fs from 'node:fs'; +import * as path from 'node:path'; import { execSync } from 'node:child_process'; import fetch from 'node-fetch'; import tar from 'tar'; diff --git a/worker/fbs/transport.fbs b/worker/fbs/transport.fbs index d2a77d4ae9..22415fb862 100644 --- a/worker/fbs/transport.fbs +++ b/worker/fbs/transport.fbs @@ -13,11 +13,17 @@ enum Protocol: uint8 { TCP } +enum SocketFlag: uint8 { + IPV6ONLY = 1, + UDP_REUSEPORT = 2 +} + table ListenInfo { - protocol: FBS.Transport.Protocol = UDP; + protocol: Protocol = UDP; ip: string (required); announced_ip: string; port: uint16 = 0; + flags: uint8 = 0; send_buffer_size: uint32 = 0; recv_buffer_size: uint32 = 0; } @@ -85,7 +91,7 @@ table Tuple { local_port: uint16; remote_ip: string; remote_port: uint16; - protocol: FBS.Transport.Protocol = UDP; + protocol: Protocol = UDP; } table RtpListener { diff --git a/worker/include/Logger.hpp b/worker/include/Logger.hpp index 53c16f32a4..e3c22bb71f 100644 --- a/worker/include/Logger.hpp +++ b/worker/include/Logger.hpp @@ -126,6 +126,19 @@ ((value & 0x02) ? '1' : '0'), \ ((value & 0x01) ? '1' : '0') +// Usage: +// MS_DEBUG_DEV("Leading text "MS_UINT8_TO_BINARY_PATTERN, MS_UINT8_TO_BINARY(value)); +#define MS_UINT8_TO_BINARY_PATTERN "%c%c%c%c%c%c%c%c" +#define MS_UINT8_TO_BINARY(value) \ + ((value & 0x80) ? '1' : '0'), \ + ((value & 0x40) ? '1' : '0'), \ + ((value & 0x20) ? '1' : '0'), \ + ((value & 0x10) ? '1' : '0'), \ + ((value & 0x08) ? '1' : '0'), \ + ((value & 0x04) ? '1' : '0'), \ + ((value & 0x02) ? '1' : '0'), \ + ((value & 0x01) ? '1' : '0') + class Logger { public: diff --git a/worker/include/RTC/PortManager.hpp b/worker/include/RTC/PortManager.hpp index 4300548385..775a7235d9 100644 --- a/worker/include/RTC/PortManager.hpp +++ b/worker/include/RTC/PortManager.hpp @@ -20,21 +20,21 @@ namespace RTC }; public: - static uv_udp_t* BindUdp(std::string& ip) + static uv_udp_t* BindUdp(std::string& ip, uint8_t flags) { - return reinterpret_cast(Bind(Transport::UDP, ip)); + return reinterpret_cast(Bind(Transport::UDP, ip, flags)); } - static uv_udp_t* BindUdp(std::string& ip, uint16_t port) + static uv_udp_t* BindUdp(std::string& ip, uint16_t port, uint8_t flags) { - return reinterpret_cast(Bind(Transport::UDP, ip, port)); + return reinterpret_cast(Bind(Transport::UDP, ip, port, flags)); } - static uv_tcp_t* BindTcp(std::string& ip) + static uv_tcp_t* BindTcp(std::string& ip, uint8_t flags) { - return reinterpret_cast(Bind(Transport::TCP, ip)); + return reinterpret_cast(Bind(Transport::TCP, ip, flags)); } - static uv_tcp_t* BindTcp(std::string& ip, uint16_t port) + static uv_tcp_t* BindTcp(std::string& ip, uint16_t port, uint8_t flags) { - return reinterpret_cast(Bind(Transport::TCP, ip, port)); + return reinterpret_cast(Bind(Transport::TCP, ip, port, flags)); } static void UnbindUdp(std::string& ip, uint16_t port) { @@ -46,10 +46,11 @@ namespace RTC } private: - static uv_handle_t* Bind(Transport transport, std::string& ip); - static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port); + static uv_handle_t* Bind(Transport transport, std::string& ip, uint8_t flags); + static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port, uint8_t flags); static void Unbind(Transport transport, std::string& ip, uint16_t port); static std::vector& GetPorts(Transport transport, const std::string& ip); + static uint8_t ConvertSocketFlags(uint8_t flags); private: thread_local static absl::flat_hash_map> mapUdpIpPorts; diff --git a/worker/include/RTC/TcpServer.hpp b/worker/include/RTC/TcpServer.hpp index 89e03ca6b8..102d85c2fc 100644 --- a/worker/include/RTC/TcpServer.hpp +++ b/worker/include/RTC/TcpServer.hpp @@ -23,9 +23,14 @@ namespace RTC }; public: - TcpServer(Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip); TcpServer( - Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port); + Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint8_t flags); + TcpServer( + Listener* listener, + RTC::TcpConnection::Listener* connListener, + std::string& ip, + uint16_t port, + uint8_t flags); ~TcpServer() override; /* Pure virtual methods inherited from ::TcpServerHandle. */ diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index 61bdd3ec65..e5702f515d 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -126,6 +126,7 @@ namespace RTC uint16_t port{ 0u }; uint32_t sendBufferSize{ 0u }; uint32_t recvBufferSize{ 0u }; + uint8_t flags{ 0u }; }; private: diff --git a/worker/include/RTC/UdpSocket.hpp b/worker/include/RTC/UdpSocket.hpp index 46e90f3edd..aff4a5b8c3 100644 --- a/worker/include/RTC/UdpSocket.hpp +++ b/worker/include/RTC/UdpSocket.hpp @@ -21,8 +21,8 @@ namespace RTC }; public: - UdpSocket(Listener* listener, std::string& ip); - UdpSocket(Listener* listener, std::string& ip, uint16_t port); + UdpSocket(Listener* listener, std::string& ip, uint8_t flags); + UdpSocket(Listener* listener, std::string& ip, uint16_t port, uint8_t flags); ~UdpSocket() override; /* Pure virtual methods inherited from ::UdpSocketHandle. */ diff --git a/worker/src/RTC/PipeTransport.cpp b/worker/src/RTC/PipeTransport.cpp index 950697b111..ae0ea29231 100644 --- a/worker/src/RTC/PipeTransport.cpp +++ b/worker/src/RTC/PipeTransport.cpp @@ -50,19 +50,10 @@ namespace RTC this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str()); } - this->listenInfo.port = options->listenInfo()->port(); - - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE)) - { - this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); - } - - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE)) - { - this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); - } + this->listenInfo.port = options->listenInfo()->port(); + this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); + this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); + this->listenInfo.flags = options->listenInfo()->flags(); this->rtx = options->enableRtx(); @@ -77,11 +68,12 @@ namespace RTC // This may throw. if (this->listenInfo.port != 0) { - this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port); + this->udpSocket = new RTC::UdpSocket( + this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags); } else { - this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip); + this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.flags); } if (this->listenInfo.sendBufferSize != 0) diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index 305e67516f..36d0fa53d8 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -52,19 +52,10 @@ namespace RTC this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str()); } - this->listenInfo.port = options->listenInfo()->port(); - - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE)) - { - this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); - } - - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE)) - { - this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); - } + this->listenInfo.port = options->listenInfo()->port(); + this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); + this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); + this->listenInfo.flags = options->listenInfo()->flags(); this->rtcpMux = options->rtcpMux(); this->comedia = options->comedia(); @@ -90,19 +81,10 @@ namespace RTC this->rtcpListenInfo.announcedIp.assign(options->rtcpListenInfo()->announcedIp()->str()); } - this->rtcpListenInfo.port = options->rtcpListenInfo()->port(); - - if (flatbuffers::IsFieldPresent( - options->rtcpListenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE)) - { - this->rtcpListenInfo.sendBufferSize = options->rtcpListenInfo()->sendBufferSize(); - } - - if (flatbuffers::IsFieldPresent( - options->rtcpListenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE)) - { - this->rtcpListenInfo.recvBufferSize = options->rtcpListenInfo()->recvBufferSize(); - } + this->rtcpListenInfo.port = options->rtcpListenInfo()->port(); + this->rtcpListenInfo.sendBufferSize = options->rtcpListenInfo()->sendBufferSize(); + this->rtcpListenInfo.recvBufferSize = options->rtcpListenInfo()->recvBufferSize(); + this->rtcpListenInfo.flags = options->rtcpListenInfo()->flags(); } // If rtcpListenInfo is not given, just clone listenInfo. else @@ -160,11 +142,12 @@ namespace RTC // This may throw. if (this->listenInfo.port != 0) { - this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port); + this->udpSocket = new RTC::UdpSocket( + this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags); } else { - this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip); + this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.flags); } if (this->listenInfo.sendBufferSize != 0) @@ -184,12 +167,13 @@ namespace RTC // This may throw. if (this->rtcpListenInfo.port != 0) { - this->rtcpUdpSocket = - new RTC::UdpSocket(this, this->rtcpListenInfo.ip, this->rtcpListenInfo.port); + this->rtcpUdpSocket = new RTC::UdpSocket( + this, this->rtcpListenInfo.ip, this->rtcpListenInfo.port, this->rtcpListenInfo.flags); } else { - this->rtcpUdpSocket = new RTC::UdpSocket(this, this->rtcpListenInfo.ip); + this->rtcpUdpSocket = + new RTC::UdpSocket(this, this->rtcpListenInfo.ip, this->rtcpListenInfo.flags); } if (this->rtcpListenInfo.sendBufferSize != 0) diff --git a/worker/src/RTC/PortManager.cpp b/worker/src/RTC/PortManager.cpp index 0d19d686f3..82eaee0e4f 100644 --- a/worker/src/RTC/PortManager.cpp +++ b/worker/src/RTC/PortManager.cpp @@ -7,6 +7,7 @@ #include "MediaSoupErrors.hpp" #include "Settings.hpp" #include "Utils.hpp" +#include "FBS/transport.h" #include // std:make_tuple() #include // std::piecewise_construct @@ -38,18 +39,20 @@ namespace RTC /* Class methods. */ - uv_handle_t* PortManager::Bind(Transport transport, std::string& ip) + uv_handle_t* PortManager::Bind(Transport transport, std::string& ip, uint8_t flags) { MS_TRACE(); // First normalize the IP. This may throw if invalid IP. Utils::IP::NormalizeIp(ip); + // Covert given flags into libuv flags. + flags = ConvertSocketFlags(flags); + int err; const int family = Utils::IP::GetFamily(ip); struct sockaddr_storage bindAddr; // NOLINT(cppcoreguidelines-pro-type-member-init) size_t portIdx; - int flags{ 0 }; std::vector& ports = PortManager::GetPorts(transport, ip); size_t attempt{ 0u }; const size_t numAttempts = ports.size(); @@ -60,12 +63,18 @@ namespace RTC switch (transport) { case Transport::UDP: + { transportStr.assign("udp"); + break; + } case Transport::TCP: + { transportStr.assign("tcp"); + break; + } } switch (family) @@ -91,9 +100,6 @@ namespace RTC MS_THROW_ERROR("uv_ip6_addr() failed: %s", uv_strerror(err)); } - // Don't also bind into IPv4 when listening in IPv6. - flags |= UV_UDP_IPV6ONLY; - break; } @@ -160,12 +166,18 @@ namespace RTC switch (family) { case AF_INET: + { (reinterpret_cast(&bindAddr))->sin_port = htons(port); + break; + } case AF_INET6: + { (reinterpret_cast(&bindAddr))->sin6_port = htons(port); + break; + } } // Try to bind on it. @@ -356,29 +368,37 @@ namespace RTC return static_cast(uvHandle); } - uv_handle_t* PortManager::Bind(Transport transport, std::string& ip, uint16_t port) + uv_handle_t* PortManager::Bind(Transport transport, std::string& ip, uint16_t port, uint8_t flags) { MS_TRACE(); // First normalize the IP. This may throw if invalid IP. Utils::IP::NormalizeIp(ip); + // Covert given flags into libuv flags. + flags = ConvertSocketFlags(flags); + int err; const int family = Utils::IP::GetFamily(ip); struct sockaddr_storage bindAddr; // NOLINT(cppcoreguidelines-pro-type-member-init) - int flags{ 0 }; uv_handle_t* uvHandle{ nullptr }; std::string transportStr; switch (transport) { case Transport::UDP: + { transportStr.assign("udp"); + break; + } case Transport::TCP: + { transportStr.assign("tcp"); + break; + } } switch (family) @@ -404,9 +424,6 @@ namespace RTC MS_THROW_ERROR("uv_ip6_addr() failed: %s", uv_strerror(err)); } - // Don't also bind into IPv4 when listening in IPv6. - flags |= UV_UDP_IPV6ONLY; - break; } @@ -421,27 +438,39 @@ namespace RTC switch (family) { case AF_INET: + { (reinterpret_cast(&bindAddr))->sin_port = htons(port); + break; + } case AF_INET6: + { (reinterpret_cast(&bindAddr))->sin6_port = htons(port); + break; + } } // Try to bind on it. switch (transport) { case Transport::UDP: + { uvHandle = reinterpret_cast(new uv_udp_t()); err = uv_udp_init_ex( DepLibUV::GetLoop(), reinterpret_cast(uvHandle), UV_UDP_RECVMMSG); + break; + } case Transport::TCP: + { uvHandle = reinterpret_cast(new uv_tcp_t()); err = uv_tcp_init(DepLibUV::GetLoop(), reinterpret_cast(uvHandle)); + break; + } } if (err != 0) @@ -666,4 +695,24 @@ namespace RTC return emptyPorts; } + + uint8_t PortManager::ConvertSocketFlags(uint8_t flags) + { + MS_TRACE(); + + uint8_t newFlags{ 0u }; + + if (flags & static_cast(FBS::Transport::SocketFlag::IPV6ONLY)) + { + newFlags |= UV_UDP_IPV6ONLY; + newFlags |= UV_TCP_IPV6ONLY; // Same flag number but anyway. + } + + if (flags & static_cast(FBS::Transport::SocketFlag::UDP_REUSEPORT)) + { + newFlags |= UV_UDP_REUSEADDR; + } + + return newFlags; + } } // namespace RTC diff --git a/worker/src/RTC/TcpServer.cpp b/worker/src/RTC/TcpServer.cpp index ddd1d2da82..b007fad3c0 100644 --- a/worker/src/RTC/TcpServer.cpp +++ b/worker/src/RTC/TcpServer.cpp @@ -10,19 +10,24 @@ namespace RTC { /* Instance methods. */ - TcpServer::TcpServer(Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip) + TcpServer::TcpServer( + Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint8_t flags) : // This may throw. - ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip)), listener(listener), + ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip, flags)), listener(listener), connListener(connListener) { MS_TRACE(); } TcpServer::TcpServer( - Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port) + Listener* listener, + RTC::TcpConnection::Listener* connListener, + std::string& ip, + uint16_t port, + uint8_t flags) : // This may throw. - ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip, port)), listener(listener), - connListener(connListener), fixedPort(true) + ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip, port, flags)), + listener(listener), connListener(connListener), fixedPort(true) { MS_TRACE(); } diff --git a/worker/src/RTC/UdpSocket.cpp b/worker/src/RTC/UdpSocket.cpp index 6c47ab4671..f340fd86f2 100644 --- a/worker/src/RTC/UdpSocket.cpp +++ b/worker/src/RTC/UdpSocket.cpp @@ -10,16 +10,16 @@ namespace RTC { /* Instance methods. */ - UdpSocket::UdpSocket(Listener* listener, std::string& ip) + UdpSocket::UdpSocket(Listener* listener, std::string& ip, uint8_t flags) : // This may throw. - ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip)), listener(listener) + ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip, flags)), listener(listener) { MS_TRACE(); } - UdpSocket::UdpSocket(Listener* listener, std::string& ip, uint16_t port) + UdpSocket::UdpSocket(Listener* listener, std::string& ip, uint16_t port, uint8_t flags) : // This may throw. - ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip, port)), listener(listener), + ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip, port, flags)), listener(listener), fixedPort(true) { MS_TRACE(); diff --git a/worker/src/RTC/WebRtcServer.cpp b/worker/src/RTC/WebRtcServer.cpp index 8fe9f760a3..c89a64ddfb 100644 --- a/worker/src/RTC/WebRtcServer.cpp +++ b/worker/src/RTC/WebRtcServer.cpp @@ -67,11 +67,11 @@ namespace RTC if (listenInfo->port() != 0) { - udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port()); + udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), listenInfo->flags()); } else { - udpSocket = new RTC::UdpSocket(this, ip); + udpSocket = new RTC::UdpSocket(this, ip, listenInfo->flags()); } this->udpSocketOrTcpServers.emplace_back(udpSocket, nullptr, announcedIp); @@ -99,11 +99,11 @@ namespace RTC if (listenInfo->port() != 0) { - tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port()); + tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), listenInfo->flags()); } else { - tcpServer = new RTC::TcpServer(this, this, ip); + tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->flags()); } this->udpSocketOrTcpServers.emplace_back(nullptr, tcpServer, announcedIp); diff --git a/worker/src/RTC/WebRtcTransport.cpp b/worker/src/RTC/WebRtcTransport.cpp index 19e33d5f6d..9b897649b0 100644 --- a/worker/src/RTC/WebRtcTransport.cpp +++ b/worker/src/RTC/WebRtcTransport.cpp @@ -72,11 +72,11 @@ namespace RTC if (listenInfo->port() != 0) { - udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port()); + udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), listenInfo->flags()); } else { - udpSocket = new RTC::UdpSocket(this, ip); + udpSocket = new RTC::UdpSocket(this, ip, listenInfo->flags()); } this->udpSockets[udpSocket] = announcedIp; @@ -114,11 +114,11 @@ namespace RTC if (listenInfo->port() != 0) { - tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port()); + tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), listenInfo->flags()); } else { - tcpServer = new RTC::TcpServer(this, this, ip); + tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->flags()); } this->tcpServers[tcpServer] = announcedIp; From 54d8e7bea929eb7c6220ddb0fb9a8a7347dbb555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 14:50:49 +0100 Subject: [PATCH 02/16] don't try multicast on Windows CI --- .github/workflows/mediasoup-node.yaml | 1 + .github/workflows/mediasoup-worker.yaml | 1 + node/src/tests/test-PlainTransport.ts | 68 ++++++++++++++----------- 3 files changed, 39 insertions(+), 31 deletions(-) diff --git a/.github/workflows/mediasoup-node.yaml b/.github/workflows/mediasoup-node.yaml index 5e61e6bc1a..0618603bc7 100644 --- a/.github/workflows/mediasoup-node.yaml +++ b/.github/workflows/mediasoup-node.yaml @@ -28,6 +28,7 @@ jobs: env: MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true' + MEDIASOUP_LOCAL_DEV: 'true' steps: - name: Checkout diff --git a/.github/workflows/mediasoup-worker.yaml b/.github/workflows/mediasoup-worker.yaml index 5405601601..6192e75475 100644 --- a/.github/workflows/mediasoup-worker.yaml +++ b/.github/workflows/mediasoup-worker.yaml @@ -44,6 +44,7 @@ jobs: CC: ${{ matrix.build.cc }} CXX: ${{ matrix.build.cxx }} MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true' + MEDIASOUP_LOCAL_DEV: 'true' steps: - name: Checkout diff --git a/node/src/tests/test-PlainTransport.ts b/node/src/tests/test-PlainTransport.ts index da0986e0f7..3e406c4c21 100644 --- a/node/src/tests/test-PlainTransport.ts +++ b/node/src/tests/test-PlainTransport.ts @@ -1,7 +1,10 @@ +import * as os from 'node:os'; // @ts-ignore import * as pickPort from 'pick-port'; import * as mediasoup from '../'; +const IS_WINDOWS = os.platform() === 'win32'; + let worker: mediasoup.types.Worker; let router: mediasoup.types.Router; let transport: mediasoup.types.PlainTransport; @@ -316,42 +319,45 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as .toThrow(Error); }, 2000); -test('two transports listening in same IP:port succeed if UDP_REUSEPORT flag is set', async () => +if (!IS_WINDOWS) { - let transport1: mediasoup.types.PlainTransport | undefined; - let transport2: mediasoup.types.PlainTransport | undefined; - - await expect(async () => + test('two transports listening in same IP:port succeed if UDP_REUSEPORT flag is set', async () => { - const multicastIp = '224.0.0.1'; - const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 }); + let transport1: mediasoup.types.PlainTransport | undefined; + let transport2: mediasoup.types.PlainTransport | undefined; - transport1 = await router.createPlainTransport( - { - listenInfo : + await expect(async () => + { + const multicastIp = '224.0.0.1'; + const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 }); + + transport1 = await router.createPlainTransport( { - protocol : 'udp', - ip : multicastIp, - port : port, - flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ] - } - }); - - transport2 = await router.createPlainTransport( - { - listenInfo : + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ] + } + }); + + transport2 = await router.createPlainTransport( { - protocol : 'udp', - ip : multicastIp, - port : port, - flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ] - } - }); - }).not.toThrow(); - - transport1?.close(); - transport2?.close(); -}, 2000); + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ] + } + }); + }).not.toThrow(); + + transport1?.close(); + transport2?.close(); + }, 2000); +} test('plainTransport.getStats() succeeds', async () => { From 6e785dc650a104d0325e84bbdb5ac718f04be3ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 14:50:58 +0100 Subject: [PATCH 03/16] Rust WIP --- rust/src/data_structures.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/rust/src/data_structures.rs b/rust/src/data_structures.rs index b4d41a6167..6f66113ad2 100644 --- a/rust/src/data_structures.rs +++ b/rust/src/data_structures.rs @@ -62,6 +62,9 @@ pub struct ListenInfo { /// Listening port. #[serde(skip_serializing_if = "Option::is_none")] pub port: Option, + /// Socket flags. + #[serde(skip_serializing_if = "Option::is_none")] + pub flags: Option>, /// Send buffer size (bytes). #[serde(skip_serializing_if = "Option::is_none")] pub send_buffer_size: Option, @@ -190,6 +193,26 @@ impl Protocol { } } +/// Transport socket flag. +#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum SocketFlag { + /// Disable dual-stack support so only IPv6 is used (only if ip is IPv6). + IpV6Only, + /// Make different transports bind to the same ip and port (only for UDP). + /// Useful for multicast scenarios with plain transport. Use with caution. + UdpReusePort, +} + +impl SocketFlag { + pub(crate) fn to_fbs(self) -> transport::SocketFlag { + match self { + SocketFlag::IpV6Only => transport::SocketFlag::IPV6ONLY, + SocketFlag::UdpReusePort => transport::SocketFlag::UDP_REUSEPORT, + } + } +} + /// ICE candidate #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] From a1405e23abee5dcfd13199d87c767fe5c331f4b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 17:11:48 +0100 Subject: [PATCH 04/16] Make flags be a table/struct --- node/src/Router.ts | 26 +++++++++--------- node/src/Transport.ts | 15 +++++------ node/src/Worker.ts | 3 ++- node/src/tests/test-PlainTransport.ts | 4 +-- worker/fbs/transport.fbs | 8 +++--- worker/include/RTC/PortManager.hpp | 16 ++++++----- worker/include/RTC/TcpServer.hpp | 8 ++++-- worker/include/RTC/Transport.hpp | 8 +++++- worker/include/RTC/UdpSocket.hpp | 5 ++-- worker/src/RTC/PipeTransport.cpp | 9 ++++--- worker/src/RTC/PlainTransport.cpp | 12 +++++---- worker/src/RTC/PortManager.cpp | 38 ++++++++++++--------------- worker/src/RTC/TcpServer.cpp | 7 +++-- worker/src/RTC/UdpSocket.cpp | 5 ++-- worker/src/RTC/WebRtcServer.cpp | 13 ++++++--- worker/src/RTC/WebRtcTransport.cpp | 13 ++++++--- 16 files changed, 106 insertions(+), 84 deletions(-) diff --git a/node/src/Router.ts b/node/src/Router.ts index 28dad26213..9de04034d5 100644 --- a/node/src/Router.ts +++ b/node/src/Router.ts @@ -8,7 +8,7 @@ import { TransportListenInfo, TransportListenIp, TransportProtocol, - TransportSocketFlag + TransportSocketFlags } from './Transport'; import { WebRtcTransport, WebRtcTransportOptions, parseWebRtcTransportDumpResponse } from './WebRtcTransport'; import { PlainTransport, PlainTransportOptions, parsePlainTransportDumpResponse } from './PlainTransport'; @@ -571,7 +571,7 @@ export class Router listenInfo.ip, listenInfo.announcedIp, listenInfo.port, - socketFlagsToInteger(listenInfo.flags), + socketFlagsToFbs(listenInfo.flags), listenInfo.sendBufferSize, listenInfo.recvBufferSize )); @@ -751,7 +751,7 @@ export class Router listenInfo!.ip, listenInfo!.announcedIp, listenInfo!.port, - socketFlagsToInteger(listenInfo!.flags), + socketFlagsToFbs(listenInfo!.flags), listenInfo!.sendBufferSize, listenInfo!.recvBufferSize ), @@ -762,7 +762,7 @@ export class Router rtcpListenInfo.ip, rtcpListenInfo.announcedIp, rtcpListenInfo.port, - socketFlagsToInteger(rtcpListenInfo.flags), + socketFlagsToFbs(rtcpListenInfo.flags), rtcpListenInfo.sendBufferSize, rtcpListenInfo.recvBufferSize ) : undefined, @@ -901,7 +901,7 @@ export class Router listenInfo!.ip, listenInfo!.announcedIp, listenInfo!.port, - socketFlagsToInteger(listenInfo!.flags), + socketFlagsToFbs(listenInfo!.flags), listenInfo!.sendBufferSize, listenInfo!.recvBufferSize ), @@ -1625,14 +1625,12 @@ export function parseRouterDumpResponse( }; } -function socketFlagsToInteger(flags: TransportSocketFlag[] = []): number +export function socketFlagsToFbs( + flags: TransportSocketFlags = {} +): FbsTransport.SocketFlagsT { - let flagsInteger = 0; - - for (const flag of flags) - { - flagsInteger |= flag; - } - - return flagsInteger; + return new FbsTransport.SocketFlagsT( + Boolean(flags.ipv6Only), + Boolean(flags.udpReusePort) + ); } diff --git a/node/src/Transport.ts b/node/src/Transport.ts index 869d76c2d1..d04f863927 100644 --- a/node/src/Transport.ts +++ b/node/src/Transport.ts @@ -76,7 +76,7 @@ export type TransportListenInfo = /** * Socket flags. */ - flags?: TransportSocketFlag[]; + flags?: TransportSocketFlags; /** * Send buffer size (bytes). @@ -113,23 +113,20 @@ export type TransportListenIp = export type TransportProtocol = 'udp' | 'tcp'; /** - * UDP/TCP socket flag. + * UDP/TCP socket flags. */ -// NOTE: ESLint absurdly complains about "'TransportSocketFlag' is already -// declared in the upper scope". -// eslint-disable-next-line no-shadow -export enum TransportSocketFlag +export type TransportSocketFlags = { /** * Disable dual-stack support so only IPv6 is used (only if ip is IPv6). */ - IPV6ONLY = FbsTransport.SocketFlag.IPV6ONLY, + ipv6Only?: boolean; /** * Make different transports bind to the same ip and port (only for UDP). * Useful for multicast scenarios with plain transport. Use with caution. */ - UDP_REUSEPORT = FbsTransport.SocketFlag.UDP_REUSEPORT -} + udpReusePort?: boolean; +}; export type TransportTuple = { diff --git a/node/src/Worker.ts b/node/src/Worker.ts index 44cee98b4b..56a6d7c813 100644 --- a/node/src/Worker.ts +++ b/node/src/Worker.ts @@ -6,7 +6,7 @@ import { Logger } from './Logger'; import { EnhancedEventEmitter } from './EnhancedEventEmitter'; import * as ortc from './ortc'; import { Channel } from './Channel'; -import { Router, RouterOptions } from './Router'; +import { Router, RouterOptions, socketFlagsToFbs } from './Router'; import { WebRtcServer, WebRtcServerOptions } from './WebRtcServer'; import { RtpCodecCapability } from './RtpParameters'; import { AppData } from './types'; @@ -699,6 +699,7 @@ export class Worker listenInfo.ip, listenInfo.announcedIp, listenInfo.port, + socketFlagsToFbs(listenInfo.flags), listenInfo.sendBufferSize, listenInfo.recvBufferSize) ); diff --git a/node/src/tests/test-PlainTransport.ts b/node/src/tests/test-PlainTransport.ts index 3e406c4c21..4bf6149c45 100644 --- a/node/src/tests/test-PlainTransport.ts +++ b/node/src/tests/test-PlainTransport.ts @@ -338,7 +338,7 @@ if (!IS_WINDOWS) protocol : 'udp', ip : multicastIp, port : port, - flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ] + flags : { udpReusePort: true } } }); @@ -349,7 +349,7 @@ if (!IS_WINDOWS) protocol : 'udp', ip : multicastIp, port : port, - flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ] + flags : { udpReusePort: true } } }); }).not.toThrow(); diff --git a/worker/fbs/transport.fbs b/worker/fbs/transport.fbs index 22415fb862..af84827109 100644 --- a/worker/fbs/transport.fbs +++ b/worker/fbs/transport.fbs @@ -13,9 +13,9 @@ enum Protocol: uint8 { TCP } -enum SocketFlag: uint8 { - IPV6ONLY = 1, - UDP_REUSEPORT = 2 +table SocketFlags { + ipv6_only: bool = false; + udp_reuse_port: bool = false; } table ListenInfo { @@ -23,7 +23,7 @@ table ListenInfo { ip: string (required); announced_ip: string; port: uint16 = 0; - flags: uint8 = 0; + flags: SocketFlags; send_buffer_size: uint32 = 0; recv_buffer_size: uint32 = 0; } diff --git a/worker/include/RTC/PortManager.hpp b/worker/include/RTC/PortManager.hpp index 775a7235d9..17294f24ca 100644 --- a/worker/include/RTC/PortManager.hpp +++ b/worker/include/RTC/PortManager.hpp @@ -3,6 +3,7 @@ #include "common.hpp" #include "Settings.hpp" +#include "RTC/Transport.hpp" #include #include #include @@ -20,19 +21,19 @@ namespace RTC }; public: - static uv_udp_t* BindUdp(std::string& ip, uint8_t flags) + static uv_udp_t* BindUdp(std::string& ip, RTC::Transport::SocketFlags& flags) { return reinterpret_cast(Bind(Transport::UDP, ip, flags)); } - static uv_udp_t* BindUdp(std::string& ip, uint16_t port, uint8_t flags) + static uv_udp_t* BindUdp(std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags) { return reinterpret_cast(Bind(Transport::UDP, ip, port, flags)); } - static uv_tcp_t* BindTcp(std::string& ip, uint8_t flags) + static uv_tcp_t* BindTcp(std::string& ip, RTC::Transport::SocketFlags& flags) { return reinterpret_cast(Bind(Transport::TCP, ip, flags)); } - static uv_tcp_t* BindTcp(std::string& ip, uint16_t port, uint8_t flags) + static uv_tcp_t* BindTcp(std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags) { return reinterpret_cast(Bind(Transport::TCP, ip, port, flags)); } @@ -46,11 +47,12 @@ namespace RTC } private: - static uv_handle_t* Bind(Transport transport, std::string& ip, uint8_t flags); - static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port, uint8_t flags); + static uv_handle_t* Bind(Transport transport, std::string& ip, RTC::Transport::SocketFlags& flags); + static uv_handle_t* Bind( + Transport transport, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags); static void Unbind(Transport transport, std::string& ip, uint16_t port); static std::vector& GetPorts(Transport transport, const std::string& ip); - static uint8_t ConvertSocketFlags(uint8_t flags); + static uint8_t ConvertSocketFlags(RTC::Transport::SocketFlags& flags); private: thread_local static absl::flat_hash_map> mapUdpIpPorts; diff --git a/worker/include/RTC/TcpServer.hpp b/worker/include/RTC/TcpServer.hpp index 102d85c2fc..6d1aeb7376 100644 --- a/worker/include/RTC/TcpServer.hpp +++ b/worker/include/RTC/TcpServer.hpp @@ -3,6 +3,7 @@ #include "common.hpp" #include "RTC/TcpConnection.hpp" +#include "RTC/Transport.hpp" #include "handles/TcpConnectionHandle.hpp" #include "handles/TcpServerHandle.hpp" #include @@ -24,13 +25,16 @@ namespace RTC public: TcpServer( - Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint8_t flags); + Listener* listener, + RTC::TcpConnection::Listener* connListener, + std::string& ip, + RTC::Transport::SocketFlags& flags); TcpServer( Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port, - uint8_t flags); + RTC::Transport::SocketFlags& flags); ~TcpServer() override; /* Pure virtual methods inherited from ::TcpServerHandle. */ diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index e5702f515d..95814a4920 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -119,14 +119,20 @@ namespace RTC }; public: + struct SocketFlags + { + bool ipv6Only{ false }; + bool udpReusePort{ false }; + }; + struct ListenInfo { std::string ip; std::string announcedIp; uint16_t port{ 0u }; + SocketFlags flags; uint32_t sendBufferSize{ 0u }; uint32_t recvBufferSize{ 0u }; - uint8_t flags{ 0u }; }; private: diff --git a/worker/include/RTC/UdpSocket.hpp b/worker/include/RTC/UdpSocket.hpp index aff4a5b8c3..ad9bde10d6 100644 --- a/worker/include/RTC/UdpSocket.hpp +++ b/worker/include/RTC/UdpSocket.hpp @@ -2,6 +2,7 @@ #define MS_RTC_UDP_SOCKET_HPP #include "common.hpp" +#include "RTC/Transport.hpp" #include "handles/UdpSocketHandle.hpp" #include @@ -21,8 +22,8 @@ namespace RTC }; public: - UdpSocket(Listener* listener, std::string& ip, uint8_t flags); - UdpSocket(Listener* listener, std::string& ip, uint16_t port, uint8_t flags); + UdpSocket(Listener* listener, std::string& ip, RTC::Transport::SocketFlags& flags); + UdpSocket(Listener* listener, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags); ~UdpSocket() override; /* Pure virtual methods inherited from ::UdpSocketHandle. */ diff --git a/worker/src/RTC/PipeTransport.cpp b/worker/src/RTC/PipeTransport.cpp index ae0ea29231..8706f5a60b 100644 --- a/worker/src/RTC/PipeTransport.cpp +++ b/worker/src/RTC/PipeTransport.cpp @@ -50,10 +50,11 @@ namespace RTC this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str()); } - this->listenInfo.port = options->listenInfo()->port(); - this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); - this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); - this->listenInfo.flags = options->listenInfo()->flags(); + this->listenInfo.port = options->listenInfo()->port(); + this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); + this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); + this->listenInfo.flags.ipv6Only = options->listenInfo()->flags()->ipv6Only(); + this->listenInfo.flags.udpReusePort = options->listenInfo()->flags()->udpReusePort(); this->rtx = options->enableRtx(); diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index 36d0fa53d8..672fe54cc2 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -52,10 +52,11 @@ namespace RTC this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str()); } - this->listenInfo.port = options->listenInfo()->port(); - this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); - this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); - this->listenInfo.flags = options->listenInfo()->flags(); + this->listenInfo.port = options->listenInfo()->port(); + this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); + this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); + this->listenInfo.flags.ipv6Only = options->listenInfo()->flags()->ipv6Only(); + this->listenInfo.flags.udpReusePort = options->listenInfo()->flags()->udpReusePort(); this->rtcpMux = options->rtcpMux(); this->comedia = options->comedia(); @@ -84,7 +85,8 @@ namespace RTC this->rtcpListenInfo.port = options->rtcpListenInfo()->port(); this->rtcpListenInfo.sendBufferSize = options->rtcpListenInfo()->sendBufferSize(); this->rtcpListenInfo.recvBufferSize = options->rtcpListenInfo()->recvBufferSize(); - this->rtcpListenInfo.flags = options->rtcpListenInfo()->flags(); + this->rtcpListenInfo.flags.ipv6Only = options->rtcpListenInfo()->flags()->ipv6Only(); + this->rtcpListenInfo.flags.udpReusePort = options->rtcpListenInfo()->flags()->udpReusePort(); } // If rtcpListenInfo is not given, just clone listenInfo. else diff --git a/worker/src/RTC/PortManager.cpp b/worker/src/RTC/PortManager.cpp index 82eaee0e4f..051a318d45 100644 --- a/worker/src/RTC/PortManager.cpp +++ b/worker/src/RTC/PortManager.cpp @@ -7,7 +7,6 @@ #include "MediaSoupErrors.hpp" #include "Settings.hpp" #include "Utils.hpp" -#include "FBS/transport.h" #include // std:make_tuple() #include // std::piecewise_construct @@ -39,16 +38,13 @@ namespace RTC /* Class methods. */ - uv_handle_t* PortManager::Bind(Transport transport, std::string& ip, uint8_t flags) + uv_handle_t* PortManager::Bind(Transport transport, std::string& ip, RTC::Transport::SocketFlags& flags) { MS_TRACE(); // First normalize the IP. This may throw if invalid IP. Utils::IP::NormalizeIp(ip); - // Covert given flags into libuv flags. - flags = ConvertSocketFlags(flags); - int err; const int family = Utils::IP::GetFamily(ip); struct sockaddr_storage bindAddr; // NOLINT(cppcoreguidelines-pro-type-member-init) @@ -59,6 +55,7 @@ namespace RTC uv_handle_t* uvHandle{ nullptr }; uint16_t port; std::string transportStr; + uint8_t bitFlags = ConvertSocketFlags(flags); switch (transport) { @@ -232,7 +229,7 @@ namespace RTC err = uv_udp_bind( reinterpret_cast(uvHandle), reinterpret_cast(&bindAddr), - flags); + bitFlags); if (err != 0) { @@ -254,7 +251,7 @@ namespace RTC err = uv_tcp_bind( reinterpret_cast(uvHandle), reinterpret_cast(&bindAddr), - flags); + bitFlags); if (err != 0) { @@ -368,21 +365,20 @@ namespace RTC return static_cast(uvHandle); } - uv_handle_t* PortManager::Bind(Transport transport, std::string& ip, uint16_t port, uint8_t flags) + uv_handle_t* PortManager::Bind( + Transport transport, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags) { MS_TRACE(); // First normalize the IP. This may throw if invalid IP. Utils::IP::NormalizeIp(ip); - // Covert given flags into libuv flags. - flags = ConvertSocketFlags(flags); - int err; const int family = Utils::IP::GetFamily(ip); struct sockaddr_storage bindAddr; // NOLINT(cppcoreguidelines-pro-type-member-init) uv_handle_t* uvHandle{ nullptr }; std::string transportStr; + uint8_t bitFlags = ConvertSocketFlags(flags); switch (transport) { @@ -504,7 +500,7 @@ namespace RTC err = uv_udp_bind( reinterpret_cast(uvHandle), reinterpret_cast(&bindAddr), - flags); + bitFlags); if (err != 0) { @@ -527,7 +523,7 @@ namespace RTC err = uv_tcp_bind( reinterpret_cast(uvHandle), reinterpret_cast(&bindAddr), - flags); + bitFlags); if (err != 0) { @@ -696,23 +692,23 @@ namespace RTC return emptyPorts; } - uint8_t PortManager::ConvertSocketFlags(uint8_t flags) + uint8_t PortManager::ConvertSocketFlags(RTC::Transport::SocketFlags& flags) { MS_TRACE(); - uint8_t newFlags{ 0u }; + uint8_t bitFlags{ 0b00000000 }; - if (flags & static_cast(FBS::Transport::SocketFlag::IPV6ONLY)) + if (flags.ipv6Only) { - newFlags |= UV_UDP_IPV6ONLY; - newFlags |= UV_TCP_IPV6ONLY; // Same flag number but anyway. + bitFlags |= UV_UDP_IPV6ONLY; + bitFlags |= UV_TCP_IPV6ONLY; // Same flag number but anyway. } - if (flags & static_cast(FBS::Transport::SocketFlag::UDP_REUSEPORT)) + if (flags.udpReusePort) { - newFlags |= UV_UDP_REUSEADDR; + bitFlags |= UV_UDP_REUSEADDR; } - return newFlags; + return bitFlags; } } // namespace RTC diff --git a/worker/src/RTC/TcpServer.cpp b/worker/src/RTC/TcpServer.cpp index b007fad3c0..0cef5aa800 100644 --- a/worker/src/RTC/TcpServer.cpp +++ b/worker/src/RTC/TcpServer.cpp @@ -11,7 +11,10 @@ namespace RTC /* Instance methods. */ TcpServer::TcpServer( - Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint8_t flags) + Listener* listener, + RTC::TcpConnection::Listener* connListener, + std::string& ip, + RTC::Transport::SocketFlags& flags) : // This may throw. ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip, flags)), listener(listener), connListener(connListener) @@ -24,7 +27,7 @@ namespace RTC RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port, - uint8_t flags) + RTC::Transport::SocketFlags& flags) : // This may throw. ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip, port, flags)), listener(listener), connListener(connListener), fixedPort(true) diff --git a/worker/src/RTC/UdpSocket.cpp b/worker/src/RTC/UdpSocket.cpp index f340fd86f2..b599620f25 100644 --- a/worker/src/RTC/UdpSocket.cpp +++ b/worker/src/RTC/UdpSocket.cpp @@ -10,14 +10,15 @@ namespace RTC { /* Instance methods. */ - UdpSocket::UdpSocket(Listener* listener, std::string& ip, uint8_t flags) + UdpSocket::UdpSocket(Listener* listener, std::string& ip, RTC::Transport::SocketFlags& flags) : // This may throw. ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip, flags)), listener(listener) { MS_TRACE(); } - UdpSocket::UdpSocket(Listener* listener, std::string& ip, uint16_t port, uint8_t flags) + UdpSocket::UdpSocket( + Listener* listener, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags) : // This may throw. ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip, port, flags)), listener(listener), fixedPort(true) diff --git a/worker/src/RTC/WebRtcServer.cpp b/worker/src/RTC/WebRtcServer.cpp index c89a64ddfb..a1f234c9db 100644 --- a/worker/src/RTC/WebRtcServer.cpp +++ b/worker/src/RTC/WebRtcServer.cpp @@ -60,6 +60,11 @@ namespace RTC announcedIp = listenInfo->announcedIp()->str(); } + RTC::Transport::SocketFlags flags; + + flags.ipv6Only = listenInfo->flags()->ipv6Only(); + flags.udpReusePort = listenInfo->flags()->udpReusePort(); + if (listenInfo->protocol() == FBS::Transport::Protocol::UDP) { // This may throw. @@ -67,11 +72,11 @@ namespace RTC if (listenInfo->port() != 0) { - udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), listenInfo->flags()); + udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), flags); } else { - udpSocket = new RTC::UdpSocket(this, ip, listenInfo->flags()); + udpSocket = new RTC::UdpSocket(this, ip, flags); } this->udpSocketOrTcpServers.emplace_back(udpSocket, nullptr, announcedIp); @@ -99,11 +104,11 @@ namespace RTC if (listenInfo->port() != 0) { - tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), listenInfo->flags()); + tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), flags); } else { - tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->flags()); + tcpServer = new RTC::TcpServer(this, this, ip, flags); } this->udpSocketOrTcpServers.emplace_back(nullptr, tcpServer, announcedIp); diff --git a/worker/src/RTC/WebRtcTransport.cpp b/worker/src/RTC/WebRtcTransport.cpp index 9b897649b0..6eb1f9717d 100644 --- a/worker/src/RTC/WebRtcTransport.cpp +++ b/worker/src/RTC/WebRtcTransport.cpp @@ -62,6 +62,11 @@ namespace RTC announcedIp = listenInfo->announcedIp()->str(); } + RTC::Transport::SocketFlags flags; + + flags.ipv6Only = listenInfo->flags()->ipv6Only(); + flags.udpReusePort = listenInfo->flags()->udpReusePort(); + const uint16_t iceLocalPreference = IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement; const uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference); @@ -72,11 +77,11 @@ namespace RTC if (listenInfo->port() != 0) { - udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), listenInfo->flags()); + udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), flags); } else { - udpSocket = new RTC::UdpSocket(this, ip, listenInfo->flags()); + udpSocket = new RTC::UdpSocket(this, ip, flags); } this->udpSockets[udpSocket] = announcedIp; @@ -114,11 +119,11 @@ namespace RTC if (listenInfo->port() != 0) { - tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), listenInfo->flags()); + tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), flags); } else { - tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->flags()); + tcpServer = new RTC::TcpServer(this, this, ip, flags); } this->tcpServers[tcpServer] = announcedIp; From 491b8c859688a91dbdcaf6402368409da116c6a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 17:15:27 +0100 Subject: [PATCH 05/16] Revert "Rust WIP" This reverts commit 6e785dc650a104d0325e84bbdb5ac718f04be3ba. --- rust/src/data_structures.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/rust/src/data_structures.rs b/rust/src/data_structures.rs index 6f66113ad2..b4d41a6167 100644 --- a/rust/src/data_structures.rs +++ b/rust/src/data_structures.rs @@ -62,9 +62,6 @@ pub struct ListenInfo { /// Listening port. #[serde(skip_serializing_if = "Option::is_none")] pub port: Option, - /// Socket flags. - #[serde(skip_serializing_if = "Option::is_none")] - pub flags: Option>, /// Send buffer size (bytes). #[serde(skip_serializing_if = "Option::is_none")] pub send_buffer_size: Option, @@ -193,26 +190,6 @@ impl Protocol { } } -/// Transport socket flag. -#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)] -#[serde(rename_all = "lowercase")] -pub enum SocketFlag { - /// Disable dual-stack support so only IPv6 is used (only if ip is IPv6). - IpV6Only, - /// Make different transports bind to the same ip and port (only for UDP). - /// Useful for multicast scenarios with plain transport. Use with caution. - UdpReusePort, -} - -impl SocketFlag { - pub(crate) fn to_fbs(self) -> transport::SocketFlag { - match self { - SocketFlag::IpV6Only => transport::SocketFlag::IPV6ONLY, - SocketFlag::UdpReusePort => transport::SocketFlag::UDP_REUSEPORT, - } - } -} - /// ICE candidate #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] From 395dcd0fa1bc328fde610dcff9e23598458d8669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 18:24:30 +0100 Subject: [PATCH 06/16] Rust WIP 2: almost done but tests die --- rust/examples/echo.rs | 1 + rust/examples/multiopus.rs | 2 ++ rust/examples/svc-simulcast.rs | 1 + rust/examples/videoroom.rs | 1 + rust/src/data_structures.rs | 24 ++++++++++++++++++++++ rust/src/router.rs | 8 ++++++++ rust/src/router/consumer/tests.rs | 1 + rust/src/router/data_consumer/tests.rs | 3 +++ rust/src/router/data_producer/tests.rs | 1 + rust/src/router/pipe_transport/tests.rs | 1 + rust/src/router/plain_transport/tests.rs | 1 + rust/src/router/producer/tests.rs | 1 + rust/src/router/webrtc_transport/tests.rs | 5 +++++ rust/src/webrtc_server/tests.rs | 1 + rust/tests/integration/consumer.rs | 1 + rust/tests/integration/data_consumer.rs | 4 ++++ rust/tests/integration/data_producer.rs | 2 ++ rust/tests/integration/multiopus.rs | 1 + rust/tests/integration/pipe_transport.rs | 7 +++++++ rust/tests/integration/plain_transport.rs | 12 +++++++++++ rust/tests/integration/producer.rs | 1 + rust/tests/integration/smoke.rs | 2 ++ rust/tests/integration/webrtc_server.rs | 11 ++++++++++ rust/tests/integration/webrtc_transport.rs | 17 +++++++++++++++ worker/src/RTC/PlainTransport.cpp | 10 +++++++++ 25 files changed, 119 insertions(+) diff --git a/rust/examples/echo.rs b/rust/examples/echo.rs index fd8027d83c..c64ed714d4 100644 --- a/rust/examples/echo.rs +++ b/rust/examples/echo.rs @@ -187,6 +187,7 @@ impl EchoConnection { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/examples/multiopus.rs b/rust/examples/multiopus.rs index 341c9d8a0c..a4f2f7be02 100644 --- a/rust/examples/multiopus.rs +++ b/rust/examples/multiopus.rs @@ -150,6 +150,7 @@ impl EchoConnection { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -235,6 +236,7 @@ impl EchoConnection { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), diff --git a/rust/examples/svc-simulcast.rs b/rust/examples/svc-simulcast.rs index 3cfd318b9b..959d7ce056 100644 --- a/rust/examples/svc-simulcast.rs +++ b/rust/examples/svc-simulcast.rs @@ -207,6 +207,7 @@ impl SvcSimulcastConnection { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/examples/videoroom.rs b/rust/examples/videoroom.rs index ef818d8be2..ff7a6a0794 100644 --- a/rust/examples/videoroom.rs +++ b/rust/examples/videoroom.rs @@ -501,6 +501,7 @@ mod participant { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/data_structures.rs b/rust/src/data_structures.rs index b4d41a6167..7e67ac01c1 100644 --- a/rust/src/data_structures.rs +++ b/rust/src/data_structures.rs @@ -62,6 +62,9 @@ pub struct ListenInfo { /// Listening port. #[serde(skip_serializing_if = "Option::is_none")] pub port: Option, + /// Socket flags. + #[serde(skip_serializing_if = "Option::is_none")] + pub flags: Option, /// Send buffer size (bytes). #[serde(skip_serializing_if = "Option::is_none")] pub send_buffer_size: Option, @@ -80,12 +83,33 @@ impl ListenInfo { ip: self.ip.to_string(), announced_ip: self.announced_ip.map(|ip| ip.to_string()), port: self.port.unwrap_or(0), + flags: self.flags.map(|flags| Box::new(flags.to_fbs())), send_buffer_size: self.send_buffer_size.unwrap_or(0), recv_buffer_size: self.recv_buffer_size.unwrap_or(0), } } } +/// UDP/TCP socket flags. +#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SocketFlags { + /// Disable dual-stack support so only IPv6 is used (only if ip is IPv6). + pub ipv6_only: bool, + /// Make different transports bind to the same ip and port (only for UDP). + /// Useful for multicast scenarios with plain transport. Use with caution. + pub udp_reuse_port: bool, +} + +impl SocketFlags { + pub(crate) fn to_fbs(self) -> transport::SocketFlags { + transport::SocketFlags { + ipv6_only: self.ipv6_only, + udp_reuse_port: self.udp_reuse_port, + } + } +} + /// ICE role. #[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] diff --git a/rust/src/router.rs b/rust/src/router.rs index c2d09a0228..931593932c 100644 --- a/rust/src/router.rs +++ b/rust/src/router.rs @@ -147,6 +147,7 @@ impl PipeToRouterOptions { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -609,6 +610,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, @@ -696,6 +698,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// })) @@ -761,6 +764,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// })) @@ -971,6 +975,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, @@ -1013,6 +1018,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, @@ -1198,6 +1204,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, @@ -1229,6 +1236,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, diff --git a/rust/src/router/consumer/tests.rs b/rust/src/router/consumer/tests.rs index 8f4fe2a74a..94a5b2f9b6 100644 --- a/rust/src/router/consumer/tests.rs +++ b/rust/src/router/consumer/tests.rs @@ -87,6 +87,7 @@ async fn init() -> (Router, WebRtcTransport, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/router/data_consumer/tests.rs b/rust/src/router/data_consumer/tests.rs index 7ef9a80588..64a2dd884f 100644 --- a/rust/src/router/data_consumer/tests.rs +++ b/rust/src/router/data_consumer/tests.rs @@ -41,6 +41,7 @@ async fn init() -> (Router, DataProducer) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -74,6 +75,7 @@ fn data_producer_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -127,6 +129,7 @@ fn transport_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/src/router/data_producer/tests.rs b/rust/src/router/data_producer/tests.rs index 3bbb34db02..52b585b162 100644 --- a/rust/src/router/data_producer/tests.rs +++ b/rust/src/router/data_producer/tests.rs @@ -41,6 +41,7 @@ async fn init() -> (Router, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/router/pipe_transport/tests.rs b/rust/src/router/pipe_transport/tests.rs index 54a1f22a48..4e57984bcf 100644 --- a/rust/src/router/pipe_transport/tests.rs +++ b/rust/src/router/pipe_transport/tests.rs @@ -103,6 +103,7 @@ async fn init() -> (Router, Router, WebRtcTransport, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/router/plain_transport/tests.rs b/rust/src/router/plain_transport/tests.rs index fcff706c5f..a0f0af8d16 100644 --- a/rust/src/router/plain_transport/tests.rs +++ b/rust/src/router/plain_transport/tests.rs @@ -42,6 +42,7 @@ fn router_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/src/router/producer/tests.rs b/rust/src/router/producer/tests.rs index 92533c3c32..21e18c21d7 100644 --- a/rust/src/router/producer/tests.rs +++ b/rust/src/router/producer/tests.rs @@ -72,6 +72,7 @@ async fn init() -> (Router, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/router/webrtc_transport/tests.rs b/rust/src/router/webrtc_transport/tests.rs index 42d54678ee..fe8a81de43 100644 --- a/rust/src/router/webrtc_transport/tests.rs +++ b/rust/src/router/webrtc_transport/tests.rs @@ -57,6 +57,7 @@ fn create_with_webrtc_server_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -65,6 +66,7 @@ fn create_with_webrtc_server_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port2), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -233,6 +235,7 @@ fn router_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -274,6 +277,7 @@ fn webrtc_server_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -282,6 +286,7 @@ fn webrtc_server_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port2), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/src/webrtc_server/tests.rs b/rust/src/webrtc_server/tests.rs index a07c32c648..95f257bc77 100644 --- a/rust/src/webrtc_server/tests.rs +++ b/rust/src/webrtc_server/tests.rs @@ -38,6 +38,7 @@ fn worker_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }, diff --git a/rust/tests/integration/consumer.rs b/rust/tests/integration/consumer.rs index 8a4746365b..54338b00a2 100644 --- a/rust/tests/integration/consumer.rs +++ b/rust/tests/integration/consumer.rs @@ -352,6 +352,7 @@ async fn init() -> ( ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/tests/integration/data_consumer.rs b/rust/tests/integration/data_consumer.rs index d18e21d540..8ae3ed3cf1 100644 --- a/rust/tests/integration/data_consumer.rs +++ b/rust/tests/integration/data_consumer.rs @@ -68,6 +68,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, DataProducer) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -99,6 +100,7 @@ fn consume_data_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -208,6 +210,7 @@ fn weak() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -572,6 +575,7 @@ fn close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/tests/integration/data_producer.rs b/rust/tests/integration/data_producer.rs index e753ff5eb5..31516da931 100644 --- a/rust/tests/integration/data_producer.rs +++ b/rust/tests/integration/data_producer.rs @@ -54,6 +54,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, PlainTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -72,6 +73,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, PlainTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/tests/integration/multiopus.rs b/rust/tests/integration/multiopus.rs index 650b450cd0..f05170401e 100644 --- a/rust/tests/integration/multiopus.rs +++ b/rust/tests/integration/multiopus.rs @@ -135,6 +135,7 @@ async fn init() -> (Router, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/tests/integration/pipe_transport.rs b/rust/tests/integration/pipe_transport.rs index 242c37a483..32a8170e96 100644 --- a/rust/tests/integration/pipe_transport.rs +++ b/rust/tests/integration/pipe_transport.rs @@ -261,6 +261,7 @@ async fn init() -> ( ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -605,6 +606,7 @@ fn weak() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -639,6 +641,7 @@ fn create_with_fixed_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }) @@ -662,6 +665,7 @@ fn create_with_enable_rtx_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -774,6 +778,7 @@ fn create_with_enable_srtp_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -829,6 +834,7 @@ fn create_with_invalid_srtp_parameters_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })) @@ -1155,6 +1161,7 @@ fn pipe_to_router_called_twice_generates_single_pair() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/tests/integration/plain_transport.rs b/rust/tests/integration/plain_transport.rs index c968c395b5..9fd95bcdb7 100644 --- a/rust/tests/integration/plain_transport.rs +++ b/rust/tests/integration/plain_transport.rs @@ -94,6 +94,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -132,6 +133,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -205,6 +207,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -215,6 +218,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(rtcp_port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -282,6 +286,7 @@ fn create_with_fixed_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }) @@ -305,6 +310,7 @@ fn weak() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -338,6 +344,7 @@ fn create_enable_srtp_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -403,6 +410,7 @@ fn create_non_bindable_ip() { ip: "8.8.8.8".parse().unwrap(), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })) @@ -424,6 +432,7 @@ fn get_stats_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -481,6 +490,7 @@ fn connect_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -556,6 +566,7 @@ fn connect_wrong_arguments() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -596,6 +607,7 @@ fn close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/tests/integration/producer.rs b/rust/tests/integration/producer.rs index 7b05c34083..909d5cbac4 100644 --- a/rust/tests/integration/producer.rs +++ b/rust/tests/integration/producer.rs @@ -207,6 +207,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/tests/integration/smoke.rs b/rust/tests/integration/smoke.rs index 2f1a46f658..7008ad5e83 100644 --- a/rust/tests/integration/smoke.rs +++ b/rust/tests/integration/smoke.rs @@ -88,6 +88,7 @@ fn smoke() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -274,6 +275,7 @@ fn smoke() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/tests/integration/webrtc_server.rs b/rust/tests/integration/webrtc_server.rs index f02cb765be..76b25a8dc7 100644 --- a/rust/tests/integration/webrtc_server.rs +++ b/rust/tests/integration/webrtc_server.rs @@ -64,6 +64,7 @@ fn create_webrtc_server_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -72,6 +73,7 @@ fn create_webrtc_server_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), port: Some(port2), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -158,6 +160,7 @@ fn create_webrtc_server_without_specifying_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -166,6 +169,7 @@ fn create_webrtc_server_without_specifying_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -239,6 +243,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -247,6 +252,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), announced_ip: None, port: Some(port2), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -270,6 +276,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -278,6 +285,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -301,6 +309,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -315,6 +324,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -343,6 +353,7 @@ fn close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }, diff --git a/rust/tests/integration/webrtc_transport.rs b/rust/tests/integration/webrtc_transport.rs index e8c45b6761..9eec456aa4 100644 --- a/rust/tests/integration/webrtc_transport.rs +++ b/rust/tests/integration/webrtc_transport.rs @@ -100,6 +100,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -137,6 +138,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -145,6 +147,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), announced_ip: Some("9.9.9.2".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -153,6 +156,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -263,6 +267,7 @@ fn create_with_fixed_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, })) @@ -286,6 +291,7 @@ fn weak() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -316,6 +322,7 @@ fn create_non_bindable_ip() { ip: "8.8.8.8".parse().unwrap(), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, },) @@ -338,6 +345,7 @@ fn get_stats_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -389,6 +397,7 @@ fn connect_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -438,6 +447,7 @@ fn set_max_incoming_bitrate_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -470,6 +480,7 @@ fn set_max_outgoing_bitrate_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -502,6 +513,7 @@ fn set_min_outgoing_bitrate_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -534,6 +546,7 @@ fn set_max_outgoing_bitrate_fails_if_value_is_lower_than_current_min_limit() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -571,6 +584,7 @@ fn set_min_outgoing_bitrate_fails_if_value_is_higher_than_current_max_limit() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -608,6 +622,7 @@ fn restart_ice_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -643,6 +658,7 @@ fn enable_trace_event_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -718,6 +734,7 @@ fn close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index 672fe54cc2..4105f143d0 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -52,6 +52,16 @@ namespace RTC this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str()); } + if (flatbuffers::IsFieldPresent( + options->listenInfo(), FBS::Transport::ListenInfo::VT_FLAGS)) + { + MS_DUMP("---- PRESENT !!!!!!"); + } + else + { + MS_DUMP("---- NOT PRESENT !!!!!!"); + } + this->listenInfo.port = options->listenInfo()->port(); this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); From 722fe2e09df29c024f7b09b9476caef836f1c8ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 20:10:56 +0100 Subject: [PATCH 07/16] transport.fbs: Make `flags: SocketFlags` required --- node/src/tests/test-PlainTransport.ts | 39 ++++++++- rust/src/data_structures.rs | 16 ++-- rust/tests/integration/plain_transport.rs | 98 ++++++++++++++++++++++- worker/fbs/transport.fbs | 2 +- worker/src/RTC/PlainTransport.cpp | 10 --- 5 files changed, 146 insertions(+), 19 deletions(-) diff --git a/node/src/tests/test-PlainTransport.ts b/node/src/tests/test-PlainTransport.ts index 4bf6149c45..962aee11a4 100644 --- a/node/src/tests/test-PlainTransport.ts +++ b/node/src/tests/test-PlainTransport.ts @@ -321,7 +321,7 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as if (!IS_WINDOWS) { - test('two transports listening in same IP:port succeed if UDP_REUSEPORT flag is set', async () => + test('two transports binding to the same IP:port with udpReusePort flag succeed', async () => { let transport1: mediasoup.types.PlainTransport | undefined; let transport2: mediasoup.types.PlainTransport | undefined; @@ -357,6 +357,43 @@ if (!IS_WINDOWS) transport1?.close(); transport2?.close(); }, 2000); + + test('two transports binding to the same IP:port without udpReusePort flag fails', async () => + { + let transport1: mediasoup.types.PlainTransport | undefined; + let transport2: mediasoup.types.PlainTransport | undefined; + + await expect(async () => + { + const multicastIp = '224.0.0.1'; + const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 }); + + transport1 = await router.createPlainTransport( + { + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : { udpReusePort: false } + } + }); + + transport2 = await router.createPlainTransport( + { + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : { udpReusePort: false } + } + }); + }).rejects.toThrow(); + + transport1?.close(); + transport2?.close(); + }, 2000); } test('plainTransport.getStats() succeeds', async () => diff --git a/rust/src/data_structures.rs b/rust/src/data_structures.rs index 7e67ac01c1..9d2e005692 100644 --- a/rust/src/data_structures.rs +++ b/rust/src/data_structures.rs @@ -83,7 +83,7 @@ impl ListenInfo { ip: self.ip.to_string(), announced_ip: self.announced_ip.map(|ip| ip.to_string()), port: self.port.unwrap_or(0), - flags: self.flags.map(|flags| Box::new(flags.to_fbs())), + flags: Box::new(self.flags.unwrap_or_default().to_fbs()), send_buffer_size: self.send_buffer_size.unwrap_or(0), recv_buffer_size: self.recv_buffer_size.unwrap_or(0), } @@ -91,21 +91,25 @@ impl ListenInfo { } /// UDP/TCP socket flags. -#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)] +#[derive( + Default, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize, +)] #[serde(rename_all = "camelCase")] pub struct SocketFlags { /// Disable dual-stack support so only IPv6 is used (only if ip is IPv6). - pub ipv6_only: bool, + /// Defaults to false. + pub ipv6_only: Option, /// Make different transports bind to the same ip and port (only for UDP). /// Useful for multicast scenarios with plain transport. Use with caution. - pub udp_reuse_port: bool, + /// Defaults to false. + pub udp_reuse_port: Option, } impl SocketFlags { pub(crate) fn to_fbs(self) -> transport::SocketFlags { transport::SocketFlags { - ipv6_only: self.ipv6_only, - udp_reuse_port: self.udp_reuse_port, + ipv6_only: self.ipv6_only.unwrap_or_else(false), + udp_reuse_port: self.udp_reuse_port.unwrap_or_else(Some(false)), } } } diff --git a/rust/tests/integration/plain_transport.rs b/rust/tests/integration/plain_transport.rs index 9fd95bcdb7..9567e6dc69 100644 --- a/rust/tests/integration/plain_transport.rs +++ b/rust/tests/integration/plain_transport.rs @@ -1,6 +1,8 @@ use futures_lite::future; use hash_hasher::HashedSet; -use mediasoup::data_structures::{AppData, ListenInfo, Protocol, SctpState, TransportTuple}; +use mediasoup::data_structures::{ + AppData, ListenInfo, Protocol, SctpState, SocketFlags, TransportTuple, +}; use mediasoup::plain_transport::{PlainTransportOptions, PlainTransportRemoteParameters}; use mediasoup::prelude::*; use mediasoup::router::{Router, RouterOptions}; @@ -420,6 +422,100 @@ fn create_non_bindable_ip() { }); } +#[cfg(not(target_os = "windows"))] +#[test] +fn create_two_transports_binding_to_same_ip_port_with_udp_reuse_port_flag_succeed() { + future::block_on(async move { + let (_worker, router) = init().await; + + let multicast_ip = "224.0.0.1".parse().unwrap(); + let port = pick_unused_port().unwrap(); + + // Transport 1. + let _ = router + .create_plain_transport({ + PlainTransportOptions::new(ListenInfo { + protocol: Protocol::Udp, + ip: multicast_ip, + announced_ip: None, + port: Some(port), + flags: Some(SocketFlags { + udp_reuse_port: true, + }), + send_buffer_size: None, + recv_buffer_size: None, + }) + }) + .await + .expect("Failed to create first Plain transport"); + + // Transport 2. + let _ = router + .create_plain_transport({ + PlainTransportOptions::new(ListenInfo { + protocol: Protocol::Udp, + ip: multicast_ip, + announced_ip: None, + port: Some(port), + flags: Some(SocketFlags { + udp_reuse_port: true, + }), + send_buffer_size: None, + recv_buffer_size: None, + }) + }) + .await + .expect("Failed to create second Plain transport"); + }); +} + +#[cfg(not(target_os = "windows"))] +#[test] +fn create_two_transports_binding_to_same_ip_port_without_udp_reuse_port_flag_fails() { + future::block_on(async move { + let (_worker, router) = init().await; + + let multicast_ip = "224.0.0.1".parse().unwrap(); + let port = pick_unused_port().unwrap(); + + // Transport 1. + let _ = router + .create_plain_transport({ + PlainTransportOptions::new(ListenInfo { + protocol: Protocol::Udp, + ip: multicast_ip, + announced_ip: None, + port: Some(port), + flags: Some(SocketFlags { + udp_reuse_port: false, + }), + send_buffer_size: None, + recv_buffer_size: None, + }) + }) + .await + .expect("Failed to create first Plain transport"); + + // Transport 2. + assert!(matches!( + router + .create_plain_transport(PlainTransportOptions::new(ListenInfo { + protocol: Protocol::Udp, + ip: multicast_ip, + announced_ip: None, + port: Some(port), + flags: Some(SocketFlags { + udp_reuse_port: false, + }), + send_buffer_size: None, + recv_buffer_size: None, + })) + .await, + Err(RequestError::Response { .. }), + )); + }); +} + #[test] fn get_stats_succeeds() { future::block_on(async move { diff --git a/worker/fbs/transport.fbs b/worker/fbs/transport.fbs index af84827109..001661b89f 100644 --- a/worker/fbs/transport.fbs +++ b/worker/fbs/transport.fbs @@ -23,7 +23,7 @@ table ListenInfo { ip: string (required); announced_ip: string; port: uint16 = 0; - flags: SocketFlags; + flags: SocketFlags (required); send_buffer_size: uint32 = 0; recv_buffer_size: uint32 = 0; } diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index 4105f143d0..672fe54cc2 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -52,16 +52,6 @@ namespace RTC this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str()); } - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_FLAGS)) - { - MS_DUMP("---- PRESENT !!!!!!"); - } - else - { - MS_DUMP("---- NOT PRESENT !!!!!!"); - } - this->listenInfo.port = options->listenInfo()->port(); this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); From b543631759a38c23d49d277b5098297a3e52b24a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 20:12:48 +0100 Subject: [PATCH 08/16] undo --- rust/src/data_structures.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/src/data_structures.rs b/rust/src/data_structures.rs index 9d2e005692..0e22c9311f 100644 --- a/rust/src/data_structures.rs +++ b/rust/src/data_structures.rs @@ -108,8 +108,8 @@ pub struct SocketFlags { impl SocketFlags { pub(crate) fn to_fbs(self) -> transport::SocketFlags { transport::SocketFlags { - ipv6_only: self.ipv6_only.unwrap_or_else(false), - udp_reuse_port: self.udp_reuse_port.unwrap_or_else(Some(false)), + ipv6_only: self.ipv6_only, + udp_reuse_port: self.udp_reuse_port, } } } From 7fac1c8234127c2f3e7cb6e0b14be2c5da07951c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 20:23:45 +0100 Subject: [PATCH 09/16] Make keys required --- rust/src/data_structures.rs | 4 ++-- rust/tests/integration/plain_transport.rs | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/rust/src/data_structures.rs b/rust/src/data_structures.rs index 0e22c9311f..07fb5247af 100644 --- a/rust/src/data_structures.rs +++ b/rust/src/data_structures.rs @@ -98,11 +98,11 @@ impl ListenInfo { pub struct SocketFlags { /// Disable dual-stack support so only IPv6 is used (only if ip is IPv6). /// Defaults to false. - pub ipv6_only: Option, + pub ipv6_only: bool, /// Make different transports bind to the same ip and port (only for UDP). /// Useful for multicast scenarios with plain transport. Use with caution. /// Defaults to false. - pub udp_reuse_port: Option, + pub udp_reuse_port: bool, } impl SocketFlags { diff --git a/rust/tests/integration/plain_transport.rs b/rust/tests/integration/plain_transport.rs index 9567e6dc69..c132724b81 100644 --- a/rust/tests/integration/plain_transport.rs +++ b/rust/tests/integration/plain_transport.rs @@ -440,6 +440,7 @@ fn create_two_transports_binding_to_same_ip_port_with_udp_reuse_port_flag_succee announced_ip: None, port: Some(port), flags: Some(SocketFlags { + ipv6_only: false, udp_reuse_port: true, }), send_buffer_size: None, @@ -458,6 +459,7 @@ fn create_two_transports_binding_to_same_ip_port_with_udp_reuse_port_flag_succee announced_ip: None, port: Some(port), flags: Some(SocketFlags { + ipv6_only: false, udp_reuse_port: true, }), send_buffer_size: None, @@ -487,6 +489,7 @@ fn create_two_transports_binding_to_same_ip_port_without_udp_reuse_port_flag_fai announced_ip: None, port: Some(port), flags: Some(SocketFlags { + ipv6_only: false, udp_reuse_port: false, }), send_buffer_size: None, @@ -505,6 +508,7 @@ fn create_two_transports_binding_to_same_ip_port_without_udp_reuse_port_flag_fai announced_ip: None, port: Some(port), flags: Some(SocketFlags { + ipv6_only: false, udp_reuse_port: false, }), send_buffer_size: None, From f6f03fa78f060f84e85093f849f75ad24fc919cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 20:44:53 +0100 Subject: [PATCH 10/16] cosmetic --- node/src/tests/test-PlainTransport.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/tests/test-PlainTransport.ts b/node/src/tests/test-PlainTransport.ts index 962aee11a4..554d07444a 100644 --- a/node/src/tests/test-PlainTransport.ts +++ b/node/src/tests/test-PlainTransport.ts @@ -370,7 +370,7 @@ if (!IS_WINDOWS) transport1 = await router.createPlainTransport( { - listenInfo : + listenInfo : { protocol : 'udp', ip : multicastIp, @@ -381,7 +381,7 @@ if (!IS_WINDOWS) transport2 = await router.createPlainTransport( { - listenInfo : + listenInfo : { protocol : 'udp', ip : multicastIp, From 385b6bed5391eb9f30e997c14929bc28fd3bd2a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 20:53:49 +0100 Subject: [PATCH 11/16] Add missing flags --- rust/benches/producer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/benches/producer.rs b/rust/benches/producer.rs index a6a5e87c64..c9b98870dc 100644 --- a/rust/benches/producer.rs +++ b/rust/benches/producer.rs @@ -67,6 +67,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); From 34956b230b2d8425f01a7b0ec67326970a5bf56e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 20:57:02 +0100 Subject: [PATCH 12/16] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f24bb46fc8..0f01d306c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### NEXT * Avoid modification of user input data ([PR #1285](https://github.com/versatica/mediasoup/pull/1285)). +* `ListenInfo`: Add transport socket flags ([PR #1291](https://github.com/versatica/mediasoup/pull/1291)). ### 3.13.13 From 576ebcd00eeb33c750665525ea7d144c910c5e51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 2 Jan 2024 21:11:38 +0100 Subject: [PATCH 13/16] cosmetic --- .github/workflows/mediasoup-worker-prebuild.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/mediasoup-worker-prebuild.yaml b/.github/workflows/mediasoup-worker-prebuild.yaml index b399fbe89d..1291c95adc 100644 --- a/.github/workflows/mediasoup-worker-prebuild.yaml +++ b/.github/workflows/mediasoup-worker-prebuild.yaml @@ -32,6 +32,7 @@ jobs: CC: ${{ matrix.build.cc }} CXX: ${{ matrix.build.cxx }} MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true' + MEDIASOUP_LOCAL_DEV: 'true' steps: - name: Checkout From 93bbb17d1065809e38356b284149e4373d5a0eed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 3 Jan 2024 10:41:22 +0100 Subject: [PATCH 14/16] cosmetic --- rust/src/router/plain_transport.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/src/router/plain_transport.rs b/rust/src/router/plain_transport.rs index 8988b6d37b..33debb2fbf 100644 --- a/rust/src/router/plain_transport.rs +++ b/rust/src/router/plain_transport.rs @@ -837,8 +837,8 @@ impl PlainTransport { /// # async fn f( /// # plain_transport: mediasoup::plain_transport::PlainTransport, /// # ) -> Result<(), Box> { - /// // Calling connect() on a PlainTransport created with comedia unset, rtcpMux - /// // set and enableSrtp enabled. + /// // Calling connect() on a PlainTransport created with comedia unset, + /// // rtcp_mux set and enableSrtp enabled. /// plain_transport /// .connect(PlainTransportRemoteParameters { /// ip: Some("1.2.3.4".parse().unwrap()), From 7b935b61136793c0f1644ad3fcef27b65234d9c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 3 Jan 2024 11:06:21 +0100 Subject: [PATCH 15/16] Fix Rust tests to ensure first transport is still referenced while the second one is created --- rust/tests/integration/plain_transport.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/rust/tests/integration/plain_transport.rs b/rust/tests/integration/plain_transport.rs index c132724b81..828135519f 100644 --- a/rust/tests/integration/plain_transport.rs +++ b/rust/tests/integration/plain_transport.rs @@ -431,8 +431,7 @@ fn create_two_transports_binding_to_same_ip_port_with_udp_reuse_port_flag_succee let multicast_ip = "224.0.0.1".parse().unwrap(); let port = pick_unused_port().unwrap(); - // Transport 1. - let _ = router + let transport1 = router .create_plain_transport({ PlainTransportOptions::new(ListenInfo { protocol: Protocol::Udp, @@ -450,8 +449,7 @@ fn create_two_transports_binding_to_same_ip_port_with_udp_reuse_port_flag_succee .await .expect("Failed to create first Plain transport"); - // Transport 2. - let _ = router + let transport2 = router .create_plain_transport({ PlainTransportOptions::new(ListenInfo { protocol: Protocol::Udp, @@ -468,6 +466,9 @@ fn create_two_transports_binding_to_same_ip_port_with_udp_reuse_port_flag_succee }) .await .expect("Failed to create second Plain transport"); + + assert_eq!(transport1.tuple().local_port(), port); + assert_eq!(transport2.tuple().local_port(), port); }); } @@ -480,8 +481,7 @@ fn create_two_transports_binding_to_same_ip_port_without_udp_reuse_port_flag_fai let multicast_ip = "224.0.0.1".parse().unwrap(); let port = pick_unused_port().unwrap(); - // Transport 1. - let _ = router + let transport1 = router .create_plain_transport({ PlainTransportOptions::new(ListenInfo { protocol: Protocol::Udp, @@ -499,7 +499,6 @@ fn create_two_transports_binding_to_same_ip_port_without_udp_reuse_port_flag_fai .await .expect("Failed to create first Plain transport"); - // Transport 2. assert!(matches!( router .create_plain_transport(PlainTransportOptions::new(ListenInfo { @@ -517,6 +516,8 @@ fn create_two_transports_binding_to_same_ip_port_without_udp_reuse_port_flag_fai .await, Err(RequestError::Response { .. }), )); + + assert_eq!(transport1.tuple().local_port(), port); }); } From 1dc2e3dc80424b478524046fb4e31d34e14d0a74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 3 Jan 2024 11:19:55 +0100 Subject: [PATCH 16/16] Fix cargo clippy --- rust/tests/integration/plain_transport.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/tests/integration/plain_transport.rs b/rust/tests/integration/plain_transport.rs index 828135519f..5fc334fca6 100644 --- a/rust/tests/integration/plain_transport.rs +++ b/rust/tests/integration/plain_transport.rs @@ -1,8 +1,8 @@ use futures_lite::future; use hash_hasher::HashedSet; -use mediasoup::data_structures::{ - AppData, ListenInfo, Protocol, SctpState, SocketFlags, TransportTuple, -}; +#[cfg(not(target_os = "windows"))] +use mediasoup::data_structures::SocketFlags; +use mediasoup::data_structures::{AppData, ListenInfo, Protocol, SctpState, TransportTuple}; use mediasoup::plain_transport::{PlainTransportOptions, PlainTransportRemoteParameters}; use mediasoup::prelude::*; use mediasoup::router::{Router, RouterOptions};