From 582edde1e35112e5495885509f5ad36579a2336d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 12 Dec 2023 19:48:03 +0100 Subject: [PATCH] Do Rust --- rust/src/messages.rs | 100 +++++++++++++++++++++ rust/src/router/data_consumer.rs | 68 ++++++++++----- rust/tests/integration/data_consumer.rs | 111 ++++++++++++++++++++++++ 3 files changed, 259 insertions(+), 20 deletions(-) diff --git a/rust/src/messages.rs b/rust/src/messages.rs index e92b8521ac..1eb50f46cc 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -3260,6 +3260,106 @@ impl Request for DataConsumerSetSubchannelsRequest { } } +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerAddSubchannelRequest { + pub(crate) subchannel: u16, +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerAddSubchannelResponse { + pub(crate) subchannels: Vec, +} + +impl Request for DataConsumerAddSubchannelRequest { + const METHOD: request::Method = request::Method::DataconsumerAddSubchannel; + type HandlerId = DataConsumerId; + type Response = DataConsumerAddSubchannelResponse; + + fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec { + let mut builder = Builder::new(); + + let data = data_consumer::AddSubchannelRequest::create(&mut builder, self.subchannel); + let request_body = + request::Body::create_data_consumer_add_subchannel_request(&mut builder, data); + + let request = request::Request::create( + &mut builder, + id, + Self::METHOD, + handler_id.to_string(), + Some(request_body), + ); + let message_body = message::Body::create_request(&mut builder, request); + let message = message::Message::create(&mut builder, message_body); + + builder.finish(message, None).to_vec() + } + + fn convert_response( + response: Option>, + ) -> Result> { + let Some(response::BodyRef::DataConsumerAddSubchannelResponse(data)) = response else { + panic!("Wrong message from worker: {response:?}"); + }; + + let data = data_consumer::AddSubchannelResponse::try_from(data)?; + + Ok(DataConsumerAddSubchannelResponse { + subchannels: data.subchannels, + }) + } +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerRemoveSubchannelRequest { + pub(crate) subchannel: u16, +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerRemoveSubchannelResponse { + pub(crate) subchannels: Vec, +} + +impl Request for DataConsumerRemoveSubchannelRequest { + const METHOD: request::Method = request::Method::DataconsumerRemoveSubchannel; + type HandlerId = DataConsumerId; + type Response = DataConsumerRemoveSubchannelResponse; + + fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec { + let mut builder = Builder::new(); + + let data = data_consumer::RemoveSubchannelRequest::create(&mut builder, self.subchannel); + let request_body = + request::Body::create_data_consumer_remove_subchannel_request(&mut builder, data); + + let request = request::Request::create( + &mut builder, + id, + Self::METHOD, + handler_id.to_string(), + Some(request_body), + ); + let message_body = message::Body::create_request(&mut builder, request); + let message = message::Message::create(&mut builder, message_body); + + builder.finish(message, None).to_vec() + } + + fn convert_response( + response: Option>, + ) -> Result> { + let Some(response::BodyRef::DataConsumerRemoveSubchannelResponse(data)) = response else { + panic!("Wrong message from worker: {response:?}"); + }; + + let data = data_consumer::RemoveSubchannelResponse::try_from(data)?; + + Ok(DataConsumerRemoveSubchannelResponse { + subchannels: data.subchannels, + }) + } +} + #[derive(Debug)] pub(crate) struct RtpObserverCloseRequest { pub(crate) rtp_observer_id: RtpObserverId, diff --git a/rust/src/router/data_consumer.rs b/rust/src/router/data_consumer.rs index d235f4c894..88ca1a5742 100644 --- a/rust/src/router/data_consumer.rs +++ b/rust/src/router/data_consumer.rs @@ -4,10 +4,10 @@ mod tests; use crate::data_producer::{DataProducer, DataProducerId, WeakDataProducer}; use crate::data_structures::{AppData, WebRtcMessage}; use crate::messages::{ - DataConsumerCloseRequest, DataConsumerDumpRequest, DataConsumerGetBufferedAmountRequest, - DataConsumerGetStatsRequest, DataConsumerPauseRequest, DataConsumerResumeRequest, - DataConsumerSendRequest, DataConsumerSetBufferedAmountLowThresholdRequest, - DataConsumerSetSubchannelsRequest, + DataConsumerAddSubchannelRequest, DataConsumerCloseRequest, DataConsumerDumpRequest, + DataConsumerGetBufferedAmountRequest, DataConsumerGetStatsRequest, DataConsumerPauseRequest, + DataConsumerRemoveSubchannelRequest, DataConsumerResumeRequest, DataConsumerSendRequest, + DataConsumerSetBufferedAmountLowThresholdRequest, DataConsumerSetSubchannelsRequest, }; use crate::sctp_parameters::SctpStreamParameters; use crate::transport::Transport; @@ -381,6 +381,7 @@ impl fmt::Debug for RegularDataConsumer { .field("data_producer_id", &self.inner.data_producer_id) .field("paused", &self.inner.paused) .field("data_producer_paused", &self.inner.data_producer_paused) + .field("subchannels", &self.inner.subchannels) .field("transport", &self.inner.transport) .field("closed", &self.inner.closed) .finish() @@ -411,6 +412,7 @@ impl fmt::Debug for DirectDataConsumer { .field("data_producer_id", &self.inner.data_producer_id) .field("paused", &self.inner.paused) .field("data_producer_paused", &self.inner.data_producer_paused) + .field("subchannels", &self.inner.subchannels) .field("transport", &self.inner.transport) .field("closed", &self.inner.closed) .finish() @@ -788,6 +790,48 @@ impl DataConsumer { .await } + /// Sets subchannels to the worker DataConsumer. + pub async fn set_subchannels(&self, subchannels: Vec) -> Result<(), RequestError> { + let response = self + .inner() + .channel + .request(self.id(), DataConsumerSetSubchannelsRequest { subchannels }) + .await?; + + *self.inner().subchannels.lock() = response.subchannels; + + Ok(()) + } + + /// Adds a subchannel to the worker DataConsumer. + pub async fn add_subchannel(&self, subchannel: u16) -> Result<(), RequestError> { + let response = self + .inner() + .channel + .request(self.id(), DataConsumerAddSubchannelRequest { subchannel }) + .await?; + + *self.inner().subchannels.lock() = response.subchannels; + + Ok(()) + } + + /// Removes a subchannel to the worker DataConsumer. + pub async fn remove_subchannel(&self, subchannel: u16) -> Result<(), RequestError> { + let response = self + .inner() + .channel + .request( + self.id(), + DataConsumerRemoveSubchannelRequest { subchannel }, + ) + .await?; + + *self.inner().subchannels.lock() = response.subchannels; + + Ok(()) + } + /// Callback is called when a message has been received from the corresponding data producer. /// /// # Notes on usage @@ -918,22 +962,6 @@ impl DirectDataConsumer { ) .await } - - /// Sets subchannels to the worker DataConsumer. - pub async fn set_subchannels(&self, subchannels: Vec) -> Result<(), RequestError> { - let response = self - .inner - .channel - .request( - self.inner.id, - DataConsumerSetSubchannelsRequest { subchannels }, - ) - .await?; - - *self.inner.subchannels.lock() = response.subchannels; - - Ok(()) - } } /// [`WeakDataConsumer`] doesn't own data consumer instance on mediasoup-worker and will not prevent diff --git a/rust/tests/integration/data_consumer.rs b/rust/tests/integration/data_consumer.rs index 3d0894fcc7..d18e21d540 100644 --- a/rust/tests/integration/data_consumer.rs +++ b/rust/tests/integration/data_consumer.rs @@ -321,6 +321,117 @@ fn get_stats_succeeds() { }); } +#[test] +fn set_subchannels() { + future::block_on(async move { + let (_worker, _router, transport1, data_producer) = init().await; + + let data_consumer = transport1 + .consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time( + data_producer.id(), + 4000, + )) + .await + .expect("Failed to consume data"); + + data_consumer + .set_subchannels([999, 999, 998, 0].to_vec()) + .await + .expect("Failed to set data consumer subchannels"); + + let mut sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [0, 998, 999]); + }); +} + +#[test] +fn add_and_remove_subchannel() { + future::block_on(async move { + let (_worker, _router, transport1, data_producer) = init().await; + + let data_consumer = transport1 + .consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time( + data_producer.id(), + 4000, + )) + .await + .expect("Failed to consume data"); + + data_consumer + .set_subchannels([].to_vec()) + .await + .expect("Failed to set data consumer subchannels"); + + assert_eq!(data_consumer.subchannels(), []); + + data_consumer + .add_subchannel(5) + .await + .expect("Failed to add data consumer subchannel"); + + assert_eq!(data_consumer.subchannels(), [5]); + + data_consumer + .add_subchannel(10) + .await + .expect("Failed to add data consumer subchannel"); + + let mut sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [5, 10]); + + data_consumer + .add_subchannel(5) + .await + .expect("Failed to add data consumer subchannel"); + + sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [5, 10]); + + data_consumer + .remove_subchannel(666) + .await + .expect("Failed to remove data consumer subchannel"); + + sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [5, 10]); + + data_consumer + .remove_subchannel(5) + .await + .expect("Failed to remove data consumer subchannel"); + + sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [10]); + + data_consumer + .add_subchannel(5) + .await + .expect("Failed to add data consumer subchannel"); + + sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [5, 10]); + + data_consumer + .set_subchannels([].to_vec()) + .await + .expect("Failed to set data consumer subchannels"); + + assert_eq!(data_consumer.subchannels(), []); + }); +} + #[test] fn consume_data_on_direct_transport_succeeds() { future::block_on(async move {