From cad2ef2de9292de66b01af8299d43b655518fab7 Mon Sep 17 00:00:00 2001 From: Mikhail Baranov Date: Sun, 28 Jan 2024 19:45:45 +0100 Subject: [PATCH] LinkMeter mov stats test --- src/internal_modules/roc_core/mov_stats.h | 161 ++++++++++++++++-- .../roc_pipeline/receiver_session.cpp | 6 +- src/internal_modules/roc_rtp/link_meter.cpp | 63 ++++++- src/internal_modules/roc_rtp/link_meter.h | 40 ++++- src/tests/roc_rtp/test_link_meter.cpp | 111 +++++++++--- 5 files changed, 336 insertions(+), 45 deletions(-) diff --git a/src/internal_modules/roc_core/mov_stats.h b/src/internal_modules/roc_core/mov_stats.h index 9731cfe2c..837ad016e 100644 --- a/src/internal_modules/roc_core/mov_stats.h +++ b/src/internal_modules/roc_core/mov_stats.h @@ -37,9 +37,13 @@ class MovStats { , movsum_(T(0)) , movsum2_(T(0)) , mov_var_(T(0)) - , full_(false) , mov_max_cntr_(0) + , full_(false) , first_(true) + , queue_max_(arena, win_len) + , curr_max_(T(0)) + , queue_min_(arena, win_len) + , curr_min_(T(0)) { if(!buffer_.resize(win_len)){ roc_panic("MovStats: can't allocate storage for the ring buffer"); @@ -63,29 +67,60 @@ class MovStats { movsum_ += x - x_old; movsum2_ += x2 - x2_old; - if (first_) { - first_ = false; - mov_max_ = x; - mov_max_cntr_++; - } else { - if (x > mov_max_) { - mov_max_ = x; - mov_max_cntr_ = 1; - } else if (x == mov_max_) { - mov_max_cntr_++; - } - - if (mov_max_ == x_old) { - mov_ - } - } - buffer_i_++; if (buffer_i_ == win_len_) { buffer_i_ = 0; full_ = true; } + slide_max(x, x_old); + slide_min(x, x_old); + } + + // Keeping a sliding max by using a sorted deque. + // The wedge is always sorted in descending order. + // The current max is always at the front of the wedge. + // https://www.geeksforgeeks.org/sliding-window-maximum-maximum-of-all-subarrays-of-size-k/ + void slide_max(const T& x, const T x_old) { + if (queue_max_.is_empty()) { + queue_max_.push_back(x); + curr_max_ = x; + } else { + if (queue_max_.front() == x_old) { + queue_max_.pop_front(); + curr_max_ = queue_max_.front(); + } + while (!queue_max_.is_empty() && queue_max_.back() < x) { + queue_max_.pop_back(); + } + if (queue_max_.is_empty()) { + curr_max_ = x; + } + queue_max_.push_back(x); + } + } + + // Keeping a sliding min by using a sorted deque. + // The wedge is always sorted in ascending order. + // The current min is always at the front of the wedge. + // https://www.geeksforgeeks.org/sliding-window-maximum-maximum-of-all-subarrays-of-size-k/ + void slide_min(const T& x, const T x_old) { + if (queue_min_.is_empty()) { + queue_min_.push_back(x); + curr_min_ = x; + } else { + if (queue_min_.front() == x_old) { + queue_min_.pop_front(); + curr_min_ = queue_min_.front(); + } + while (!queue_min_.is_empty() && queue_min_.back() > x) { + queue_min_.pop_back(); + } + if (queue_min_.is_empty()) { + curr_min_ = x; + } + queue_min_.push_back(x); + } } //! Get moving average value. @@ -106,6 +141,16 @@ class MovStats { } } + T mov_max() const + { + return curr_max_; + } + + T mov_min() const + { + return curr_min_; + } + //! Extend rolling window length. //! @remarks //! Potentially could cause a gap in the estimated values as @@ -137,6 +182,7 @@ class MovStats { private: Array buffer_; Array buffer2_; + const size_t win_len_; size_t buffer_i_; T movsum_; @@ -147,6 +193,85 @@ class MovStats { bool full_; bool first_; + + class Queue { + public: + Queue(core::IArena& arena, size_t len) + : buff_(arena) + , buff_len_(len) + , begin_(0) + , end_(0) + { + if (!buff_.resize(len)) { + roc_panic("Queue: can't allocate storage for the buffer"); + } + } + + T& front() + { + if (is_empty()) { + roc_panic("Queue: front() called on empty buffer"); + } + return buff_[begin_]; + } + + T& back() + { + if (is_empty()) { + roc_panic("Queue: back() called on empty buffer"); + } + return buff_[(end_ - 1 + buff_len_) % buff_len_]; + } + + size_t len() const + { + return (end_ - begin_ + buff_len_) % buff_len_; + } + + void push_front(const T& x) + { + begin_ = (begin_ - 1 + buff_len_) % buff_len_; + buff_[begin_] = x; + } + + void pop_front() + { + if (is_empty()) { + roc_panic("Queue: pop_front() called on empty buffer"); + } + begin_ = (begin_ + 1) % buff_len_; + } + + void push_back(const T& x) + { + buff_[end_] = x; + end_ = (end_ + 1) % buff_len_; + } + + void pop_back() + { + if (is_empty()) { + roc_panic("Queue: pop_back() called on empty buffer"); + } + end_ = (end_ - 1 + buff_len_) % buff_len_; + } + + bool is_empty() + { + return begin_ == end_; + } + + private: + Array buff_; + size_t buff_len_; + size_t begin_; + size_t end_; + }; + + Queue queue_max_; + T curr_max_; + Queue queue_min_; + T curr_min_; }; } // namespace core diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 8b27c304f..9c1794bb7 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -43,7 +43,8 @@ ReceiverSession::ReceiverSession( packet::IWriter* pwriter = source_queue_.get(); - source_meter_.reset(new (source_meter_) rtp::LinkMeter()); + source_meter_.reset(new (source_meter_) + rtp::LinkMeter(arena, encoding->sample_spec, 100)); if (!source_meter_) { return; } @@ -85,7 +86,8 @@ ReceiverSession::ReceiverSession( return; } - repair_meter_.reset(new (repair_meter_) rtp::LinkMeter()); + repair_meter_.reset(new (repair_meter_) + rtp::LinkMeter(arena, encoding->sample_spec, 100)); if (!repair_meter_) { return; } diff --git a/src/internal_modules/roc_rtp/link_meter.cpp b/src/internal_modules/roc_rtp/link_meter.cpp index 404d2fb38..42e7a55ea 100644 --- a/src/internal_modules/roc_rtp/link_meter.cpp +++ b/src/internal_modules/roc_rtp/link_meter.cpp @@ -6,19 +6,28 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -#include "roc_rtp/link_meter.h" #include "roc_core/panic.h" +#include "link_meter.h" namespace roc { namespace rtp { -LinkMeter::LinkMeter() +LinkMeter::LinkMeter(core::IArena& arena, + const audio::SampleSpec& sample_spec, + size_t run_win_len) : writer_(NULL) , reader_(NULL) + , sample_spec_(sample_spec) , first_packet_(true) , has_metrics_(false) , seqnum_hi_(0) - , seqnum_lo_(0) { + , seqnum_lo_(0) + , lost_(0) + , fract_lost_counter_(0) + , jitter_processed_(0) + , period_n_packets_(0) + , prev_packet_enq_ts_(-1) + , packet_jitter_stats_(arena, run_win_len) { } status::StatusCode LinkMeter::write(const packet::PacketPtr& packet) { @@ -76,13 +85,61 @@ void LinkMeter::update_metrics_(const packet::Packet& packet) { // Detect wrap. seqnum_hi_ += (uint16_t)-1; } + + if (!first_packet_) { + const size_t gap = gap_(packet); + // Compute jitter only on consequential packets. + if (gap == 0 && prev_pack_duration_ > 0){ + const core::nanoseconds_t d_enq_ts = packet.udp()->enqueue_ts - + prev_packet_enq_ts_; + const core::nanoseconds_t d_capt_ts = sample_spec_.samples_per_chan_2_ns(prev_pack_duration_); + packet_jitter_stats_.add(std::abs(d_enq_ts - d_capt_ts)); + metrics_.max_jitter = packet_jitter_stats_.mov_max(); + metrics_.min_jitter = packet_jitter_stats_.mov_min(); + jitter_processed_++; + metrics_.jitter = sample_spec_.ns_2_samples_per_chan(mean_jitter()); + } else { + lost_ += gap; + fract_lost_counter_ += gap; + metrics_.num_packets_covered += gap; + } + } + + prev_packet_enq_ts_ = packet.udp()->enqueue_ts; + prev_pack_duration_ = packet.rtp()->duration; seqnum_lo_ = packet.rtp()->seqnum; metrics_.ext_last_seqnum = seqnum_hi_ + seqnum_lo_; + prev_stream_timestamp = packet.rtp()->stream_timestamp; + period_n_packets_++; + metrics_.num_packets_covered++; + metrics_.cum_loss = lost_; + metrics_.fract_loss = (float)fract_lost_counter_ / (float)(fract_lost_counter_ + period_n_packets_); } first_packet_ = false; has_metrics_ = true; } +size_t rtp::LinkMeter::gap_(const packet::Packet& packet) const { + if (first_packet_) { + roc_panic("RTPStats: attempt to detect gap on the first received packet"); + } + + return (size_t)abs(packet::seqnum_diff( packet.rtp()->seqnum, seqnum_lo_ + 1)); +} + +core::nanoseconds_t rtp::LinkMeter::mean_jitter() const { + return packet_jitter_stats_.mov_avg(); +} + +core::nanoseconds_t rtp::LinkMeter::var_jitter() const { + return packet_jitter_stats_.mov_var(); +} + +void LinkMeter::reset_metrics() { + period_n_packets_ = 0; + fract_lost_counter_ = 0; + metrics_.num_packets_covered = 0; +} } // namespace rtp } // namespace roc diff --git a/src/internal_modules/roc_rtp/link_meter.h b/src/internal_modules/roc_rtp/link_meter.h index df74ddfe9..c690d2abe 100644 --- a/src/internal_modules/roc_rtp/link_meter.h +++ b/src/internal_modules/roc_rtp/link_meter.h @@ -12,6 +12,9 @@ #ifndef ROC_RTP_LINK_METER_H_ #define ROC_RTP_LINK_METER_H_ +#include "roc_audio/sample_spec.h" +#include "roc_core/iarena.h" +#include "roc_core/mov_stats.h" #include "roc_core/noncopyable.h" #include "roc_packet/ireader.h" #include "roc_packet/iwriter.h" @@ -47,11 +50,24 @@ struct LinkMetrics { //! time, measured in timestamp units. packet::stream_timestamp_t jitter; + + //! Number of packets covered by this report. + //! This field start with 0 after each call LinkMeter::reset_metrics() and it contains + //! received and lost packets. + size_t num_packets_covered; + + //! Running max of Jitter. + core::nanoseconds_t max_jitter; + + //! Running min of Jitter. + core::nanoseconds_t min_jitter; + LinkMetrics() : ext_last_seqnum(0) , fract_loss(0) , cum_loss(0) - , jitter(0) { + , jitter(0) + , num_packets_covered(0) { } }; @@ -65,7 +81,7 @@ struct LinkMetrics { //! that should be updated as early as possible. //! //! - As a reader, right before decoding packet. Here LinkMeter -//! computes metrics that can be updated only when packets +//! 1 computes metrics that can be updated only when packets //! are going to be played. //! //! In both cases, LinkMeter passes through packets to/from nested @@ -75,7 +91,9 @@ class LinkMeter : public packet::IWriter, public core::NonCopyable<> { public: //! Initialize. - LinkMeter(); + LinkMeter(core::IArena& arena, + const audio::SampleSpec& sample_spec, + size_t run_win_len); //! Write packet and update metrics. //! @remarks @@ -100,15 +118,22 @@ class LinkMeter : public packet::IWriter, //! Check if metrics are already gathered and can be reported. bool has_metrics() const; + void reset_metrics(); + //! Get metrics. LinkMetrics metrics() const; + core::nanoseconds_t mean_jitter() const; + core::nanoseconds_t var_jitter() const; private: void update_metrics_(const packet::Packet& packet); + size_t gap_(const packet::Packet& packet) const; packet::IWriter* writer_; packet::IReader* reader_; + const audio::SampleSpec sample_spec_; + bool first_packet_; bool has_metrics_; @@ -116,6 +141,15 @@ class LinkMeter : public packet::IWriter, uint32_t seqnum_hi_; uint16_t seqnum_lo_; + + size_t lost_; + size_t fract_lost_counter_; + size_t jitter_processed_; + size_t period_n_packets_; + core::nanoseconds_t prev_packet_enq_ts_; + packet::stream_timestamp_t prev_stream_timestamp; + packet::stream_timestamp_t prev_pack_duration_; + core::MovStats packet_jitter_stats_; }; } // namespace rtp diff --git a/src/tests/roc_rtp/test_link_meter.cpp b/src/tests/roc_rtp/test_link_meter.cpp index 14a66d60c..e75f10dd5 100644 --- a/src/tests/roc_rtp/test_link_meter.cpp +++ b/src/tests/roc_rtp/test_link_meter.cpp @@ -8,6 +8,8 @@ #include +#include "roc_audio/sample_spec.h" +#include "roc_core/fast_random.h" #include "roc_core/heap_arena.h" #include "roc_packet/packet_factory.h" #include "roc_packet/queue.h" @@ -21,12 +23,20 @@ namespace { core::HeapArena arena; packet::PacketFactory packet_factory(arena); -packet::PacketPtr new_packet(packet::seqnum_t sn) { +enum { ChMask = 3, PacketSz = 128, SampleRate = 10000, Duration = 100, RunningWinLen = Duration}; +audio::SampleSpec sample_spec(SampleRate, audio::Sample_RawFormat, + audio::ChanLayout_Surround, audio::ChanOrder_Smpte, ChMask); +const core::nanoseconds_t start_ts = 1691499037871419405; +const core::nanoseconds_t step_ts = Duration * core::Second / SampleRate; + +packet::PacketPtr new_packet(packet::seqnum_t sn, const core::nanoseconds_t ts) { packet::PacketPtr packet = packet_factory.new_packet(); CHECK(packet); - packet->add_flags(packet::Packet::FlagRTP); + packet->add_flags(packet::Packet::FlagRTP | packet::Packet::FlagUDP); packet->rtp()->seqnum = sn; + packet->rtp()->duration = Duration; + packet->udp()->enqueue_ts = ts; return packet; } @@ -47,16 +57,16 @@ class StatusWriter : public packet::IWriter { } // namespace -TEST_GROUP(link_meter) {}; +TEST_GROUP(link_meter) { }; TEST(link_meter, has_metrics) { packet::Queue queue; - LinkMeter meter; + LinkMeter meter(arena, sample_spec, RunningWinLen); meter.set_writer(queue); CHECK(!meter.has_metrics()); - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(100))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(100, start_ts))); CHECK_EQUAL(1, queue.size()); CHECK(meter.has_metrics()); @@ -64,54 +74,68 @@ TEST(link_meter, has_metrics) { TEST(link_meter, last_seqnum) { packet::Queue queue; - LinkMeter meter; + LinkMeter meter(arena, sample_spec, RunningWinLen); meter.set_writer(queue); + core::nanoseconds_t ts = start_ts; CHECK_EQUAL(0, meter.metrics().ext_last_seqnum); - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(100))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(100, ts))); CHECK_EQUAL(100, meter.metrics().ext_last_seqnum); + ts += step_ts; // seqnum increased, metric updated - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(102))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(102, ts + step_ts))); CHECK_EQUAL(102, meter.metrics().ext_last_seqnum); // seqnum decreased, ignored - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(101))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(101, ts))); CHECK_EQUAL(102, meter.metrics().ext_last_seqnum); + ts += step_ts * 2; // seqnum increased, metric updated - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(103))); - CHECK_EQUAL(103, meter.metrics().ext_last_seqnum); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(103, ts))); + + CHECK_EQUAL(0, meter.mean_jitter()); + CHECK_EQUAL(0, meter.var_jitter()); + + LinkMetrics metrics = meter.metrics(); + CHECK_EQUAL(0, metrics.jitter); + DOUBLES_EQUAL(1.f/4.f, metrics.fract_loss, 0.0001); + CHECK_EQUAL(1, metrics.cum_loss); + CHECK_EQUAL(103, metrics.ext_last_seqnum); + CHECK_EQUAL(4, metrics.num_packets_covered); + CHECK_EQUAL(4, queue.size()); } TEST(link_meter, last_seqnum_wrap) { packet::Queue queue; - LinkMeter meter; + LinkMeter meter(arena, sample_spec, RunningWinLen); meter.set_writer(queue); + core::nanoseconds_t ts = start_ts; CHECK_EQUAL(0, meter.metrics().ext_last_seqnum); // no overflow - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(65533))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(65533, ts))); CHECK_EQUAL(65533, meter.metrics().ext_last_seqnum); // no overflow - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(65535))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(65535, ts + step_ts * 2))); CHECK_EQUAL(65535, meter.metrics().ext_last_seqnum); // overflow - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(2))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(2, ts + step_ts * 3))); CHECK_EQUAL(65537, meter.metrics().ext_last_seqnum); // late packet, ignored - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(65534))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(65534, ts + step_ts))); CHECK_EQUAL(65537, meter.metrics().ext_last_seqnum); // new packet - CHECK_EQUAL(status::StatusOK, meter.write(new_packet(5))); + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(5, ts + step_ts * 6))); CHECK_EQUAL(65540, meter.metrics().ext_last_seqnum); CHECK_EQUAL(5, queue.size()); @@ -119,10 +143,59 @@ TEST(link_meter, last_seqnum_wrap) { TEST(link_meter, forward_error) { StatusWriter writer(status::StatusNoMem); - LinkMeter meter; + LinkMeter meter(arena, sample_spec, RunningWinLen); meter.set_writer(writer); - CHECK_EQUAL(status::StatusNoMem, meter.write(new_packet(100))); + CHECK_EQUAL(status::StatusNoMem, meter.write(new_packet(100, start_ts))); +} + +TEST(link_meter, jitter_test) { + packet::Queue queue; + LinkMeter meter(arena, sample_spec, RunningWinLen); + meter.set_writer(queue); + const size_t num_packets = Duration * 100; + core::nanoseconds_t ts_store[num_packets]; + + core::nanoseconds_t ts = start_ts; + for (size_t i = 0; i < num_packets; i++) { + packet::seqnum_t seqnum = 65500 + i; + ts_store[i] = ts; + CHECK_EQUAL(status::StatusOK, meter.write(new_packet(seqnum, ts))); + ts += step_ts + (core::nanoseconds_t)(core::fast_random_gaussian() * core::Millisecond); + + if (i != 0 && i % RunningWinLen == 0) { + // Check meter metrics running max in min jitter in last Duration number + // of packets in ts_store. + core::nanoseconds_t min_jitter = core::Second; + core::nanoseconds_t max_jitter = 0; + for (size_t j = 0; j < RunningWinLen; j++) { + core::nanoseconds_t jitter = std::abs(ts_store[i - j] - ts_store[i - j - 1] - step_ts); + min_jitter = std::min(min_jitter, jitter); + max_jitter = std::max(max_jitter, jitter); + } + CHECK_EQUAL(min_jitter, meter.metrics().min_jitter); + CHECK_EQUAL(max_jitter, meter.metrics().max_jitter); + + // Reference average and variance of jitter from ts_store values. + core::nanoseconds_t sum = 0; + for (size_t j = 0; j < RunningWinLen; j++) { + sum += std::abs(ts_store[i - j] - ts_store[ i - j - 1] - step_ts); + } + const core::nanoseconds_t mean = sum / RunningWinLen; + + sum = 0; + for (size_t j = 0; j < RunningWinLen; j++) { + core::nanoseconds_t jitter = std::abs(ts_store[i - j] - ts_store[i - j - 1] - step_ts); + sum += (jitter - mean) * (jitter - mean); + } + const core::nanoseconds_t var = sum / RunningWinLen; + + // Check the jitter value + DOUBLES_EQUAL(mean, meter.mean_jitter(), core::Microsecond * 1); + DOUBLES_EQUAL(sqrt(var), meter.var_jitter(), core::Microsecond * 1); + } + } + } } // namespace rtp