Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListenInfo: Add transport socket flags #1291

Merged
merged 16 commits into from
Jan 3, 2024
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:

env:
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
CC: ${{ matrix.build.cc }}
CXX: ${{ matrix.build.cxx }}
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
17 changes: 16 additions & 1 deletion node/src/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
Transport,
TransportListenInfo,
TransportListenIp,
TransportProtocol
TransportProtocol,
TransportSocketFlags
} from './Transport';
import { WebRtcTransport, WebRtcTransportOptions, parseWebRtcTransportDumpResponse } from './WebRtcTransport';
import { PlainTransport, PlainTransportOptions, parsePlainTransportDumpResponse } from './PlainTransport';
Expand Down Expand Up @@ -570,6 +571,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo.ip,
listenInfo.announcedIp,
listenInfo.port,
socketFlagsToFbs(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize
));
Expand Down Expand Up @@ -749,6 +751,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToFbs(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand All @@ -759,6 +762,7 @@ export class Router<RouterAppData extends AppData = AppData>
rtcpListenInfo.ip,
rtcpListenInfo.announcedIp,
rtcpListenInfo.port,
socketFlagsToFbs(rtcpListenInfo.flags),
rtcpListenInfo.sendBufferSize,
rtcpListenInfo.recvBufferSize
) : undefined,
Expand Down Expand Up @@ -897,6 +901,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToFbs(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand Down Expand Up @@ -1619,3 +1624,13 @@ export function parseRouterDumpResponse(
mapDataConsumerIdDataProducerId : parseStringStringVector(binary, 'mapDataConsumerIdDataProducerId')
};
}

export function socketFlagsToFbs(
flags: TransportSocketFlags = {}
): FbsTransport.SocketFlagsT
{
return new FbsTransport.SocketFlagsT(
Boolean(flags.ipv6Only),
Boolean(flags.udpReusePort)
);
}
21 changes: 21 additions & 0 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ export type TransportListenInfo =
*/
port?: number;

/**
* Socket flags.
*/
flags?: TransportSocketFlags;

/**
* Send buffer size (bytes).
*/
Expand Down Expand Up @@ -107,6 +112,22 @@ export type TransportListenIp =
*/
export type TransportProtocol = 'udp' | 'tcp';

/**
* UDP/TCP socket flags.
*/
export type TransportSocketFlags =
{
/**
* Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
*/
ipv6Only?: boolean;
/**
* Make different transports bind to the same ip and port (only for UDP).
* Useful for multicast scenarios with plain transport. Use with caution.
*/
udpReusePort?: boolean;
};

export type TransportTuple =
{
localIp: string;
Expand Down
3 changes: 2 additions & 1 deletion node/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import * as ortc from './ortc';
import { Channel } from './Channel';
import { Router, RouterOptions } from './Router';
import { Router, RouterOptions, socketFlagsToFbs } from './Router';
import { WebRtcServer, WebRtcServerOptions } from './WebRtcServer';
import { RtpCodecCapability } from './RtpParameters';
import { AppData } from './types';
Expand Down Expand Up @@ -699,6 +699,7 @@ export class Worker<WorkerAppData extends AppData = AppData>
listenInfo.ip,
listenInfo.announcedIp,
listenInfo.port,
socketFlagsToFbs(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize)
);
Expand Down
43 changes: 43 additions & 0 deletions node/src/tests/test-PlainTransport.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import * as os from 'node:os';
// @ts-ignore
import * as pickPort from 'pick-port';
import * as mediasoup from '../';

const IS_WINDOWS = os.platform() === 'win32';

let worker: mediasoup.types.Worker;
let router: mediasoup.types.Router;
let transport: mediasoup.types.PlainTransport;
Expand Down Expand Up @@ -316,6 +319,46 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as
.toThrow(Error);
}, 2000);

if (!IS_WINDOWS)
{
test('two transports listening in same IP:port succeed if UDP_REUSEPORT flag is set', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;

await expect(async () =>
{
const multicastIp = '224.0.0.1';
const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 });

transport1 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: true }
}
});

transport2 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: true }
}
});
}).not.toThrow();

transport1?.close();
transport2?.close();
}, 2000);
}

test('plainTransport.getStats() succeeds', async () =>
{
const data = await transport.getStats();
Expand Down
8 changes: 4 additions & 4 deletions npm-scripts.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import process from 'node:process';
import os from 'node:os';
import fs from 'node:fs';
import path from 'node:path';
import * as process from 'node:process';
import * as os from 'node:os';
import * as fs from 'node:fs';
import * as path from 'node:path';
import { execSync } from 'node:child_process';
import fetch from 'node-fetch';
import tar from 'tar';
Expand Down
23 changes: 23 additions & 0 deletions rust/src/data_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub struct ListenInfo {
/// Listening port.
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<u16>,
/// Socket flags.
#[serde(skip_serializing_if = "Option::is_none")]
pub flags: Option<Vec<SocketFlag>>,
/// Send buffer size (bytes).
#[serde(skip_serializing_if = "Option::is_none")]
pub send_buffer_size: Option<u32>,
Expand Down Expand Up @@ -190,6 +193,26 @@ impl Protocol {
}
}

/// Transport socket flag.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum SocketFlag {
/// Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
IpV6Only,
/// Make different transports bind to the same ip and port (only for UDP).
/// Useful for multicast scenarios with plain transport. Use with caution.
UdpReusePort,
}

impl SocketFlag {
pub(crate) fn to_fbs(self) -> transport::SocketFlag {
match self {
SocketFlag::IpV6Only => transport::SocketFlag::IPV6ONLY,
SocketFlag::UdpReusePort => transport::SocketFlag::UDP_REUSEPORT,
}
}
}

/// ICE candidate
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down
10 changes: 8 additions & 2 deletions worker/fbs/transport.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ enum Protocol: uint8 {
TCP
}

table SocketFlags {
ipv6_only: bool = false;
udp_reuse_port: bool = false;
}

table ListenInfo {
protocol: FBS.Transport.Protocol = UDP;
protocol: Protocol = UDP;
ip: string (required);
announced_ip: string;
port: uint16 = 0;
flags: SocketFlags;
send_buffer_size: uint32 = 0;
recv_buffer_size: uint32 = 0;
}
Expand Down Expand Up @@ -85,7 +91,7 @@ table Tuple {
local_port: uint16;
remote_ip: string;
remote_port: uint16;
protocol: FBS.Transport.Protocol = UDP;
protocol: Protocol = UDP;
}

table RtpListener {
Expand Down
13 changes: 13 additions & 0 deletions worker/include/Logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@
((value & 0x02) ? '1' : '0'), \
((value & 0x01) ? '1' : '0')

// Usage:
// MS_DEBUG_DEV("Leading text "MS_UINT8_TO_BINARY_PATTERN, MS_UINT8_TO_BINARY(value));
#define MS_UINT8_TO_BINARY_PATTERN "%c%c%c%c%c%c%c%c"
#define MS_UINT8_TO_BINARY(value) \
((value & 0x80) ? '1' : '0'), \
((value & 0x40) ? '1' : '0'), \
((value & 0x20) ? '1' : '0'), \
((value & 0x10) ? '1' : '0'), \
((value & 0x08) ? '1' : '0'), \
((value & 0x04) ? '1' : '0'), \
((value & 0x02) ? '1' : '0'), \
((value & 0x01) ? '1' : '0')

class Logger
{
public:
Expand Down
23 changes: 13 additions & 10 deletions worker/include/RTC/PortManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "common.hpp"
#include "Settings.hpp"
#include "RTC/Transport.hpp"
#include <uv.h>
#include <absl/container/flat_hash_map.h>
#include <string>
Expand All @@ -20,21 +21,21 @@ namespace RTC
};

public:
static uv_udp_t* BindUdp(std::string& ip)
static uv_udp_t* BindUdp(std::string& ip, RTC::Transport::SocketFlags& flags)
{
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip));
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, flags));
}
static uv_udp_t* BindUdp(std::string& ip, uint16_t port)
static uv_udp_t* BindUdp(std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags)
{
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, port));
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, port, flags));
}
static uv_tcp_t* BindTcp(std::string& ip)
static uv_tcp_t* BindTcp(std::string& ip, RTC::Transport::SocketFlags& flags)
{
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip));
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, flags));
}
static uv_tcp_t* BindTcp(std::string& ip, uint16_t port)
static uv_tcp_t* BindTcp(std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags)
{
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, port));
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, port, flags));
}
static void UnbindUdp(std::string& ip, uint16_t port)
{
Expand All @@ -46,10 +47,12 @@ namespace RTC
}

private:
static uv_handle_t* Bind(Transport transport, std::string& ip);
static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port);
static uv_handle_t* Bind(Transport transport, std::string& ip, RTC::Transport::SocketFlags& flags);
static uv_handle_t* Bind(
Transport transport, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags);
static void Unbind(Transport transport, std::string& ip, uint16_t port);
static std::vector<bool>& GetPorts(Transport transport, const std::string& ip);
static uint8_t ConvertSocketFlags(RTC::Transport::SocketFlags& flags);

private:
thread_local static absl::flat_hash_map<std::string, std::vector<bool>> mapUdpIpPorts;
Expand Down
13 changes: 11 additions & 2 deletions worker/include/RTC/TcpServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "common.hpp"
#include "RTC/TcpConnection.hpp"
#include "RTC/Transport.hpp"
#include "handles/TcpConnectionHandle.hpp"
#include "handles/TcpServerHandle.hpp"
#include <string>
Expand All @@ -23,9 +24,17 @@ namespace RTC
};

public:
TcpServer(Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip);
TcpServer(
Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port);
Listener* listener,
RTC::TcpConnection::Listener* connListener,
std::string& ip,
RTC::Transport::SocketFlags& flags);
TcpServer(
Listener* listener,
RTC::TcpConnection::Listener* connListener,
std::string& ip,
uint16_t port,
RTC::Transport::SocketFlags& flags);
~TcpServer() override;

/* Pure virtual methods inherited from ::TcpServerHandle. */
Expand Down
7 changes: 7 additions & 0 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,18 @@ namespace RTC
};

public:
struct SocketFlags
{
bool ipv6Only{ false };
bool udpReusePort{ false };
};

struct ListenInfo
{
std::string ip;
std::string announcedIp;
uint16_t port{ 0u };
SocketFlags flags;
uint32_t sendBufferSize{ 0u };
uint32_t recvBufferSize{ 0u };
};
Expand Down
5 changes: 3 additions & 2 deletions worker/include/RTC/UdpSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define MS_RTC_UDP_SOCKET_HPP

#include "common.hpp"
#include "RTC/Transport.hpp"
#include "handles/UdpSocketHandle.hpp"
#include <string>

Expand All @@ -21,8 +22,8 @@ namespace RTC
};

public:
UdpSocket(Listener* listener, std::string& ip);
UdpSocket(Listener* listener, std::string& ip, uint16_t port);
UdpSocket(Listener* listener, std::string& ip, RTC::Transport::SocketFlags& flags);
UdpSocket(Listener* listener, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags);
~UdpSocket() override;

/* Pure virtual methods inherited from ::UdpSocketHandle. */
Expand Down
Loading
Loading