From 09d5909adcf68281e6383381d20be907eb1fee0d Mon Sep 17 00:00:00 2001 From: Mikhail Baranov Date: Sat, 4 Nov 2023 23:16:40 +0100 Subject: [PATCH] :WIP rtp stream calculations --- src/internal_modules/roc_core/mov_stats.h | 155 ++++++++++++++++++ src/internal_modules/roc_packet/rtp.cpp | 3 +- src/internal_modules/roc_packet/rtp.h | 4 + .../roc_packet/target_libuv/roc_packet/udp.h | 6 + .../roc_pipeline/receiver_session.cpp | 5 +- src/internal_modules/roc_rtp/populator.cpp | 8 +- src/internal_modules/roc_rtp/populator.h | 4 +- .../roc_rtp/stream_stats_monitor.cpp | 91 ++++++++++ .../roc_rtp/stream_stats_monitor.h | 80 +++++++++ src/tests/roc_core/test_mov_stats.cpp | 90 ++++++++++ src/tests/roc_rtp/test_populator.cpp | 28 +++- 11 files changed, 465 insertions(+), 9 deletions(-) create mode 100644 src/internal_modules/roc_core/mov_stats.h create mode 100644 src/internal_modules/roc_rtp/stream_stats_monitor.cpp create mode 100644 src/internal_modules/roc_rtp/stream_stats_monitor.h create mode 100644 src/tests/roc_core/test_mov_stats.cpp diff --git a/src/internal_modules/roc_core/mov_stats.h b/src/internal_modules/roc_core/mov_stats.h new file mode 100644 index 0000000000..9731cfe2ce --- /dev/null +++ b/src/internal_modules/roc_core/mov_stats.h @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! @file roc_core/mov_stats.h +//! @brief Profiler. + +#ifndef ROC_TOOLKIT_MOV_STATS_H +#define ROC_TOOLKIT_MOV_STATS_H + +#include "roc_core/array.h" +#include "roc_core/iarena.h" +#include "roc_core/panic.h" + +namespace roc { +namespace core { + +//! Rolling window moving average and variance. +//! +//! Efficiently implements moving average and variance based on approach +//! described in https://www.dsprelated.com/showthread/comp.dsp/97276-1.php +//! +//! @tparam T defines a sample type. +template +class MovStats { +public: + //! Initialize. + MovStats(IArena& arena, const size_t win_len) + : buffer_(arena) + , buffer2_(arena) + , win_len_(win_len) + , buffer_i_(0) + , movsum_(T(0)) + , movsum2_(T(0)) + , mov_var_(T(0)) + , full_(false) + , mov_max_cntr_(0) + , first_(true) + { + if(!buffer_.resize(win_len)){ + roc_panic("MovStats: can't allocate storage for the ring buffer"); + } + if(!buffer2_.resize(win_len)){ + roc_panic("MovStats: can't allocate storage for the ring buffer"); + } + memset(buffer_.data(), 0, sizeof(T)*buffer_.size()); + memset(buffer2_.data(), 0, sizeof(T)*buffer2_.size()); + } + + //! Shift rolling window by one sample x. + void add(const T& x) + { + const T x2 = x*x; + const T x_old = buffer_[buffer_i_]; + buffer_[buffer_i_] = x; + const T x2_old = buffer2_[buffer_i_]; + buffer2_[buffer_i_] = x2; + + movsum_ += x - x_old; + movsum2_ += x2 - x2_old; + + if (first_) { + first_ = false; + mov_max_ = x; + mov_max_cntr_++; + } else { + if (x > mov_max_) { + mov_max_ = x; + mov_max_cntr_ = 1; + } else if (x == mov_max_) { + mov_max_cntr_++; + } + + if (mov_max_ == x_old) { + mov_ + } + } + + buffer_i_++; + if (buffer_i_ == win_len_) { + buffer_i_ = 0; + full_ = true; + } + + } + + //! Get moving average value. + T mov_avg() const + { + const T n = full_ ? T(win_len_) : T(buffer_i_ + 1); + return movsum_ / n; + } + + //! Get variance. + T mov_var() const + { + const T n = full_ ? T(win_len_) : T(buffer_i_+1); + if (n == 1) { + return (T)sqrt(movsum2_ - movsum_ * movsum_); + } else { + return (T)sqrt((n*movsum2_ - movsum_ * movsum_) / (n * n)); + } + } + + //! Extend rolling window length. + //! @remarks + //! Potentially could cause a gap in the estimated values as + //! decreases effective window size by dropping samples to the right from + //! the cursor in the ring buffers: + //! buffer_i_ win_len_ old win_len_ new + //! ↓ ↓ ↓ + //! [■■■■■■■■■■□□□□□□□□□□□□□□□□□□□□□--------------------] + //! ↑ ↑ ↑ + //! Dropped samples. + void extend_win(const size_t new_win) + { + if (new_win <= win_len_) { + roc_panic("MovStats: the window length can only grow"); + } + if (!buffer_.resize(new_win)) { + roc_panic("MovStats: can not increase storage"); + } + + movsum_ = 0; + movsum2_ = 0; + for (size_t i = 0; i < buffer_i_; i++) { + movsum_ += buffer_[i]; + movsum2_ += buffer2_[i]; + } + full_ = false; + } + +private: + Array buffer_; + Array buffer2_; + const size_t win_len_; + size_t buffer_i_; + T movsum_; + T movsum2_; + T mov_var_; + T mov_max_; + size_t mov_max_cntr_; + + bool full_; + bool first_; +}; + +} // namespace core +} // namespace roc + +#endif // ROC_TOOLKIT_MOV_STATS_H diff --git a/src/internal_modules/roc_packet/rtp.cpp b/src/internal_modules/roc_packet/rtp.cpp index 3b1c835c2e..ddf3113353 100644 --- a/src/internal_modules/roc_packet/rtp.cpp +++ b/src/internal_modules/roc_packet/rtp.cpp @@ -18,7 +18,8 @@ RTP::RTP() , duration(0) , capture_timestamp(0) , marker(false) - , payload_type(0) { + , payload_type(0) + , fec_recovered(false) { } int RTP::compare(const RTP& other) const { diff --git a/src/internal_modules/roc_packet/rtp.h b/src/internal_modules/roc_packet/rtp.h index 00401d2ed9..321b94267a 100644 --- a/src/internal_modules/roc_packet/rtp.h +++ b/src/internal_modules/roc_packet/rtp.h @@ -75,6 +75,10 @@ struct RTP { //! Packet payload type ("pt"). unsigned int payload_type; + //! Internal flag, valid for receiver side only. + //! Signals if this packet was recovered by FEC. + bool fec_recovered; + //! Packet header. core::Slice header; diff --git a/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h b/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h index 58291cad5e..776c508a58 100644 --- a/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h +++ b/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h @@ -31,6 +31,12 @@ struct UDP { //! Sender request state. uv_udp_send_t request; + + //! When the packet was put into jitter-buffer queue. + //! It points to a moment when the packet was transferred to a sink-thread, that + //! "consumes" this packet. The reason to have it separate is that this allows + //! us to account additional jitter introduced by thread-switch time. + core::nanoseconds_t enqueue_ts; }; } // namespace packet diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 73de147945..e0e3bf112f 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -65,7 +65,7 @@ ReceiverSession::ReceiverSession( preader = validator_.get(); populator_.reset(new (populator_) rtp::Populator(*preader, *payload_decoder_, - encoding->sample_spec)); + encoding->sample_spec, false)); if (!populator_) { return; } @@ -115,7 +115,7 @@ ReceiverSession::ReceiverSession( preader = fec_validator_.get(); fec_populator_.reset(new (fec_populator_) rtp::Populator( - *preader, *payload_decoder_, encoding->sample_spec)); + *preader, *payload_decoder_, encoding->sample_spec, true)); if (!fec_populator_) { return; } @@ -221,6 +221,7 @@ status::StatusCode ReceiverSession::route(const packet::PacketPtr& packet) { roc_panic_if(!is_valid()); packet::UDP* udp = packet->udp(); + packet->udp()->enqueue_ts = core::timestamp(core::ClockUnix); if (!udp) { // TODO(gh-183): return StatusNoRoute return status::StatusUnknown; diff --git a/src/internal_modules/roc_rtp/populator.cpp b/src/internal_modules/roc_rtp/populator.cpp index f574a55082..50c1a85918 100644 --- a/src/internal_modules/roc_rtp/populator.cpp +++ b/src/internal_modules/roc_rtp/populator.cpp @@ -14,10 +14,12 @@ namespace rtp { Populator::Populator(packet::IReader& reader, audio::IFrameDecoder& decoder, - const audio::SampleSpec& sample_spec) + const audio::SampleSpec& sample_spec, + const bool set_recovered) : reader_(reader) , decoder_(decoder) - , sample_spec_(sample_spec) { + , sample_spec_(sample_spec) + , set_recovered_(set_recovered) { } status::StatusCode Populator::read(packet::PacketPtr& packet) { @@ -36,6 +38,8 @@ status::StatusCode Populator::read(packet::PacketPtr& packet) { packet->rtp()->payload.data(), packet->rtp()->payload.size()); } + packet->rtp()->fec_recovered = set_recovered_; + return status::StatusOK; } diff --git a/src/internal_modules/roc_rtp/populator.h b/src/internal_modules/roc_rtp/populator.h index 40dd3572ab..ab00a216f9 100644 --- a/src/internal_modules/roc_rtp/populator.h +++ b/src/internal_modules/roc_rtp/populator.h @@ -26,7 +26,8 @@ class Populator : public packet::IReader, public core::NonCopyable<> { //! Initialize. Populator(packet::IReader& reader, audio::IFrameDecoder& decoder, - const audio::SampleSpec& sample_spec); + const audio::SampleSpec& sample_spec, + const bool set_recovered); //! Read next packet. virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&); @@ -35,6 +36,7 @@ class Populator : public packet::IReader, public core::NonCopyable<> { packet::IReader& reader_; audio::IFrameDecoder& decoder_; const audio::SampleSpec sample_spec_; + const bool set_recovered_; }; } // namespace rtp diff --git a/src/internal_modules/roc_rtp/stream_stats_monitor.cpp b/src/internal_modules/roc_rtp/stream_stats_monitor.cpp new file mode 100644 index 0000000000..15578a756b --- /dev/null +++ b/src/internal_modules/roc_rtp/stream_stats_monitor.cpp @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "stream_stats_monitor.h" +#include "roc_status/status_code.h" + +namespace roc { +namespace rtp { + +StreamStatsMonitor::StreamStatsMonitor(packet::IReader& reader, + core::IArena& arena, + const audio::SampleSpec& sample_spec, + const StreamStatsConfig &config) + : reader_(reader) + , arena_(arena) + , config_(config) + , sample_spec_(sample_spec) + , rtp_(arena, config) +{} + +status::StatusCode StreamStatsMonitor::read(packet::PacketPtr& packet) +{ + status::StatusCode result = reader_.read(packet); + if (result == status::StatusOK){ + if (packet->rtp()) { + rtp_.process(packet); + } + } + + return result; +} +StreamStats StreamStatsMonitor::stats() const { + StreamStats result; + result + return StreamStats(); +} + +StreamStatsMonitor::RTPStats::RTPStats(core::IArena& arena, const StreamStatsConfig &config) + : lost_(0) + , jitter_processed_(0) + , prev_packet_enq_ts_(-1) + , prev_seqnum_(0) + , packet_jitter_stats_(arena, config.window_npackets) +{} + +bool StreamStatsMonitor::RTPStats::process(const packet::PacketPtr& packet) { + if (prev_packet_enq_ts_ == -1) { + prev_packet_enq_ts_ = packet->udp()->enqueue_ts; + + } else { + const size_t gap = gap_(packet); + // Compute jitter only on consequential packets. + if (gap == 0){ + const core::nanoseconds_t d_enq_ts = packet->udp()->enqueue_ts - prev_packet_enq_ts_; + const core::nanoseconds_t d_capt_ts = packet->rtp()->capture_timestamp - prev_capt_ts; + packet_jitter_stats_.add(d_enq_ts - d_capt_ts); + jitter_processed_++; + } else { + lost_ += gap; + } + } + + prev_packet_enq_ts_ = packet->udp()->enqueue_ts; + prev_seqnum_ = packet->rtp()->seqnum; + prev_stream_timestamp = packet->rtp()->stream_timestamp; + prev_capt_ts = packet->rtp()->capture_timestamp; + return false; +} + +size_t StreamStatsMonitor::RTPStats::gap_(const packet::PacketPtr& packet) const { + if (prev_packet_enq_ts_ == -1) { + roc_panic("RTPStats: attempt to detect gap on the first received packet"); + } + + return (size_t)abs(packet::seqnum_diff( packet->rtp()->seqnum, prev_seqnum_ + 1)); +} + +core::nanoseconds_t StreamStatsMonitor::RTPStats::mean_jitter() const { + return packet_jitter_stats_.mov_avg(); +} + +core::nanoseconds_t StreamStatsMonitor::RTPStats::var_jitter() const { + return packet_jitter_stats_.mov_var(); +} +} // namespace packet +} // namespace roc \ No newline at end of file diff --git a/src/internal_modules/roc_rtp/stream_stats_monitor.h b/src/internal_modules/roc_rtp/stream_stats_monitor.h new file mode 100644 index 0000000000..fcf10515bc --- /dev/null +++ b/src/internal_modules/roc_rtp/stream_stats_monitor.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! @file roc_rtp/stream_stats_monitor.h +//! @brief Calculates basic network stream statistics. + + +#ifndef ROC_PACKET_STREAMSTATSMONITOR_H_ +#define ROC_PACKET_STREAMSTATSMONITOR_H_ + +#include "roc_core/noncopyable.h" +#include "roc_packet/ireader.h" +#include "roc_core/mov_stats.h" +#include "roc_core/time.h" +#include "roc_audio/sample_spec.h" +#include "roc_packet/units.h" +#include "roc_status/status_code.h" + +namespace roc { +namespace rtp { + +struct StreamStatsConfig { + size_t window_npackets; +}; + +struct StreamStats { + core::nanoseconds_t windowed_max_jitter; + + size_t windowed_npackets; + size_t windowed_lost_packets; + size_t windowed_recovered_packets; + + float packet_loss_rate; +}; + +class StreamStatsMonitor : public packet::IReader, public core::NonCopyable<> { +public: + StreamStatsMonitor(packet::IReader& reader, core::IArena& arena, + const audio::SampleSpec& sample_spec, + const StreamStatsConfig &config); + + virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr& packet); + + ROC_ATTR_NODISCARD StreamStats stats() const; +private: + packet::IReader& reader_; + core::IArena& arena_; + const StreamStatsConfig config_; + const audio::SampleSpec sample_spec_; + + class RTPStats { + public: + RTPStats(core::IArena& arena, const StreamStatsConfig &config); + bool process(const packet::PacketPtr& packet); + + core::nanoseconds_t mean_jitter() const; + core::nanoseconds_t var_jitter() const; + private: + size_t lost_; + size_t jitter_processed_; + core::nanoseconds_t prev_packet_enq_ts_; + packet::seqnum_t prev_seqnum_; + packet::stream_timestamp_t prev_stream_timestamp; + core::nanoseconds_t prev_capt_ts; + core::MovStats packet_jitter_stats_; + + size_t gap_(const packet::PacketPtr &packet) const; + + } rtp_; +}; + +} // namespace packet +} // namespace roc + +#endif // ROC_PACKET_STREAMSTATSMONITOR_H_ diff --git a/src/tests/roc_core/test_mov_stats.cpp b/src/tests/roc_core/test_mov_stats.cpp new file mode 100644 index 0000000000..94638acc40 --- /dev/null +++ b/src/tests/roc_core/test_mov_stats.cpp @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include + +#include "roc_core/mov_stats.h" +#include "roc_core/heap_arena.h" + +namespace roc { +namespace core { + +namespace { + +enum { NumObjects = 10, EmbeddedCap = 5 }; + +struct Object { + static long n_objects; + + size_t value; + + Object(size_t v = 0) + : value(v) { + n_objects++; + } + + Object(const Object& other) + : value(other.value) { + n_objects++; + } + + ~Object() { + n_objects--; + } +}; + +long Object::n_objects = 0; + +} // namespace + +TEST_GROUP(movstats) { + HeapArena arena; +}; + +TEST(movstats, single_pass) { + const size_t n = 10; + MovStats stats(arena, n); + const int64_t target_avg = (n-1) * n / 2; + int64_t target_var = 0; + for (size_t i = 0; i < n; i++) { + const int64_t x = int64_t(i * n); + stats.add(x); + target_var += (x - target_avg) * (x - target_avg); + } + target_var = (int64_t) sqrt(target_var / (int64_t)n); + + LONGS_EQUAL(target_avg, stats.mov_avg()); + LONGS_EQUAL(target_var, stats.mov_var()); +} + +TEST(movstats, one_n_half_pass) { + const size_t n = 10; + MovStats stats(arena, n); + for (size_t i = 0; i < (n * 10 + n/2); i++) { + const int64_t x = (int64_t) pow(-1., (double)i); + stats.add(x); + } + + LONGS_EQUAL(0, stats.mov_avg()); + LONGS_EQUAL(1, stats.mov_var()); + + const int64_t target_avg = (n-1) * n / 2; + int64_t target_var = 0; + for (size_t i = 0; i < n; i++) { + const int64_t x = int64_t(i * n); + stats.add(x); + target_var += (x - target_avg) * (x - target_avg); + } + target_var = (int64_t) sqrt(target_var / (int64_t)n); + + LONGS_EQUAL(target_avg, stats.mov_avg()); + LONGS_EQUAL(target_var, stats.mov_var()); +} + +} // namespace core +} // namespace roc diff --git a/src/tests/roc_rtp/test_populator.cpp b/src/tests/roc_rtp/test_populator.cpp index abeaf71bdd..72f79f036b 100644 --- a/src/tests/roc_rtp/test_populator.cpp +++ b/src/tests/roc_rtp/test_populator.cpp @@ -64,7 +64,7 @@ TEST(populator, failed_to_read_packet) { for (unsigned n = 0; n < ROC_ARRAY_SIZE(codes); ++n) { test::StatusReader reader(codes[n]); audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(reader, decoder, SampleSpec); + Populator populator(reader, decoder, SampleSpec, false); packet::PacketPtr pp; LONGS_EQUAL(codes[n], populator.read(pp)); @@ -75,7 +75,7 @@ TEST(populator, failed_to_read_packet) { TEST(populator, empty_duration) { packet::Queue queue; audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(queue, decoder, SampleSpec); + Populator populator(queue, decoder, SampleSpec, false); const packet::stream_timestamp_t packet_duration = 0; const packet::stream_timestamp_t expected_duration = 32; @@ -89,12 +89,13 @@ TEST(populator, empty_duration) { CHECK(wp == rp); LONGS_EQUAL(expected_duration, rp->rtp()->duration); + CHECK_EQUAL(false, rp->rtp()->fec_recovered); } TEST(populator, non_empty_duration) { packet::Queue queue; audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(queue, decoder, SampleSpec); + Populator populator(queue, decoder, SampleSpec, false); const packet::stream_timestamp_t duration = 100; @@ -108,6 +109,27 @@ TEST(populator, non_empty_duration) { CHECK(rp); CHECK(wp == rp); LONGS_EQUAL(duration, rp->rtp()->duration); + CHECK_EQUAL(false, rp->rtp()->fec_recovered); +} + +TEST(populator, non_empty_duration_recovered) { + packet::Queue queue; + audio::PcmDecoder decoder(PcmFmt, SampleSpec); + Populator populator(queue, decoder, SampleSpec, true); + + const packet::stream_timestamp_t duration = 100; + + core::Slice buffer = buffer_factory.new_buffer(); + CHECK(buffer); + packet::PacketPtr wp = new_packet(duration); + queue.write(wp); + + packet::PacketPtr rp; + LONGS_EQUAL(0, populator.read(rp)); + CHECK(rp); + CHECK(wp == rp); + LONGS_EQUAL(duration, rp->rtp()->duration); + CHECK_EQUAL(true, rp->rtp()->fec_recovered); } } // namespace rtp