From eccc173bdb3646db10fbe3a57e1325c1dae104ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 12 Dec 2023 19:12:03 +0100 Subject: [PATCH] Fix Rust DataConsumer (#1262) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: José Luis Millán --- CHANGELOG.md | 1 + node/src/tests/test-DataConsumer.ts | 6 +++-- rust/src/router/data_consumer.rs | 31 +++++++++++----------- rust/tests/integration/data_consumer.rs | 25 +++++++++++++++++ rust/tests/integration/direct_transport.rs | 4 +-- 5 files changed, 47 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e545dd3208..b0be793ad4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)). * libsrtp: use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)). +* Fix Rust `DataConsumer` ([PR #1262](https://github.com/versatica/mediasoup/pull/1262)). ### 3.13.10 diff --git a/node/src/tests/test-DataConsumer.ts b/node/src/tests/test-DataConsumer.ts index 5bbf33fa0c..44cdab5884 100644 --- a/node/src/tests/test-DataConsumer.ts +++ b/node/src/tests/test-DataConsumer.ts @@ -71,7 +71,8 @@ test('transport.consumeData() succeeds', async () => expect(dataConsumer1.label).toBe('foo'); expect(dataConsumer1.protocol).toBe('bar'); expect(dataConsumer1.paused).toBe(false); - expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 1, 2, 100, 65535 ]); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)) + .toEqual([ 0, 1, 2, 100, 65535 ]); expect(dataConsumer1.appData).toEqual({ baz: 'LOL' }); const dump = await router.dump(); @@ -134,7 +135,8 @@ test('dataConsumer.setSubchannels() succeeds', async () => { await dataConsumer1.setSubchannels([ 999, 999, 998, 65536 ]); - expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)) + .toEqual([ 0, 998, 999 ]); }, 2000); test('transport.consumeData() on a DirectTransport succeeds', async () => diff --git a/rust/src/router/data_consumer.rs b/rust/src/router/data_consumer.rs index d235f4c894..003a7f03cc 100644 --- a/rust/src/router/data_consumer.rs +++ b/rust/src/router/data_consumer.rs @@ -381,6 +381,7 @@ impl fmt::Debug for RegularDataConsumer { .field("data_producer_id", &self.inner.data_producer_id) .field("paused", &self.inner.paused) .field("data_producer_paused", &self.inner.data_producer_paused) + .field("subchannels", &self.inner.subchannels) .field("transport", &self.inner.transport) .field("closed", &self.inner.closed) .finish() @@ -411,6 +412,7 @@ impl fmt::Debug for DirectDataConsumer { .field("data_producer_id", &self.inner.data_producer_id) .field("paused", &self.inner.paused) .field("data_producer_paused", &self.inner.data_producer_paused) + .field("subchannels", &self.inner.subchannels) .field("transport", &self.inner.transport) .field("closed", &self.inner.closed) .finish() @@ -788,6 +790,19 @@ impl DataConsumer { .await } + /// Sets subchannels to the worker DataConsumer. + pub async fn set_subchannels(&self, subchannels: Vec) -> Result<(), RequestError> { + let response = self + .inner() + .channel + .request(self.id(), DataConsumerSetSubchannelsRequest { subchannels }) + .await?; + + *self.inner().subchannels.lock() = response.subchannels; + + Ok(()) + } + /// Callback is called when a message has been received from the corresponding data producer. /// /// # Notes on usage @@ -918,22 +933,6 @@ impl DirectDataConsumer { ) .await } - - /// Sets subchannels to the worker DataConsumer. - pub async fn set_subchannels(&self, subchannels: Vec) -> Result<(), RequestError> { - let response = self - .inner - .channel - .request( - self.inner.id, - DataConsumerSetSubchannelsRequest { subchannels }, - ) - .await?; - - *self.inner.subchannels.lock() = response.subchannels; - - Ok(()) - } } /// [`WeakDataConsumer`] doesn't own data consumer instance on mediasoup-worker and will not prevent diff --git a/rust/tests/integration/data_consumer.rs b/rust/tests/integration/data_consumer.rs index 3d0894fcc7..11e3255c92 100644 --- a/rust/tests/integration/data_consumer.rs +++ b/rust/tests/integration/data_consumer.rs @@ -321,6 +321,31 @@ fn get_stats_succeeds() { }); } +#[test] +fn set_subchannels() { + future::block_on(async move { + let (_worker, _router, transport1, data_producer) = init().await; + + let data_consumer = transport1 + .consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time( + data_producer.id(), + 4000, + )) + .await + .expect("Failed to consume data"); + + data_consumer + .set_subchannels([999, 999, 998, 0].to_vec()) + .await + .expect("Failed to set data consumer subchannels"); + + let mut sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [0, 998, 999]); + }); +} + #[test] fn consume_data_on_direct_transport_succeeds() { future::block_on(async move { diff --git a/rust/tests/integration/direct_transport.rs b/rust/tests/integration/direct_transport.rs index 380f3e1b3e..6053de0208 100644 --- a/rust/tests/integration/direct_transport.rs +++ b/rust/tests/integration/direct_transport.rs @@ -430,7 +430,7 @@ fn send_with_subchannels_succeeds() { } }; - let direct_data_consumer_2 = match &data_consumer_2 { + let _ = match &data_consumer_2 { DataConsumer::Direct(direct_data_consumer) => direct_data_consumer, _ => { panic!("Expected direct data consumer") @@ -514,7 +514,7 @@ fn send_with_subchannels_succeeds() { let mut subchannels = data_consumer_2.subchannels(); subchannels.push(1); - direct_data_consumer_2 + data_consumer_2 .set_subchannels(subchannels) .await .expect("Failed to set subchannels");