diff --git a/CHANGELOG.md b/CHANGELOG.md index b0be793ad4..306202a174 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,10 @@ ### NEXT -* liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)). -* libsrtp: use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)). +* liburing: Avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)). +* libsrtp: Use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)). * Fix Rust `DataConsumer` ([PR #1262](https://github.com/versatica/mediasoup/pull/1262)). +* `DataConsumer`: Add `addSubchannel()` and `removeSubchannel()` methods ([PR #1263](https://github.com/versatica/mediasoup/pull/1263)). ### 3.13.10 diff --git a/node/src/DataConsumer.ts b/node/src/DataConsumer.ts index 12b7e5db68..52ccd0284b 100644 --- a/node/src/DataConsumer.ts +++ b/node/src/DataConsumer.ts @@ -586,6 +586,61 @@ export class DataConsumer this.#subchannels = utils.parseVector(data, 'subchannels'); } + /** + * Add a subchannel. + */ + async addSubchannel(subchannel: number): Promise + { + logger.debug('addSubchannel()'); + + /* Build Request. */ + const requestOffset = + FbsDataConsumer.AddSubchannelRequest.createAddSubchannelRequest( + this.#channel.bufferBuilder, subchannel); + + const response = await this.#channel.request( + FbsRequest.Method.DATACONSUMER_ADD_SUBCHANNEL, + FbsRequest.Body.DataConsumer_AddSubchannelRequest, + requestOffset, + this.#internal.dataConsumerId + ); + + /* Decode Response. */ + const data = new FbsDataConsumer.AddSubchannelResponse(); + + response.body(data); + + // Update subchannels. + this.#subchannels = utils.parseVector(data, 'subchannels'); + } + + /** + * Remove a subchannel. + */ + async removeSubchannel(subchannel: number): Promise + { + logger.debug('removeSubchannel()'); + + /* Build Request. */ + const requestOffset = FbsDataConsumer.RemoveSubchannelRequest. + createRemoveSubchannelRequest(this.#channel.bufferBuilder, subchannel); + + const response = await this.#channel.request( + FbsRequest.Method.DATACONSUMER_REMOVE_SUBCHANNEL, + FbsRequest.Body.DataConsumer_RemoveSubchannelRequest, + requestOffset, + this.#internal.dataConsumerId + ); + + /* Decode Response. */ + const data = new FbsDataConsumer.RemoveSubchannelResponse(); + + response.body(data); + + // Update subchannels. + this.#subchannels = utils.parseVector(data, 'subchannels'); + } + private handleWorkerNotifications(): void { this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) => diff --git a/node/src/tests/test-DataConsumer.ts b/node/src/tests/test-DataConsumer.ts index 44cdab5884..66bfae338b 100644 --- a/node/src/tests/test-DataConsumer.ts +++ b/node/src/tests/test-DataConsumer.ts @@ -139,6 +139,30 @@ test('dataConsumer.setSubchannels() succeeds', async () => .toEqual([ 0, 998, 999 ]); }, 2000); +test('dataConsumer.addSubchannel() and .removeSubchannel() succeed', async () => +{ + await dataConsumer1.setSubchannels([ ]); + expect(dataConsumer1.subchannels).toEqual([ ]); + + await dataConsumer1.addSubchannel(5); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5 ]); + + await dataConsumer1.addSubchannel(10); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]); + + await dataConsumer1.addSubchannel(5); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]); + + await dataConsumer1.removeSubchannel(666); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]); + + await dataConsumer1.removeSubchannel(5); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 10 ]); + + await dataConsumer1.setSubchannels([ ]); + expect(dataConsumer1.subchannels).toEqual([ ]); +}, 2000); + test('transport.consumeData() on a DirectTransport succeeds', async () => { const onObserverNewDataConsumer = jest.fn(); diff --git a/rust/src/messages.rs b/rust/src/messages.rs index e92b8521ac..1eb50f46cc 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -3260,6 +3260,106 @@ impl Request for DataConsumerSetSubchannelsRequest { } } +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerAddSubchannelRequest { + pub(crate) subchannel: u16, +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerAddSubchannelResponse { + pub(crate) subchannels: Vec, +} + +impl Request for DataConsumerAddSubchannelRequest { + const METHOD: request::Method = request::Method::DataconsumerAddSubchannel; + type HandlerId = DataConsumerId; + type Response = DataConsumerAddSubchannelResponse; + + fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec { + let mut builder = Builder::new(); + + let data = data_consumer::AddSubchannelRequest::create(&mut builder, self.subchannel); + let request_body = + request::Body::create_data_consumer_add_subchannel_request(&mut builder, data); + + let request = request::Request::create( + &mut builder, + id, + Self::METHOD, + handler_id.to_string(), + Some(request_body), + ); + let message_body = message::Body::create_request(&mut builder, request); + let message = message::Message::create(&mut builder, message_body); + + builder.finish(message, None).to_vec() + } + + fn convert_response( + response: Option>, + ) -> Result> { + let Some(response::BodyRef::DataConsumerAddSubchannelResponse(data)) = response else { + panic!("Wrong message from worker: {response:?}"); + }; + + let data = data_consumer::AddSubchannelResponse::try_from(data)?; + + Ok(DataConsumerAddSubchannelResponse { + subchannels: data.subchannels, + }) + } +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerRemoveSubchannelRequest { + pub(crate) subchannel: u16, +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerRemoveSubchannelResponse { + pub(crate) subchannels: Vec, +} + +impl Request for DataConsumerRemoveSubchannelRequest { + const METHOD: request::Method = request::Method::DataconsumerRemoveSubchannel; + type HandlerId = DataConsumerId; + type Response = DataConsumerRemoveSubchannelResponse; + + fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec { + let mut builder = Builder::new(); + + let data = data_consumer::RemoveSubchannelRequest::create(&mut builder, self.subchannel); + let request_body = + request::Body::create_data_consumer_remove_subchannel_request(&mut builder, data); + + let request = request::Request::create( + &mut builder, + id, + Self::METHOD, + handler_id.to_string(), + Some(request_body), + ); + let message_body = message::Body::create_request(&mut builder, request); + let message = message::Message::create(&mut builder, message_body); + + builder.finish(message, None).to_vec() + } + + fn convert_response( + response: Option>, + ) -> Result> { + let Some(response::BodyRef::DataConsumerRemoveSubchannelResponse(data)) = response else { + panic!("Wrong message from worker: {response:?}"); + }; + + let data = data_consumer::RemoveSubchannelResponse::try_from(data)?; + + Ok(DataConsumerRemoveSubchannelResponse { + subchannels: data.subchannels, + }) + } +} + #[derive(Debug)] pub(crate) struct RtpObserverCloseRequest { pub(crate) rtp_observer_id: RtpObserverId, diff --git a/rust/src/router/data_consumer.rs b/rust/src/router/data_consumer.rs index 003a7f03cc..88ca1a5742 100644 --- a/rust/src/router/data_consumer.rs +++ b/rust/src/router/data_consumer.rs @@ -4,10 +4,10 @@ mod tests; use crate::data_producer::{DataProducer, DataProducerId, WeakDataProducer}; use crate::data_structures::{AppData, WebRtcMessage}; use crate::messages::{ - DataConsumerCloseRequest, DataConsumerDumpRequest, DataConsumerGetBufferedAmountRequest, - DataConsumerGetStatsRequest, DataConsumerPauseRequest, DataConsumerResumeRequest, - DataConsumerSendRequest, DataConsumerSetBufferedAmountLowThresholdRequest, - DataConsumerSetSubchannelsRequest, + DataConsumerAddSubchannelRequest, DataConsumerCloseRequest, DataConsumerDumpRequest, + DataConsumerGetBufferedAmountRequest, DataConsumerGetStatsRequest, DataConsumerPauseRequest, + DataConsumerRemoveSubchannelRequest, DataConsumerResumeRequest, DataConsumerSendRequest, + DataConsumerSetBufferedAmountLowThresholdRequest, DataConsumerSetSubchannelsRequest, }; use crate::sctp_parameters::SctpStreamParameters; use crate::transport::Transport; @@ -803,6 +803,35 @@ impl DataConsumer { Ok(()) } + /// Adds a subchannel to the worker DataConsumer. + pub async fn add_subchannel(&self, subchannel: u16) -> Result<(), RequestError> { + let response = self + .inner() + .channel + .request(self.id(), DataConsumerAddSubchannelRequest { subchannel }) + .await?; + + *self.inner().subchannels.lock() = response.subchannels; + + Ok(()) + } + + /// Removes a subchannel to the worker DataConsumer. + pub async fn remove_subchannel(&self, subchannel: u16) -> Result<(), RequestError> { + let response = self + .inner() + .channel + .request( + self.id(), + DataConsumerRemoveSubchannelRequest { subchannel }, + ) + .await?; + + *self.inner().subchannels.lock() = response.subchannels; + + Ok(()) + } + /// Callback is called when a message has been received from the corresponding data producer. /// /// # Notes on usage diff --git a/rust/tests/integration/data_consumer.rs b/rust/tests/integration/data_consumer.rs index 11e3255c92..d18e21d540 100644 --- a/rust/tests/integration/data_consumer.rs +++ b/rust/tests/integration/data_consumer.rs @@ -346,6 +346,92 @@ fn set_subchannels() { }); } +#[test] +fn add_and_remove_subchannel() { + future::block_on(async move { + let (_worker, _router, transport1, data_producer) = init().await; + + let data_consumer = transport1 + .consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time( + data_producer.id(), + 4000, + )) + .await + .expect("Failed to consume data"); + + data_consumer + .set_subchannels([].to_vec()) + .await + .expect("Failed to set data consumer subchannels"); + + assert_eq!(data_consumer.subchannels(), []); + + data_consumer + .add_subchannel(5) + .await + .expect("Failed to add data consumer subchannel"); + + assert_eq!(data_consumer.subchannels(), [5]); + + data_consumer + .add_subchannel(10) + .await + .expect("Failed to add data consumer subchannel"); + + let mut sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [5, 10]); + + data_consumer + .add_subchannel(5) + .await + .expect("Failed to add data consumer subchannel"); + + sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [5, 10]); + + data_consumer + .remove_subchannel(666) + .await + .expect("Failed to remove data consumer subchannel"); + + sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [5, 10]); + + data_consumer + .remove_subchannel(5) + .await + .expect("Failed to remove data consumer subchannel"); + + sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [10]); + + data_consumer + .add_subchannel(5) + .await + .expect("Failed to add data consumer subchannel"); + + sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [5, 10]); + + data_consumer + .set_subchannels([].to_vec()) + .await + .expect("Failed to set data consumer subchannels"); + + assert_eq!(data_consumer.subchannels(), []); + }); +} + #[test] fn consume_data_on_direct_transport_succeeds() { future::block_on(async move { diff --git a/worker/fbs/dataConsumer.fbs b/worker/fbs/dataConsumer.fbs index 2bebe33191..2670288a58 100644 --- a/worker/fbs/dataConsumer.fbs +++ b/worker/fbs/dataConsumer.fbs @@ -47,6 +47,22 @@ table SetSubchannelsResponse { subchannels: [uint16] (required); } +table AddSubchannelRequest { + subchannel: uint16; +} + +table AddSubchannelResponse { + subchannels: [uint16] (required); +} + +table RemoveSubchannelRequest { + subchannel: uint16; +} + +table RemoveSubchannelResponse { + subchannels: [uint16] (required); +} + // Notifications from Worker. table BufferedAmountLowNotification { diff --git a/worker/fbs/request.fbs b/worker/fbs/request.fbs index 56eca5674d..d9f869e878 100644 --- a/worker/fbs/request.fbs +++ b/worker/fbs/request.fbs @@ -72,6 +72,8 @@ enum Method: uint8 { DATACONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD, DATACONSUMER_SEND, DATACONSUMER_SET_SUBCHANNELS, + DATACONSUMER_ADD_SUBCHANNEL, + DATACONSUMER_REMOVE_SUBCHANNEL, RTPOBSERVER_PAUSE, RTPOBSERVER_RESUME, RTPOBSERVER_ADD_PRODUCER, @@ -114,6 +116,8 @@ union Body { DataConsumer_SetBufferedAmountLowThresholdRequest: FBS.DataConsumer.SetBufferedAmountLowThresholdRequest, DataConsumer_SendRequest: FBS.DataConsumer.SendRequest, DataConsumer_SetSubchannelsRequest: FBS.DataConsumer.SetSubchannelsRequest, + DataConsumer_AddSubchannelRequest: FBS.DataConsumer.AddSubchannelRequest, + DataConsumer_RemoveSubchannelRequest: FBS.DataConsumer.RemoveSubchannelRequest, RtpObserver_AddProducerRequest: FBS.RtpObserver.AddProducerRequest, RtpObserver_RemoveProducerRequest: FBS.RtpObserver.RemoveProducerRequest, } diff --git a/worker/fbs/response.fbs b/worker/fbs/response.fbs index 539d36a72a..707e5750ef 100644 --- a/worker/fbs/response.fbs +++ b/worker/fbs/response.fbs @@ -40,6 +40,8 @@ union Body { DataConsumer_DumpResponse: FBS.DataConsumer.DumpResponse, DataConsumer_GetStatsResponse: FBS.DataConsumer.GetStatsResponse, DataConsumer_SetSubchannelsResponse: FBS.DataConsumer.SetSubchannelsResponse, + DataConsumer_AddSubchannelResponse: FBS.DataConsumer.AddSubchannelResponse, + DataConsumer_RemoveSubchannelResponse: FBS.DataConsumer.RemoveSubchannelResponse } table Response { diff --git a/worker/src/Channel/ChannelRequest.cpp b/worker/src/Channel/ChannelRequest.cpp index adf62ba28e..ed1ef58395 100644 --- a/worker/src/Channel/ChannelRequest.cpp +++ b/worker/src/Channel/ChannelRequest.cpp @@ -75,6 +75,8 @@ namespace Channel { FBS::Request::Method::DATACONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD, "dataConsumer.setBufferedAmountLowThreshold" }, { FBS::Request::Method::DATACONSUMER_SEND, "dataConsumer.send" }, { FBS::Request::Method::DATACONSUMER_SET_SUBCHANNELS, "dataConsumer.setSubchannels" }, + { FBS::Request::Method::DATACONSUMER_ADD_SUBCHANNEL, "dataConsumer.addSubchannel" }, + { FBS::Request::Method::DATACONSUMER_REMOVE_SUBCHANNEL, "dataConsumer.removeSubchannel" }, { FBS::Request::Method::RTPOBSERVER_PAUSE, "rtpObserver.pause" }, { FBS::Request::Method::RTPOBSERVER_RESUME, "rtpObserver.resume" }, { FBS::Request::Method::RTPOBSERVER_ADD_PRODUCER, "rtpObserver.addProducer" }, diff --git a/worker/src/RTC/DataConsumer.cpp b/worker/src/RTC/DataConsumer.cpp index 3f5fd69ffd..fdabc812a5 100644 --- a/worker/src/RTC/DataConsumer.cpp +++ b/worker/src/RTC/DataConsumer.cpp @@ -339,6 +339,54 @@ namespace RTC break; } + case Channel::ChannelRequest::Method::DATACONSUMER_ADD_SUBCHANNEL: + { + const auto* body = request->data->body_as(); + + this->subchannels.insert(body->subchannel()); + + std::vector subchannels; + + subchannels.reserve(this->subchannels.size()); + + for (auto subchannel : this->subchannels) + { + subchannels.emplace_back(subchannel); + } + + // Create response. + auto responseOffset = FBS::DataConsumer::CreateAddSubchannelResponseDirect( + request->GetBufferBuilder(), std::addressof(subchannels)); + + request->Accept(FBS::Response::Body::DataConsumer_AddSubchannelResponse, responseOffset); + + break; + } + + case Channel::ChannelRequest::Method::DATACONSUMER_REMOVE_SUBCHANNEL: + { + const auto* body = request->data->body_as(); + + this->subchannels.erase(body->subchannel()); + + std::vector subchannels; + + subchannels.reserve(this->subchannels.size()); + + for (auto subchannel : this->subchannels) + { + subchannels.emplace_back(subchannel); + } + + // Create response. + auto responseOffset = FBS::DataConsumer::CreateRemoveSubchannelResponseDirect( + request->GetBufferBuilder(), std::addressof(subchannels)); + + request->Accept(FBS::Response::Body::DataConsumer_RemoveSubchannelResponse, responseOffset); + + break; + } + default: { MS_THROW_ERROR("unknown method '%s'", request->methodCStr);