From b031fb99c638a1069e86025e9e08ab3846cd1ff1 Mon Sep 17 00:00:00 2001 From: Mikhail Baranov Date: Fri, 25 Oct 2024 17:17:39 +0200 Subject: [PATCH] Dynamic padding in packetizer for compressive frame encoders (#758) --- .../roc_audio/iframe_encoder.h | 8 ++++ src/internal_modules/roc_audio/packetizer.cpp | 41 +++++++++++++++---- src/internal_modules/roc_audio/packetizer.h | 10 ++++- .../roc_audio/pcm_encoder.cpp | 4 ++ src/internal_modules/roc_audio/pcm_encoder.h | 3 ++ .../roc_pipeline/sender_session.cpp | 5 ++- src/tests/roc_audio/test_packetizer.cpp | 18 ++++---- 7 files changed, 67 insertions(+), 22 deletions(-) diff --git a/src/internal_modules/roc_audio/iframe_encoder.h b/src/internal_modules/roc_audio/iframe_encoder.h index 0a888a671..485eff61b 100644 --- a/src/internal_modules/roc_audio/iframe_encoder.h +++ b/src/internal_modules/roc_audio/iframe_encoder.h @@ -62,6 +62,14 @@ class IFrameEncoder { //! After this call, the frame is fully encoded and no more samples is written //! to the frame. A new frame should be started by calling begin_frame(). virtual void end_frame() = 0; + + //! If this encoder implies different frame sizes. + //! + //! @remarks + //! If this is compression encoder, probably its output frame sizes are dependant to + //! underlying content, so if some part of sending pipeline needs steady packet sizes + //! some padding might be required. + virtual bool variable_frame_bytecount() const = 0; }; } // namespace audio diff --git a/src/internal_modules/roc_audio/packetizer.cpp b/src/internal_modules/roc_audio/packetizer.cpp index 3a1083562..e3a2727e7 100644 --- a/src/internal_modules/roc_audio/packetizer.cpp +++ b/src/internal_modules/roc_audio/packetizer.cpp @@ -15,7 +15,8 @@ namespace roc { namespace audio { -Packetizer::Packetizer(packet::IWriter& writer, +Packetizer::Packetizer(core::IArena& arena, + packet::IWriter& writer, packet::IComposer& composer, packet::ISequencer& sequencer, IFrameEncoder& payload_encoder, @@ -26,6 +27,7 @@ Packetizer::Packetizer(packet::IWriter& writer, , composer_(composer) , sequencer_(sequencer) , payload_encoder_(payload_encoder) + , do_padding_(payload_encoder.variable_frame_bytecount()) , packet_factory_(packet_factory) , sample_spec_(sample_spec) , samples_per_packet_(0) @@ -33,6 +35,7 @@ Packetizer::Packetizer(packet::IWriter& writer, , packet_pos_(0) , packet_cts_(0) , capture_ts_(0) + , pkt_sz_stats_(arena, 1000) , init_status_(status::NoStatus) { roc_panic_if_msg(!sample_spec_.is_valid() || !sample_spec_.is_raw(), "packetizer: required valid sample spec with raw format: %s", @@ -49,7 +52,9 @@ Packetizer::Packetizer(packet::IWriter& writer, } samples_per_packet_ = sample_spec.ns_2_stream_timestamp(packet_length); - payload_size_ = payload_encoder.encoded_byte_count(samples_per_packet_); + payload_size_ = do_padding_ + ? sz_ceil_(payload_encoder.encoded_byte_count(samples_per_packet_)) + : payload_encoder.encoded_byte_count(samples_per_packet_); roc_log( LogDebug, @@ -161,9 +166,15 @@ status::StatusCode Packetizer::end_packet_() { // Fill protocol-specific fields. sequencer_.next(*packet_, packet_cts_, (packet::stream_timestamp_t)packet_pos_); - // Apply padding if needed. - if (packet_pos_ < samples_per_packet_) { - pad_packet_(written_payload_size); + if (do_padding_) { + const size_t round_payload_size = sz_ceil_(written_payload_size); + pkt_sz_stats_.add(round_payload_size); + pad_packet_(written_payload_size, pkt_sz_stats_.mov_max()); + } else { + // Apply padding if needed. + if (packet_pos_ < samples_per_packet_) { + pad_packet_(written_payload_size, payload_size_); + } } const status::StatusCode code = writer_.write(packet_); @@ -208,14 +219,26 @@ status::StatusCode Packetizer::create_packet_() { return status::StatusOK; } -void Packetizer::pad_packet_(size_t written_payload_size) { - if (written_payload_size == payload_size_) { +void Packetizer::pad_packet_(size_t written_payload_size, size_t required_sz) { + if (written_payload_size == required_sz) { return; } - if (!composer_.pad(*packet_, payload_size_ - written_payload_size)) { + if (!composer_.pad(*packet_, required_sz - written_payload_size)) { roc_panic("packetizer: can't pad packet: orig_size=%lu actual_size=%lu", - (unsigned long)payload_size_, (unsigned long)written_payload_size); + (unsigned long)required_sz, (unsigned long)written_payload_size); + } +} + +size_t Packetizer::sz_ceil_(size_t sz) { + const size_t round_up_to = 0x40; + const size_t mask = round_up_to - 1; + const size_t inv_mask = ((size_t)-1) ^ mask; + + if ((sz & mask) > 0) { + return sz & inv_mask + round_up_to; + } else { + return sz & inv_mask; } } diff --git a/src/internal_modules/roc_audio/packetizer.h b/src/internal_modules/roc_audio/packetizer.h index aa2d5b039..bb9c58755 100644 --- a/src/internal_modules/roc_audio/packetizer.h +++ b/src/internal_modules/roc_audio/packetizer.h @@ -16,6 +16,7 @@ #include "roc_audio/iframe_writer.h" #include "roc_audio/sample.h" #include "roc_audio/sample_spec.h" +#include "roc_core/mov_stats.h" #include "roc_core/noncopyable.h" #include "roc_core/time.h" #include "roc_packet/icomposer.h" @@ -60,7 +61,8 @@ class Packetizer : public IFrameWriter, public core::NonCopyable<> { //! - @p buffer_factory is used to allocate buffers for packets //! - @p packet_length defines packet length in nanoseconds //! - @p sample_spec describes input frames - Packetizer(packet::IWriter& writer, + Packetizer(core::IArena& arena, + packet::IWriter& writer, packet::IComposer& composer, packet::ISequencer& sequencer, IFrameEncoder& payload_encoder, @@ -90,12 +92,15 @@ class Packetizer : public IFrameWriter, public core::NonCopyable<> { status::StatusCode end_packet_(); status::StatusCode create_packet_(); - void pad_packet_(size_t written_payload_size); + void pad_packet_(size_t written_payload_size, size_t required_sz); + + size_t sz_ceil_(size_t sz); packet::IWriter& writer_; packet::IComposer& composer_; packet::ISequencer& sequencer_; IFrameEncoder& payload_encoder_; + const bool do_padding_; packet::PacketFactory& packet_factory_; @@ -110,6 +115,7 @@ class Packetizer : public IFrameWriter, public core::NonCopyable<> { core::nanoseconds_t capture_ts_; PacketizerMetrics metrics_; + core::MovStats pkt_sz_stats_; status::StatusCode init_status_; }; diff --git a/src/internal_modules/roc_audio/pcm_encoder.cpp b/src/internal_modules/roc_audio/pcm_encoder.cpp index 53dc3d089..71bf1bbc4 100644 --- a/src/internal_modules/roc_audio/pcm_encoder.cpp +++ b/src/internal_modules/roc_audio/pcm_encoder.cpp @@ -72,5 +72,9 @@ void PcmEncoder::end_frame() { frame_bit_off_ = 0; } +bool PcmEncoder::variable_frame_bytecount() const { + return false; +} + } // namespace audio } // namespace roc diff --git a/src/internal_modules/roc_audio/pcm_encoder.h b/src/internal_modules/roc_audio/pcm_encoder.h index 9cbc4037c..eec884b3a 100644 --- a/src/internal_modules/roc_audio/pcm_encoder.h +++ b/src/internal_modules/roc_audio/pcm_encoder.h @@ -44,6 +44,9 @@ class PcmEncoder : public IFrameEncoder, public core::NonCopyable<> { //! Finish encoding frame. virtual void end_frame(); + //! This encoder has direct mapping from number of samples to a frame size. + virtual bool variable_frame_bytecount() const; + private: PcmMapper pcm_mapper_; const size_t n_chans_; diff --git a/src/internal_modules/roc_pipeline/sender_session.cpp b/src/internal_modules/roc_pipeline/sender_session.cpp index 796c433da..64aaa81e1 100644 --- a/src/internal_modules/roc_pipeline/sender_session.cpp +++ b/src/internal_modules/roc_pipeline/sender_session.cpp @@ -150,8 +150,9 @@ SenderSession::create_transport_pipeline(SenderEndpoint* source_endpoint, pkt_encoding->sample_spec.channel_set()); packetizer_.reset(new (packetizer_) audio::Packetizer( - *pkt_writer, source_endpoint->outbound_composer(), *sequencer_, - *payload_encoder_, packet_factory_, sink_config_.packet_length, in_spec)); + arena_, *pkt_writer, source_endpoint->outbound_composer(), + *sequencer_, *payload_encoder_, packet_factory_, sink_config_.packet_length, + in_spec)); if ((status = packetizer_->init_status()) != status::StatusOK) { return status; } diff --git a/src/tests/roc_audio/test_packetizer.cpp b/src/tests/roc_audio/test_packetizer.cpp index 3ad9d57d4..4267d5fe1 100644 --- a/src/tests/roc_audio/test_packetizer.cpp +++ b/src/tests/roc_audio/test_packetizer.cpp @@ -207,8 +207,8 @@ TEST(packetizer, one_buffer_one_packet) { rtp::Identity identity; rtp::Sequencer sequencer(identity, PayloadType); - Packetizer packetizer(packet_queue, rtp_composer, sequencer, encoder, packet_factory, - PacketDuration, frame_spec); + Packetizer packetizer(arena, packet_queue, rtp_composer, sequencer, encoder, + packet_factory, PacketDuration, frame_spec); LONGS_EQUAL(status::StatusOK, packetizer.init_status()); FrameMaker frame_maker; @@ -235,7 +235,7 @@ TEST(packetizer, one_buffer_multiple_packets) { rtp::Identity identity; rtp::Sequencer sequencer(identity, PayloadType); - Packetizer packetizer(packet_queue, rtp_composer, sequencer, encoder, packet_factory, + Packetizer packetizer(arena, packet_queue, rtp_composer, sequencer, encoder, packet_factory, PacketDuration, frame_spec); LONGS_EQUAL(status::StatusOK, packetizer.init_status()); @@ -263,7 +263,7 @@ TEST(packetizer, multiple_buffers_one_packet) { rtp::Identity identity; rtp::Sequencer sequencer(identity, PayloadType); - Packetizer packetizer(packet_queue, rtp_composer, sequencer, encoder, packet_factory, + Packetizer packetizer(arena, packet_queue, rtp_composer, sequencer, encoder, packet_factory, PacketDuration, frame_spec); LONGS_EQUAL(status::StatusOK, packetizer.init_status()); @@ -297,7 +297,7 @@ TEST(packetizer, multiple_buffers_multiple_packets) { rtp::Identity identity; rtp::Sequencer sequencer(identity, PayloadType); - Packetizer packetizer(packet_queue, rtp_composer, sequencer, encoder, packet_factory, + Packetizer packetizer(arena, packet_queue, rtp_composer, sequencer, encoder, packet_factory, PacketDuration, frame_spec); LONGS_EQUAL(status::StatusOK, packetizer.init_status()); @@ -325,7 +325,7 @@ TEST(packetizer, flush) { rtp::Identity identity; rtp::Sequencer sequencer(identity, PayloadType); - Packetizer packetizer(packet_queue, rtp_composer, sequencer, encoder, packet_factory, + Packetizer packetizer(arena, packet_queue, rtp_composer, sequencer, encoder, packet_factory, PacketDuration, frame_spec); LONGS_EQUAL(status::StatusOK, packetizer.init_status()); @@ -364,7 +364,7 @@ TEST(packetizer, timestamp_zero_cts) { rtp::Identity identity; rtp::Sequencer sequencer(identity, PayloadType); - Packetizer packetizer(packet_queue, rtp_composer, sequencer, encoder, packet_factory, + Packetizer packetizer(arena, packet_queue, rtp_composer, sequencer, encoder, packet_factory, PacketDuration, frame_spec); LONGS_EQUAL(status::StatusOK, packetizer.init_status()); @@ -392,7 +392,7 @@ TEST(packetizer, metrics) { rtp::Identity identity; rtp::Sequencer sequencer(identity, PayloadType); - Packetizer packetizer(packet_queue, rtp_composer, sequencer, encoder, packet_factory, + Packetizer packetizer(arena, packet_queue, rtp_composer, sequencer, encoder, packet_factory, PacketDuration, frame_spec); LONGS_EQUAL(status::StatusOK, packetizer.init_status()); @@ -417,7 +417,7 @@ TEST(packetizer, forward_error) { rtp::Identity identity; rtp::Sequencer sequencer(identity, PayloadType); - Packetizer packetizer(packet_writer, rtp_composer, sequencer, encoder, packet_factory, + Packetizer packetizer(arena, packet_writer, rtp_composer, sequencer, encoder, packet_factory, PacketDuration, frame_spec); LONGS_EQUAL(status::StatusOK, packetizer.init_status());