Skip to content

Commit

Permalink
Do Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc committed Dec 12, 2023
1 parent 48c45f4 commit 582edde
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 20 deletions.
100 changes: 100 additions & 0 deletions rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3260,6 +3260,106 @@ impl Request for DataConsumerSetSubchannelsRequest {
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerAddSubchannelRequest {
pub(crate) subchannel: u16,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerAddSubchannelResponse {
pub(crate) subchannels: Vec<u16>,
}

impl Request for DataConsumerAddSubchannelRequest {
const METHOD: request::Method = request::Method::DataconsumerAddSubchannel;
type HandlerId = DataConsumerId;
type Response = DataConsumerAddSubchannelResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let data = data_consumer::AddSubchannelRequest::create(&mut builder, self.subchannel);
let request_body =
request::Body::create_data_consumer_add_subchannel_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::BodyRef<'_>>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::BodyRef::DataConsumerAddSubchannelResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

let data = data_consumer::AddSubchannelResponse::try_from(data)?;

Ok(DataConsumerAddSubchannelResponse {
subchannels: data.subchannels,
})
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerRemoveSubchannelRequest {
pub(crate) subchannel: u16,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerRemoveSubchannelResponse {
pub(crate) subchannels: Vec<u16>,
}

impl Request for DataConsumerRemoveSubchannelRequest {
const METHOD: request::Method = request::Method::DataconsumerRemoveSubchannel;
type HandlerId = DataConsumerId;
type Response = DataConsumerRemoveSubchannelResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let data = data_consumer::RemoveSubchannelRequest::create(&mut builder, self.subchannel);
let request_body =
request::Body::create_data_consumer_remove_subchannel_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::BodyRef<'_>>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::BodyRef::DataConsumerRemoveSubchannelResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

let data = data_consumer::RemoveSubchannelResponse::try_from(data)?;

Ok(DataConsumerRemoveSubchannelResponse {
subchannels: data.subchannels,
})
}
}

#[derive(Debug)]
pub(crate) struct RtpObserverCloseRequest {
pub(crate) rtp_observer_id: RtpObserverId,
Expand Down
68 changes: 48 additions & 20 deletions rust/src/router/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ mod tests;
use crate::data_producer::{DataProducer, DataProducerId, WeakDataProducer};
use crate::data_structures::{AppData, WebRtcMessage};
use crate::messages::{
DataConsumerCloseRequest, DataConsumerDumpRequest, DataConsumerGetBufferedAmountRequest,
DataConsumerGetStatsRequest, DataConsumerPauseRequest, DataConsumerResumeRequest,
DataConsumerSendRequest, DataConsumerSetBufferedAmountLowThresholdRequest,
DataConsumerSetSubchannelsRequest,
DataConsumerAddSubchannelRequest, DataConsumerCloseRequest, DataConsumerDumpRequest,
DataConsumerGetBufferedAmountRequest, DataConsumerGetStatsRequest, DataConsumerPauseRequest,
DataConsumerRemoveSubchannelRequest, DataConsumerResumeRequest, DataConsumerSendRequest,
DataConsumerSetBufferedAmountLowThresholdRequest, DataConsumerSetSubchannelsRequest,
};
use crate::sctp_parameters::SctpStreamParameters;
use crate::transport::Transport;
Expand Down Expand Up @@ -381,6 +381,7 @@ impl fmt::Debug for RegularDataConsumer {
.field("data_producer_id", &self.inner.data_producer_id)
.field("paused", &self.inner.paused)
.field("data_producer_paused", &self.inner.data_producer_paused)
.field("subchannels", &self.inner.subchannels)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
Expand Down Expand Up @@ -411,6 +412,7 @@ impl fmt::Debug for DirectDataConsumer {
.field("data_producer_id", &self.inner.data_producer_id)
.field("paused", &self.inner.paused)
.field("data_producer_paused", &self.inner.data_producer_paused)
.field("subchannels", &self.inner.subchannels)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
Expand Down Expand Up @@ -788,6 +790,48 @@ impl DataConsumer {
.await
}

/// Sets subchannels to the worker DataConsumer.
pub async fn set_subchannels(&self, subchannels: Vec<u16>) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(self.id(), DataConsumerSetSubchannelsRequest { subchannels })
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Adds a subchannel to the worker DataConsumer.
pub async fn add_subchannel(&self, subchannel: u16) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(self.id(), DataConsumerAddSubchannelRequest { subchannel })
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Removes a subchannel to the worker DataConsumer.
pub async fn remove_subchannel(&self, subchannel: u16) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(
self.id(),
DataConsumerRemoveSubchannelRequest { subchannel },
)
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Callback is called when a message has been received from the corresponding data producer.
///
/// # Notes on usage
Expand Down Expand Up @@ -918,22 +962,6 @@ impl DirectDataConsumer {
)
.await
}

/// Sets subchannels to the worker DataConsumer.
pub async fn set_subchannels(&self, subchannels: Vec<u16>) -> Result<(), RequestError> {
let response = self
.inner
.channel
.request(
self.inner.id,
DataConsumerSetSubchannelsRequest { subchannels },
)
.await?;

*self.inner.subchannels.lock() = response.subchannels;

Ok(())
}
}

/// [`WeakDataConsumer`] doesn't own data consumer instance on mediasoup-worker and will not prevent
Expand Down
111 changes: 111 additions & 0 deletions rust/tests/integration/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,117 @@ fn get_stats_succeeds() {
});
}

#[test]
fn set_subchannels() {
future::block_on(async move {
let (_worker, _router, transport1, data_producer) = init().await;

let data_consumer = transport1
.consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time(
data_producer.id(),
4000,
))
.await
.expect("Failed to consume data");

data_consumer
.set_subchannels([999, 999, 998, 0].to_vec())
.await
.expect("Failed to set data consumer subchannels");

let mut sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [0, 998, 999]);
});
}

#[test]
fn add_and_remove_subchannel() {
future::block_on(async move {
let (_worker, _router, transport1, data_producer) = init().await;

let data_consumer = transport1
.consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time(
data_producer.id(),
4000,
))
.await
.expect("Failed to consume data");

data_consumer
.set_subchannels([].to_vec())
.await
.expect("Failed to set data consumer subchannels");

assert_eq!(data_consumer.subchannels(), []);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

assert_eq!(data_consumer.subchannels(), [5]);

data_consumer
.add_subchannel(10)
.await
.expect("Failed to add data consumer subchannel");

let mut sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.remove_subchannel(666)
.await
.expect("Failed to remove data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.remove_subchannel(5)
.await
.expect("Failed to remove data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [10]);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.set_subchannels([].to_vec())
.await
.expect("Failed to set data consumer subchannels");

assert_eq!(data_consumer.subchannels(), []);
});
}

#[test]
fn consume_data_on_direct_transport_succeeds() {
future::block_on(async move {
Expand Down

0 comments on commit 582edde

Please sign in to comment.