From d5d04a68c9eb2cd7687353f67c4897114d5ae200 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 12 Dec 2023 17:54:30 +0100 Subject: [PATCH] DataConsumer: Add `addSubchannel()` and `removeSubchannel() **TODO:** Rust, but I won't do it until https://github.com/versatica/mediasoup/pull/1262 is done and merged. --- CHANGELOG.md | 1 + node/src/DataConsumer.ts | 55 +++++++++++++++++++++++++++ node/src/tests/test-DataConsumer.ts | 24 ++++++++++++ worker/fbs/dataConsumer.fbs | 16 ++++++++ worker/fbs/request.fbs | 4 ++ worker/fbs/response.fbs | 2 + worker/src/Channel/ChannelRequest.cpp | 2 + worker/src/RTC/DataConsumer.cpp | 48 +++++++++++++++++++++++ 8 files changed, 152 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e545dd3208..c06f535410 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)). * libsrtp: use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)). +* `DataConsumer`: Add `addSubchannel()` and `removeSubchannel()` methods ([PR #XXXX](https://github.com/versatica/mediasoup/pull/XXXX)). ### 3.13.10 diff --git a/node/src/DataConsumer.ts b/node/src/DataConsumer.ts index 12b7e5db68..52ccd0284b 100644 --- a/node/src/DataConsumer.ts +++ b/node/src/DataConsumer.ts @@ -586,6 +586,61 @@ export class DataConsumer this.#subchannels = utils.parseVector(data, 'subchannels'); } + /** + * Add a subchannel. + */ + async addSubchannel(subchannel: number): Promise + { + logger.debug('addSubchannel()'); + + /* Build Request. */ + const requestOffset = + FbsDataConsumer.AddSubchannelRequest.createAddSubchannelRequest( + this.#channel.bufferBuilder, subchannel); + + const response = await this.#channel.request( + FbsRequest.Method.DATACONSUMER_ADD_SUBCHANNEL, + FbsRequest.Body.DataConsumer_AddSubchannelRequest, + requestOffset, + this.#internal.dataConsumerId + ); + + /* Decode Response. */ + const data = new FbsDataConsumer.AddSubchannelResponse(); + + response.body(data); + + // Update subchannels. + this.#subchannels = utils.parseVector(data, 'subchannels'); + } + + /** + * Remove a subchannel. + */ + async removeSubchannel(subchannel: number): Promise + { + logger.debug('removeSubchannel()'); + + /* Build Request. */ + const requestOffset = FbsDataConsumer.RemoveSubchannelRequest. + createRemoveSubchannelRequest(this.#channel.bufferBuilder, subchannel); + + const response = await this.#channel.request( + FbsRequest.Method.DATACONSUMER_REMOVE_SUBCHANNEL, + FbsRequest.Body.DataConsumer_RemoveSubchannelRequest, + requestOffset, + this.#internal.dataConsumerId + ); + + /* Decode Response. */ + const data = new FbsDataConsumer.RemoveSubchannelResponse(); + + response.body(data); + + // Update subchannels. + this.#subchannels = utils.parseVector(data, 'subchannels'); + } + private handleWorkerNotifications(): void { this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) => diff --git a/node/src/tests/test-DataConsumer.ts b/node/src/tests/test-DataConsumer.ts index 5bbf33fa0c..cfd75c0cdf 100644 --- a/node/src/tests/test-DataConsumer.ts +++ b/node/src/tests/test-DataConsumer.ts @@ -137,6 +137,30 @@ test('dataConsumer.setSubchannels() succeeds', async () => expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]); }, 2000); +test('dataConsumer.addSubchannel() and .removeSubchannel() succeed', async () => +{ + await dataConsumer1.setSubchannels([ ]); + expect(dataConsumer1.subchannels).toEqual([ ]); + + await dataConsumer1.addSubchannel(5); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5 ]); + + await dataConsumer1.addSubchannel(10); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]); + + await dataConsumer1.addSubchannel(5); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]); + + await dataConsumer1.removeSubchannel(666); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]); + + await dataConsumer1.removeSubchannel(5); + expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 10 ]); + + await dataConsumer1.setSubchannels([ ]); + expect(dataConsumer1.subchannels).toEqual([ ]); +}, 2000); + test('transport.consumeData() on a DirectTransport succeeds', async () => { const onObserverNewDataConsumer = jest.fn(); diff --git a/worker/fbs/dataConsumer.fbs b/worker/fbs/dataConsumer.fbs index 2bebe33191..2670288a58 100644 --- a/worker/fbs/dataConsumer.fbs +++ b/worker/fbs/dataConsumer.fbs @@ -47,6 +47,22 @@ table SetSubchannelsResponse { subchannels: [uint16] (required); } +table AddSubchannelRequest { + subchannel: uint16; +} + +table AddSubchannelResponse { + subchannels: [uint16] (required); +} + +table RemoveSubchannelRequest { + subchannel: uint16; +} + +table RemoveSubchannelResponse { + subchannels: [uint16] (required); +} + // Notifications from Worker. table BufferedAmountLowNotification { diff --git a/worker/fbs/request.fbs b/worker/fbs/request.fbs index 56eca5674d..d9f869e878 100644 --- a/worker/fbs/request.fbs +++ b/worker/fbs/request.fbs @@ -72,6 +72,8 @@ enum Method: uint8 { DATACONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD, DATACONSUMER_SEND, DATACONSUMER_SET_SUBCHANNELS, + DATACONSUMER_ADD_SUBCHANNEL, + DATACONSUMER_REMOVE_SUBCHANNEL, RTPOBSERVER_PAUSE, RTPOBSERVER_RESUME, RTPOBSERVER_ADD_PRODUCER, @@ -114,6 +116,8 @@ union Body { DataConsumer_SetBufferedAmountLowThresholdRequest: FBS.DataConsumer.SetBufferedAmountLowThresholdRequest, DataConsumer_SendRequest: FBS.DataConsumer.SendRequest, DataConsumer_SetSubchannelsRequest: FBS.DataConsumer.SetSubchannelsRequest, + DataConsumer_AddSubchannelRequest: FBS.DataConsumer.AddSubchannelRequest, + DataConsumer_RemoveSubchannelRequest: FBS.DataConsumer.RemoveSubchannelRequest, RtpObserver_AddProducerRequest: FBS.RtpObserver.AddProducerRequest, RtpObserver_RemoveProducerRequest: FBS.RtpObserver.RemoveProducerRequest, } diff --git a/worker/fbs/response.fbs b/worker/fbs/response.fbs index 539d36a72a..707e5750ef 100644 --- a/worker/fbs/response.fbs +++ b/worker/fbs/response.fbs @@ -40,6 +40,8 @@ union Body { DataConsumer_DumpResponse: FBS.DataConsumer.DumpResponse, DataConsumer_GetStatsResponse: FBS.DataConsumer.GetStatsResponse, DataConsumer_SetSubchannelsResponse: FBS.DataConsumer.SetSubchannelsResponse, + DataConsumer_AddSubchannelResponse: FBS.DataConsumer.AddSubchannelResponse, + DataConsumer_RemoveSubchannelResponse: FBS.DataConsumer.RemoveSubchannelResponse } table Response { diff --git a/worker/src/Channel/ChannelRequest.cpp b/worker/src/Channel/ChannelRequest.cpp index adf62ba28e..ed1ef58395 100644 --- a/worker/src/Channel/ChannelRequest.cpp +++ b/worker/src/Channel/ChannelRequest.cpp @@ -75,6 +75,8 @@ namespace Channel { FBS::Request::Method::DATACONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD, "dataConsumer.setBufferedAmountLowThreshold" }, { FBS::Request::Method::DATACONSUMER_SEND, "dataConsumer.send" }, { FBS::Request::Method::DATACONSUMER_SET_SUBCHANNELS, "dataConsumer.setSubchannels" }, + { FBS::Request::Method::DATACONSUMER_ADD_SUBCHANNEL, "dataConsumer.addSubchannel" }, + { FBS::Request::Method::DATACONSUMER_REMOVE_SUBCHANNEL, "dataConsumer.removeSubchannel" }, { FBS::Request::Method::RTPOBSERVER_PAUSE, "rtpObserver.pause" }, { FBS::Request::Method::RTPOBSERVER_RESUME, "rtpObserver.resume" }, { FBS::Request::Method::RTPOBSERVER_ADD_PRODUCER, "rtpObserver.addProducer" }, diff --git a/worker/src/RTC/DataConsumer.cpp b/worker/src/RTC/DataConsumer.cpp index 3f5fd69ffd..fdabc812a5 100644 --- a/worker/src/RTC/DataConsumer.cpp +++ b/worker/src/RTC/DataConsumer.cpp @@ -339,6 +339,54 @@ namespace RTC break; } + case Channel::ChannelRequest::Method::DATACONSUMER_ADD_SUBCHANNEL: + { + const auto* body = request->data->body_as(); + + this->subchannels.insert(body->subchannel()); + + std::vector subchannels; + + subchannels.reserve(this->subchannels.size()); + + for (auto subchannel : this->subchannels) + { + subchannels.emplace_back(subchannel); + } + + // Create response. + auto responseOffset = FBS::DataConsumer::CreateAddSubchannelResponseDirect( + request->GetBufferBuilder(), std::addressof(subchannels)); + + request->Accept(FBS::Response::Body::DataConsumer_AddSubchannelResponse, responseOffset); + + break; + } + + case Channel::ChannelRequest::Method::DATACONSUMER_REMOVE_SUBCHANNEL: + { + const auto* body = request->data->body_as(); + + this->subchannels.erase(body->subchannel()); + + std::vector subchannels; + + subchannels.reserve(this->subchannels.size()); + + for (auto subchannel : this->subchannels) + { + subchannels.emplace_back(subchannel); + } + + // Create response. + auto responseOffset = FBS::DataConsumer::CreateRemoveSubchannelResponseDirect( + request->GetBufferBuilder(), std::addressof(subchannels)); + + request->Accept(FBS::Response::Body::DataConsumer_RemoveSubchannelResponse, responseOffset); + + break; + } + default: { MS_THROW_ERROR("unknown method '%s'", request->methodCStr);