Skip to content

Commit

Permalink
ListenInfo: Add transport socket flags
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc committed Jan 2, 2024
1 parent a36f2c2 commit 46f8819
Show file tree
Hide file tree
Showing 17 changed files with 228 additions and 94 deletions.
19 changes: 18 additions & 1 deletion node/src/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
Transport,
TransportListenInfo,
TransportListenIp,
TransportProtocol
TransportProtocol,
TransportSocketFlag
} from './Transport';
import { WebRtcTransport, WebRtcTransportOptions, parseWebRtcTransportDumpResponse } from './WebRtcTransport';
import { PlainTransport, PlainTransportOptions, parsePlainTransportDumpResponse } from './PlainTransport';
Expand Down Expand Up @@ -570,6 +571,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo.ip,
listenInfo.announcedIp,
listenInfo.port,
socketFlagsToInteger(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize
));
Expand Down Expand Up @@ -749,6 +751,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToInteger(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand All @@ -759,6 +762,7 @@ export class Router<RouterAppData extends AppData = AppData>
rtcpListenInfo.ip,
rtcpListenInfo.announcedIp,
rtcpListenInfo.port,
socketFlagsToInteger(rtcpListenInfo.flags),
rtcpListenInfo.sendBufferSize,
rtcpListenInfo.recvBufferSize
) : undefined,
Expand Down Expand Up @@ -897,6 +901,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToInteger(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand Down Expand Up @@ -1619,3 +1624,15 @@ export function parseRouterDumpResponse(
mapDataConsumerIdDataProducerId : parseStringStringVector(binary, 'mapDataConsumerIdDataProducerId')
};
}

function socketFlagsToInteger(flags: TransportSocketFlag[] = []): number
{
let flagsInteger = 0;

for (const flag of flags)
{
flagsInteger |= flag;
}

return flagsInteger;
}
24 changes: 24 additions & 0 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ export type TransportListenInfo =
*/
port?: number;

/**
* Socket flags.
*/
flags?: TransportSocketFlag[];

/**
* Send buffer size (bytes).
*/
Expand Down Expand Up @@ -107,6 +112,25 @@ export type TransportListenIp =
*/
export type TransportProtocol = 'udp' | 'tcp';

/**
* UDP/TCP socket flag.
*/
// NOTE: ESLint absurdly complains about "'TransportSocketFlag' is already
// declared in the upper scope".
// eslint-disable-next-line no-shadow
export enum TransportSocketFlag
{
/**
* Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
*/
IPV6ONLY = FbsTransport.SocketFlag.IPV6ONLY,
/**
* Make different transports bind to the same ip and port (only for UDP).
* Useful for multicast scenarios with plain transport. Use with caution.
*/
UDP_REUSEPORT = FbsTransport.SocketFlag.UDP_REUSEPORT
}

export type TransportTuple =
{
localIp: string;
Expand Down
37 changes: 37 additions & 0 deletions node/src/tests/test-PlainTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,43 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as
.toThrow(Error);
}, 2000);

test('two transports listening in same IP:port succeed if UDP_REUSEPORT flag is set', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;

await expect(async () =>
{
const multicastIp = '224.0.0.1';
const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 });

transport1 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ]
}
});

transport2 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ]
}
});
}).not.toThrow();

transport1?.close();
transport2?.close();
}, 2000);

test('plainTransport.getStats() succeeds', async () =>
{
const data = await transport.getStats();
Expand Down
8 changes: 4 additions & 4 deletions npm-scripts.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import process from 'node:process';
import os from 'node:os';
import fs from 'node:fs';
import path from 'node:path';
import * as process from 'node:process';
import * as os from 'node:os';
import * as fs from 'node:fs';
import * as path from 'node:path';
import { execSync } from 'node:child_process';
import fetch from 'node-fetch';
import tar from 'tar';
Expand Down
10 changes: 8 additions & 2 deletions worker/fbs/transport.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ enum Protocol: uint8 {
TCP
}

enum SocketFlag: uint8 {
IPV6ONLY = 1,
UDP_REUSEPORT = 2
}

table ListenInfo {
protocol: FBS.Transport.Protocol = UDP;
protocol: Protocol = UDP;
ip: string (required);
announced_ip: string;
port: uint16 = 0;
flags: uint8 = 0;
send_buffer_size: uint32 = 0;
recv_buffer_size: uint32 = 0;
}
Expand Down Expand Up @@ -85,7 +91,7 @@ table Tuple {
local_port: uint16;
remote_ip: string;
remote_port: uint16;
protocol: FBS.Transport.Protocol = UDP;
protocol: Protocol = UDP;
}

table RtpListener {
Expand Down
13 changes: 13 additions & 0 deletions worker/include/Logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@
((value & 0x02) ? '1' : '0'), \
((value & 0x01) ? '1' : '0')

// Usage:
// MS_DEBUG_DEV("Leading text "MS_UINT8_TO_BINARY_PATTERN, MS_UINT8_TO_BINARY(value));
#define MS_UINT8_TO_BINARY_PATTERN "%c%c%c%c%c%c%c%c"
#define MS_UINT8_TO_BINARY(value) \
((value & 0x80) ? '1' : '0'), \
((value & 0x40) ? '1' : '0'), \
((value & 0x20) ? '1' : '0'), \
((value & 0x10) ? '1' : '0'), \
((value & 0x08) ? '1' : '0'), \
((value & 0x04) ? '1' : '0'), \
((value & 0x02) ? '1' : '0'), \
((value & 0x01) ? '1' : '0')

class Logger
{
public:
Expand Down
21 changes: 11 additions & 10 deletions worker/include/RTC/PortManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ namespace RTC
};

public:
static uv_udp_t* BindUdp(std::string& ip)
static uv_udp_t* BindUdp(std::string& ip, uint8_t flags)
{
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip));
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, flags));
}
static uv_udp_t* BindUdp(std::string& ip, uint16_t port)
static uv_udp_t* BindUdp(std::string& ip, uint16_t port, uint8_t flags)
{
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, port));
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, port, flags));
}
static uv_tcp_t* BindTcp(std::string& ip)
static uv_tcp_t* BindTcp(std::string& ip, uint8_t flags)
{
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip));
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, flags));
}
static uv_tcp_t* BindTcp(std::string& ip, uint16_t port)
static uv_tcp_t* BindTcp(std::string& ip, uint16_t port, uint8_t flags)
{
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, port));
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, port, flags));
}
static void UnbindUdp(std::string& ip, uint16_t port)
{
Expand All @@ -46,10 +46,11 @@ namespace RTC
}

private:
static uv_handle_t* Bind(Transport transport, std::string& ip);
static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port);
static uv_handle_t* Bind(Transport transport, std::string& ip, uint8_t flags);
static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port, uint8_t flags);
static void Unbind(Transport transport, std::string& ip, uint16_t port);
static std::vector<bool>& GetPorts(Transport transport, const std::string& ip);
static uint8_t ConvertSocketFlags(uint8_t flags);

private:
thread_local static absl::flat_hash_map<std::string, std::vector<bool>> mapUdpIpPorts;
Expand Down
9 changes: 7 additions & 2 deletions worker/include/RTC/TcpServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ namespace RTC
};

public:
TcpServer(Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip);
TcpServer(
Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port);
Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint8_t flags);
TcpServer(
Listener* listener,
RTC::TcpConnection::Listener* connListener,
std::string& ip,
uint16_t port,
uint8_t flags);
~TcpServer() override;

/* Pure virtual methods inherited from ::TcpServerHandle. */
Expand Down
1 change: 1 addition & 0 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ namespace RTC
uint16_t port{ 0u };
uint32_t sendBufferSize{ 0u };
uint32_t recvBufferSize{ 0u };
uint8_t flags{ 0u };
};

private:
Expand Down
4 changes: 2 additions & 2 deletions worker/include/RTC/UdpSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace RTC
};

public:
UdpSocket(Listener* listener, std::string& ip);
UdpSocket(Listener* listener, std::string& ip, uint16_t port);
UdpSocket(Listener* listener, std::string& ip, uint8_t flags);
UdpSocket(Listener* listener, std::string& ip, uint16_t port, uint8_t flags);
~UdpSocket() override;

/* Pure virtual methods inherited from ::UdpSocketHandle. */
Expand Down
22 changes: 7 additions & 15 deletions worker/src/RTC/PipeTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,10 @@ namespace RTC
this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str());
}

this->listenInfo.port = options->listenInfo()->port();

if (flatbuffers::IsFieldPresent(
options->listenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE))
{
this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize();
}

if (flatbuffers::IsFieldPresent(
options->listenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE))
{
this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize();
}
this->listenInfo.port = options->listenInfo()->port();
this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize();
this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize();
this->listenInfo.flags = options->listenInfo()->flags();

this->rtx = options->enableRtx();

Expand All @@ -77,11 +68,12 @@ namespace RTC
// This may throw.
if (this->listenInfo.port != 0)
{
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port);
this->udpSocket = new RTC::UdpSocket(
this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);
}
else
{
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip);
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.flags);
}

if (this->listenInfo.sendBufferSize != 0)
Expand Down
46 changes: 15 additions & 31 deletions worker/src/RTC/PlainTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,10 @@ namespace RTC
this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str());
}

this->listenInfo.port = options->listenInfo()->port();

if (flatbuffers::IsFieldPresent(
options->listenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE))
{
this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize();
}

if (flatbuffers::IsFieldPresent(
options->listenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE))
{
this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize();
}
this->listenInfo.port = options->listenInfo()->port();
this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize();
this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize();
this->listenInfo.flags = options->listenInfo()->flags();

this->rtcpMux = options->rtcpMux();
this->comedia = options->comedia();
Expand All @@ -90,19 +81,10 @@ namespace RTC
this->rtcpListenInfo.announcedIp.assign(options->rtcpListenInfo()->announcedIp()->str());
}

this->rtcpListenInfo.port = options->rtcpListenInfo()->port();

if (flatbuffers::IsFieldPresent(
options->rtcpListenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE))
{
this->rtcpListenInfo.sendBufferSize = options->rtcpListenInfo()->sendBufferSize();
}

if (flatbuffers::IsFieldPresent(
options->rtcpListenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE))
{
this->rtcpListenInfo.recvBufferSize = options->rtcpListenInfo()->recvBufferSize();
}
this->rtcpListenInfo.port = options->rtcpListenInfo()->port();
this->rtcpListenInfo.sendBufferSize = options->rtcpListenInfo()->sendBufferSize();
this->rtcpListenInfo.recvBufferSize = options->rtcpListenInfo()->recvBufferSize();
this->rtcpListenInfo.flags = options->rtcpListenInfo()->flags();
}
// If rtcpListenInfo is not given, just clone listenInfo.
else
Expand Down Expand Up @@ -160,11 +142,12 @@ namespace RTC
// This may throw.
if (this->listenInfo.port != 0)
{
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port);
this->udpSocket = new RTC::UdpSocket(
this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);
}
else
{
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip);
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.flags);
}

if (this->listenInfo.sendBufferSize != 0)
Expand All @@ -184,12 +167,13 @@ namespace RTC
// This may throw.
if (this->rtcpListenInfo.port != 0)
{
this->rtcpUdpSocket =
new RTC::UdpSocket(this, this->rtcpListenInfo.ip, this->rtcpListenInfo.port);
this->rtcpUdpSocket = new RTC::UdpSocket(
this, this->rtcpListenInfo.ip, this->rtcpListenInfo.port, this->rtcpListenInfo.flags);
}
else
{
this->rtcpUdpSocket = new RTC::UdpSocket(this, this->rtcpListenInfo.ip);
this->rtcpUdpSocket =
new RTC::UdpSocket(this, this->rtcpListenInfo.ip, this->rtcpListenInfo.flags);
}

if (this->rtcpListenInfo.sendBufferSize != 0)
Expand Down
Loading

0 comments on commit 46f8819

Please sign in to comment.