Skip to content

Commit

Permalink
Fix Rust DataConsumer
Browse files Browse the repository at this point in the history
### Details

* [x] Rust `set_subchannels()` method was only implemented in `DirectDataConsumer` which is wrong.
* [ ] Rust `DirectDataConsumer` lacks ALL methods of `DataConsumer` such as `pause()`, `closed()`, etc!!!
* [x] Bonus track: Use an ordered set in `DataConsumer` in worker so `subchannels` will be always shown in numerical order (no need to sort them later in Node/Rust layer).
  • Loading branch information
ibc committed Dec 12, 2023
1 parent 042b7f5 commit aca18a3
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 25 deletions.
4 changes: 2 additions & 2 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ test('transport.consumeData() succeeds', async () =>
expect(dataConsumer1.label).toBe('foo');
expect(dataConsumer1.protocol).toBe('bar');
expect(dataConsumer1.paused).toBe(false);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.subchannels).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.appData).toEqual({ baz: 'LOL' });

const dump = await router.dump();
Expand Down Expand Up @@ -134,7 +134,7 @@ test('dataConsumer.setSubchannels() succeeds', async () =>
{
await dataConsumer1.setSubchannels([ 999, 999, 998, 65536 ]);

expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]);
expect(dataConsumer1.subchannels).toEqual([ 0, 998, 999 ]);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
Expand Down
33 changes: 17 additions & 16 deletions rust/src/router/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ impl fmt::Debug for DirectDataConsumer {
.field("data_producer_id", &self.inner.data_producer_id)
.field("paused", &self.inner.paused)
.field("data_producer_paused", &self.inner.data_producer_paused)
.field("subchannels", &self.inner.subchannels)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
Expand Down Expand Up @@ -788,6 +789,22 @@ impl DataConsumer {
.await
}

/// Sets subchannels to the worker DataConsumer.
pub async fn set_subchannels(&self, subchannels: Vec<u16>) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(
self.id(),
DataConsumerSetSubchannelsRequest { subchannels },
)
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Callback is called when a message has been received from the corresponding data producer.
///
/// # Notes on usage
Expand Down Expand Up @@ -918,22 +935,6 @@ impl DirectDataConsumer {
)
.await
}

/// Sets subchannels to the worker DataConsumer.
pub async fn set_subchannels(&self, subchannels: Vec<u16>) -> Result<(), RequestError> {
let response = self
.inner
.channel
.request(
self.inner.id,
DataConsumerSetSubchannelsRequest { subchannels },
)
.await?;

*self.inner.subchannels.lock() = response.subchannels;

Ok(())
}
}

/// [`WeakDataConsumer`] doesn't own data consumer instance on mediasoup-worker and will not prevent
Expand Down
32 changes: 27 additions & 5 deletions rust/tests/integration/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,7 @@ fn consume_data_succeeds() {
}
assert_eq!(data_consumer.label().as_str(), "foo");
assert_eq!(data_consumer.protocol().as_str(), "bar");

let mut sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [0, 1, 2, 100, 65535]);
assert_eq!(data_consumer.subchannels(), [0, 1, 2, 100, 65535]);
assert_eq!(
data_consumer
.app_data()
Expand Down Expand Up @@ -321,6 +317,32 @@ fn get_stats_succeeds() {
});
}

#[test]
fn set_subchannels() {
future::block_on(async move {
let (_worker, router, transport1, data_producer) = init().await;

let data_consumer = transport1
.consume_data({
let options = DataConsumerOptions::new_sctp_unordered_with_life_time(
data_producer.id(),
4000,
);

options
})
.await
.expect("Failed to consume data");

data_consumer
.set_subchannels([ 999, 999, 998, 65536 ].to_vec())
.await
.expect("Failed to set data consumer subchannels");

assert_eq!(data_consumer.subchannels(), [ 0, 998, 999 ]);
});
}

#[test]
fn consume_data_on_direct_transport_succeeds() {
future::block_on(async move {
Expand Down
4 changes: 2 additions & 2 deletions worker/include/RTC/DataConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "Channel/ChannelSocket.hpp"
#include "RTC/SctpDictionaries.hpp"
#include "RTC/Shared.hpp"
#include <absl/container/flat_hash_set.h>
#include <absl/container/btree_set.h>
#include <string>

namespace RTC
Expand Down Expand Up @@ -126,7 +126,7 @@ namespace RTC
RTC::SctpStreamParameters sctpStreamParameters;
std::string label;
std::string protocol;
absl::flat_hash_set<uint16_t> subchannels;
absl::btree_set<uint16_t> subchannels;
bool transportConnected{ false };
bool sctpAssociationConnected{ false };
bool paused{ false };
Expand Down

0 comments on commit aca18a3

Please sign in to comment.