Skip to content

Commit

Permalink
Make subchannels be ordered
Browse files Browse the repository at this point in the history
### Details

- Use an ordered set in `DataConsumer` in worker so `subchannels` will be always shown in numerical order (no need to sort them later in Node/Rust layer).
- Also make `dataConsumer.subchannels` be **immediately** (and optimistically) updated before `dataChannel.setSubchannels()` completes.
  • Loading branch information
ibc committed Dec 12, 2023
1 parent b2508b1 commit 3d54749
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
### NEXT

* liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)).
* Make subchannels be ordered ([PR #XXXX](https://github.com/versatica/mediasoup/pull/XXXX)).


### 3.13.10
Expand Down
12 changes: 12 additions & 0 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,18 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
{
logger.debug('setSubchannels()');

// Optimistic subchannels update. We do this to make the |subchannels|
// getter be immediately updated with given values. Later it will be
// updated again with the response from the worker.
const subchannelsSet = new Set<number>();

for (const subchannel of subchannels)
{
subchannelsSet.add(Math.abs(subchannel % 65536));
}

this.#subchannels = Array.from(subchannelsSet).sort();

/* Build Request. */
const requestOffset = new FbsDataConsumer.SetSubchannelsRequestT(
subchannels
Expand Down
14 changes: 11 additions & 3 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ test('transport.consumeData() succeeds', async () =>
expect(dataConsumer1.label).toBe('foo');
expect(dataConsumer1.protocol).toBe('bar');
expect(dataConsumer1.paused).toBe(false);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.subchannels).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.appData).toEqual({ baz: 'LOL' });

const dump = await router.dump();
Expand Down Expand Up @@ -132,9 +132,17 @@ test('dataConsumer.getStats() succeeds', async () =>

test('dataConsumer.setSubchannels() succeeds', async () =>
{
await dataConsumer1.setSubchannels([ 999, 999, 998, 65536 ]);
const expectedSubchannels = [ 0, 998, 999 ];

expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]);
const promise = dataConsumer1.setSubchannels([ 999, 999, 998, 65536 ]);

// Before even subchannels are updated in worker side, its value in Node must
// already be updated.
expect(dataConsumer1.subchannels).toEqual(expectedSubchannels);

// And also once the promise resolves.
await promise;
expect(dataConsumer1.subchannels).toEqual(expectedSubchannels);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
Expand Down
6 changes: 1 addition & 5 deletions rust/tests/integration/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,7 @@ fn consume_data_succeeds() {
}
assert_eq!(data_consumer.label().as_str(), "foo");
assert_eq!(data_consumer.protocol().as_str(), "bar");

let mut sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [0, 1, 2, 100, 65535]);
assert_eq!(data_consumer.subchannels(), [0, 1, 2, 100, 65535]);
assert_eq!(
data_consumer
.app_data()
Expand Down
4 changes: 2 additions & 2 deletions worker/include/RTC/DataConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "Channel/ChannelSocket.hpp"
#include "RTC/SctpDictionaries.hpp"
#include "RTC/Shared.hpp"
#include <absl/container/flat_hash_set.h>
#include <absl/container/btree_set.h>
#include <string>

namespace RTC
Expand Down Expand Up @@ -126,7 +126,7 @@ namespace RTC
RTC::SctpStreamParameters sctpStreamParameters;
std::string label;
std::string protocol;
absl::flat_hash_set<uint16_t> subchannels;
absl::btree_set<uint16_t> subchannels;
bool transportConnected{ false };
bool sctpAssociationConnected{ false };
bool paused{ false };
Expand Down

0 comments on commit 3d54749

Please sign in to comment.