Skip to content

Commit

Permalink
DataConsumer: Add addSubchannel() and removeSubchannel() (#1263)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc authored Dec 12, 2023
1 parent eccc173 commit e2e0622
Show file tree
Hide file tree
Showing 11 changed files with 373 additions and 6 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

### NEXT

* liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)).
* libsrtp: use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)).
* liburing: Avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)).
* libsrtp: Use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)).
* Fix Rust `DataConsumer` ([PR #1262](https://github.com/versatica/mediasoup/pull/1262)).
* `DataConsumer`: Add `addSubchannel()` and `removeSubchannel()` methods ([PR #1263](https://github.com/versatica/mediasoup/pull/1263)).


### 3.13.10
Expand Down
55 changes: 55 additions & 0 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,61 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
this.#subchannels = utils.parseVector(data, 'subchannels');
}

/**
* Add a subchannel.
*/
async addSubchannel(subchannel: number): Promise<void>
{
logger.debug('addSubchannel()');

/* Build Request. */
const requestOffset =
FbsDataConsumer.AddSubchannelRequest.createAddSubchannelRequest(
this.#channel.bufferBuilder, subchannel);

const response = await this.#channel.request(
FbsRequest.Method.DATACONSUMER_ADD_SUBCHANNEL,
FbsRequest.Body.DataConsumer_AddSubchannelRequest,
requestOffset,
this.#internal.dataConsumerId
);

/* Decode Response. */
const data = new FbsDataConsumer.AddSubchannelResponse();

response.body(data);

// Update subchannels.
this.#subchannels = utils.parseVector(data, 'subchannels');
}

/**
* Remove a subchannel.
*/
async removeSubchannel(subchannel: number): Promise<void>
{
logger.debug('removeSubchannel()');

/* Build Request. */
const requestOffset = FbsDataConsumer.RemoveSubchannelRequest.
createRemoveSubchannelRequest(this.#channel.bufferBuilder, subchannel);

const response = await this.#channel.request(
FbsRequest.Method.DATACONSUMER_REMOVE_SUBCHANNEL,
FbsRequest.Body.DataConsumer_RemoveSubchannelRequest,
requestOffset,
this.#internal.dataConsumerId
);

/* Decode Response. */
const data = new FbsDataConsumer.RemoveSubchannelResponse();

response.body(data);

// Update subchannels.
this.#subchannels = utils.parseVector(data, 'subchannels');
}

private handleWorkerNotifications(): void
{
this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) =>
Expand Down
24 changes: 24 additions & 0 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,30 @@ test('dataConsumer.setSubchannels() succeeds', async () =>
.toEqual([ 0, 998, 999 ]);
}, 2000);

test('dataConsumer.addSubchannel() and .removeSubchannel() succeed', async () =>
{
await dataConsumer1.setSubchannels([ ]);
expect(dataConsumer1.subchannels).toEqual([ ]);

await dataConsumer1.addSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5 ]);

await dataConsumer1.addSubchannel(10);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.addSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.removeSubchannel(666);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.removeSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 10 ]);

await dataConsumer1.setSubchannels([ ]);
expect(dataConsumer1.subchannels).toEqual([ ]);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
{
const onObserverNewDataConsumer = jest.fn();
Expand Down
100 changes: 100 additions & 0 deletions rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3260,6 +3260,106 @@ impl Request for DataConsumerSetSubchannelsRequest {
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerAddSubchannelRequest {
pub(crate) subchannel: u16,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerAddSubchannelResponse {
pub(crate) subchannels: Vec<u16>,
}

impl Request for DataConsumerAddSubchannelRequest {
const METHOD: request::Method = request::Method::DataconsumerAddSubchannel;
type HandlerId = DataConsumerId;
type Response = DataConsumerAddSubchannelResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let data = data_consumer::AddSubchannelRequest::create(&mut builder, self.subchannel);
let request_body =
request::Body::create_data_consumer_add_subchannel_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::BodyRef<'_>>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::BodyRef::DataConsumerAddSubchannelResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

let data = data_consumer::AddSubchannelResponse::try_from(data)?;

Ok(DataConsumerAddSubchannelResponse {
subchannels: data.subchannels,
})
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerRemoveSubchannelRequest {
pub(crate) subchannel: u16,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerRemoveSubchannelResponse {
pub(crate) subchannels: Vec<u16>,
}

impl Request for DataConsumerRemoveSubchannelRequest {
const METHOD: request::Method = request::Method::DataconsumerRemoveSubchannel;
type HandlerId = DataConsumerId;
type Response = DataConsumerRemoveSubchannelResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let data = data_consumer::RemoveSubchannelRequest::create(&mut builder, self.subchannel);
let request_body =
request::Body::create_data_consumer_remove_subchannel_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::BodyRef<'_>>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::BodyRef::DataConsumerRemoveSubchannelResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

let data = data_consumer::RemoveSubchannelResponse::try_from(data)?;

Ok(DataConsumerRemoveSubchannelResponse {
subchannels: data.subchannels,
})
}
}

#[derive(Debug)]
pub(crate) struct RtpObserverCloseRequest {
pub(crate) rtp_observer_id: RtpObserverId,
Expand Down
37 changes: 33 additions & 4 deletions rust/src/router/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ mod tests;
use crate::data_producer::{DataProducer, DataProducerId, WeakDataProducer};
use crate::data_structures::{AppData, WebRtcMessage};
use crate::messages::{
DataConsumerCloseRequest, DataConsumerDumpRequest, DataConsumerGetBufferedAmountRequest,
DataConsumerGetStatsRequest, DataConsumerPauseRequest, DataConsumerResumeRequest,
DataConsumerSendRequest, DataConsumerSetBufferedAmountLowThresholdRequest,
DataConsumerSetSubchannelsRequest,
DataConsumerAddSubchannelRequest, DataConsumerCloseRequest, DataConsumerDumpRequest,
DataConsumerGetBufferedAmountRequest, DataConsumerGetStatsRequest, DataConsumerPauseRequest,
DataConsumerRemoveSubchannelRequest, DataConsumerResumeRequest, DataConsumerSendRequest,
DataConsumerSetBufferedAmountLowThresholdRequest, DataConsumerSetSubchannelsRequest,
};
use crate::sctp_parameters::SctpStreamParameters;
use crate::transport::Transport;
Expand Down Expand Up @@ -803,6 +803,35 @@ impl DataConsumer {
Ok(())
}

/// Adds a subchannel to the worker DataConsumer.
pub async fn add_subchannel(&self, subchannel: u16) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(self.id(), DataConsumerAddSubchannelRequest { subchannel })
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Removes a subchannel to the worker DataConsumer.
pub async fn remove_subchannel(&self, subchannel: u16) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(
self.id(),
DataConsumerRemoveSubchannelRequest { subchannel },
)
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Callback is called when a message has been received from the corresponding data producer.
///
/// # Notes on usage
Expand Down
86 changes: 86 additions & 0 deletions rust/tests/integration/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,92 @@ fn set_subchannels() {
});
}

#[test]
fn add_and_remove_subchannel() {
future::block_on(async move {
let (_worker, _router, transport1, data_producer) = init().await;

let data_consumer = transport1
.consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time(
data_producer.id(),
4000,
))
.await
.expect("Failed to consume data");

data_consumer
.set_subchannels([].to_vec())
.await
.expect("Failed to set data consumer subchannels");

assert_eq!(data_consumer.subchannels(), []);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

assert_eq!(data_consumer.subchannels(), [5]);

data_consumer
.add_subchannel(10)
.await
.expect("Failed to add data consumer subchannel");

let mut sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.remove_subchannel(666)
.await
.expect("Failed to remove data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.remove_subchannel(5)
.await
.expect("Failed to remove data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [10]);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.set_subchannels([].to_vec())
.await
.expect("Failed to set data consumer subchannels");

assert_eq!(data_consumer.subchannels(), []);
});
}

#[test]
fn consume_data_on_direct_transport_succeeds() {
future::block_on(async move {
Expand Down
16 changes: 16 additions & 0 deletions worker/fbs/dataConsumer.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ table SetSubchannelsResponse {
subchannels: [uint16] (required);
}

table AddSubchannelRequest {
subchannel: uint16;
}

table AddSubchannelResponse {
subchannels: [uint16] (required);
}

table RemoveSubchannelRequest {
subchannel: uint16;
}

table RemoveSubchannelResponse {
subchannels: [uint16] (required);
}

// Notifications from Worker.

table BufferedAmountLowNotification {
Expand Down
4 changes: 4 additions & 0 deletions worker/fbs/request.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ enum Method: uint8 {
DATACONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD,
DATACONSUMER_SEND,
DATACONSUMER_SET_SUBCHANNELS,
DATACONSUMER_ADD_SUBCHANNEL,
DATACONSUMER_REMOVE_SUBCHANNEL,
RTPOBSERVER_PAUSE,
RTPOBSERVER_RESUME,
RTPOBSERVER_ADD_PRODUCER,
Expand Down Expand Up @@ -114,6 +116,8 @@ union Body {
DataConsumer_SetBufferedAmountLowThresholdRequest: FBS.DataConsumer.SetBufferedAmountLowThresholdRequest,
DataConsumer_SendRequest: FBS.DataConsumer.SendRequest,
DataConsumer_SetSubchannelsRequest: FBS.DataConsumer.SetSubchannelsRequest,
DataConsumer_AddSubchannelRequest: FBS.DataConsumer.AddSubchannelRequest,
DataConsumer_RemoveSubchannelRequest: FBS.DataConsumer.RemoveSubchannelRequest,
RtpObserver_AddProducerRequest: FBS.RtpObserver.AddProducerRequest,
RtpObserver_RemoveProducerRequest: FBS.RtpObserver.RemoveProducerRequest,
}
Expand Down
Loading

0 comments on commit e2e0622

Please sign in to comment.