diff --git a/src/internal_modules/roc_core/mov_stats.h b/src/internal_modules/roc_core/mov_stats.h new file mode 100644 index 0000000000..3ae1874e55 --- /dev/null +++ b/src/internal_modules/roc_core/mov_stats.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2019 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! @file roc_core/mov_stats.h +//! @brief Profiler. + +#ifndef ROC_TOOLKIT_MOV_STATS_H +#define ROC_TOOLKIT_MOV_STATS_H + +#include "roc_core/array.h" +#include "roc_core/iarena.h" +#include "roc_core/panic.h" + +namespace roc { +namespace core { + +//! Rolling window moving average and variance. +//! +//! Efficiently implements moving average and variance based on approach +//! described in https://www.dsprelated.com/showthread/comp.dsp/97276-1.php +//! +//! @tparam T defines a sample type. +template +class MovStats { +public: + //! Initialize. + MovStats(IArena& arena, const size_t win_len) + : buffer_(arena) + , buffer2_(arena) + , win_len_(win_len) + , buffer_i_(0) + , movsum_(T(0)) + , movsum2_(T(0)) + , mov_var_(T(0)) + , full_(false) + { + if(!buffer_.resize(win_len)){ + roc_panic("MovStats: can't allocate storage for the ring buffer"); + } + if(!buffer2_.resize(win_len)){ + roc_panic("MovStats: can't allocate storage for the ring buffer"); + } + memset(buffer_.data(), 0, sizeof(T)*buffer_.size()); + memset(buffer2_.data(), 0, sizeof(T)*buffer2_.size()); + } + + //! Shift rolling window by one sample x. + void add(const T& x) + { + const T x2 = x*x; + const T x_old = buffer_[buffer_i_]; + buffer_[buffer_i_] = x; + const T x2_old = buffer2_[buffer_i_]; + buffer2_[buffer_i_] = x2; + + movsum_ += x - x_old; + movsum2_ += x2 - x2_old; + + buffer_i_++; + if (buffer_i_ == win_len_) { + buffer_i_ = 0; + full_ = true; + } + } + + //! Get moving average value. + T mov_avg() const + { + const T n = full_ ? T(win_len_) : T(buffer_i_ + 1); + return movsum_ / n; + } + + //! Get variance. + T mov_var() const + { + const T n = full_ ? T(win_len_) : T(buffer_i_+1); + if (n == 1) { + return (T)sqrt(movsum2_ - movsum_ * movsum_); + } else { + return (T)sqrt((n*movsum2_ - movsum_ * movsum_) / (n * n)); + } + } + + //! Extend rolling window length. + //! @remarks + //! Potentially could cause a gap in the estimated values as + //! decreases effective window size by dropping samples to the right from + //! the cursor in the ring buffers: + //! buffer_i_ win_len_ old win_len_ new + //! ↓ ↓ ↓ + //! [■■■■■■■■■■□□□□□□□□□□□□□□□□□□□□□____________________] + //! ↑ ↑ ↑ + //! Dropped samples. + void extend_win(const size_t new_win) + { + if (new_win <= win_len_) { + roc_panic("MovStats: the window length can only grow"); + } + if (!buffer_.resize(new_win)) { + roc_panic("MovStats: can not increase storage"); + } + + movsum_ = 0; + movsum2_ = 0; + for (size_t i = 0; i < buffer_i_; i++) { + movsum_ += buffer_[i]; + movsum2_ += buffer2_[i]; + } + full_ = false; + } + +private: + Array buffer_; + Array buffer2_; + const size_t win_len_; + size_t buffer_i_; + T movsum_; + T movsum2_; + T mov_var_; + bool full_; +}; + +} // namespace core +} // namespace roc + +#endif // ROC_TOOLKIT_MOV_STATS_H diff --git a/src/internal_modules/roc_packet/rtp.cpp b/src/internal_modules/roc_packet/rtp.cpp index 3b1c835c2e..ddf3113353 100644 --- a/src/internal_modules/roc_packet/rtp.cpp +++ b/src/internal_modules/roc_packet/rtp.cpp @@ -18,7 +18,8 @@ RTP::RTP() , duration(0) , capture_timestamp(0) , marker(false) - , payload_type(0) { + , payload_type(0) + , fec_recovered(false) { } int RTP::compare(const RTP& other) const { diff --git a/src/internal_modules/roc_packet/rtp.h b/src/internal_modules/roc_packet/rtp.h index 00401d2ed9..321b94267a 100644 --- a/src/internal_modules/roc_packet/rtp.h +++ b/src/internal_modules/roc_packet/rtp.h @@ -75,6 +75,10 @@ struct RTP { //! Packet payload type ("pt"). unsigned int payload_type; + //! Internal flag, valid for receiver side only. + //! Signals if this packet was recovered by FEC. + bool fec_recovered; + //! Packet header. core::Slice header; diff --git a/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h b/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h index 58291cad5e..776c508a58 100644 --- a/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h +++ b/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h @@ -31,6 +31,12 @@ struct UDP { //! Sender request state. uv_udp_send_t request; + + //! When the packet was put into jitter-buffer queue. + //! It points to a moment when the packet was transferred to a sink-thread, that + //! "consumes" this packet. The reason to have it separate is that this allows + //! us to account additional jitter introduced by thread-switch time. + core::nanoseconds_t enqueue_ts; }; } // namespace packet diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 73de147945..e0e3bf112f 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -65,7 +65,7 @@ ReceiverSession::ReceiverSession( preader = validator_.get(); populator_.reset(new (populator_) rtp::Populator(*preader, *payload_decoder_, - encoding->sample_spec)); + encoding->sample_spec, false)); if (!populator_) { return; } @@ -115,7 +115,7 @@ ReceiverSession::ReceiverSession( preader = fec_validator_.get(); fec_populator_.reset(new (fec_populator_) rtp::Populator( - *preader, *payload_decoder_, encoding->sample_spec)); + *preader, *payload_decoder_, encoding->sample_spec, true)); if (!fec_populator_) { return; } @@ -221,6 +221,7 @@ status::StatusCode ReceiverSession::route(const packet::PacketPtr& packet) { roc_panic_if(!is_valid()); packet::UDP* udp = packet->udp(); + packet->udp()->enqueue_ts = core::timestamp(core::ClockUnix); if (!udp) { // TODO(gh-183): return StatusNoRoute return status::StatusUnknown; diff --git a/src/internal_modules/roc_rtp/populator.cpp b/src/internal_modules/roc_rtp/populator.cpp index f574a55082..50c1a85918 100644 --- a/src/internal_modules/roc_rtp/populator.cpp +++ b/src/internal_modules/roc_rtp/populator.cpp @@ -14,10 +14,12 @@ namespace rtp { Populator::Populator(packet::IReader& reader, audio::IFrameDecoder& decoder, - const audio::SampleSpec& sample_spec) + const audio::SampleSpec& sample_spec, + const bool set_recovered) : reader_(reader) , decoder_(decoder) - , sample_spec_(sample_spec) { + , sample_spec_(sample_spec) + , set_recovered_(set_recovered) { } status::StatusCode Populator::read(packet::PacketPtr& packet) { @@ -36,6 +38,8 @@ status::StatusCode Populator::read(packet::PacketPtr& packet) { packet->rtp()->payload.data(), packet->rtp()->payload.size()); } + packet->rtp()->fec_recovered = set_recovered_; + return status::StatusOK; } diff --git a/src/internal_modules/roc_rtp/populator.h b/src/internal_modules/roc_rtp/populator.h index 40dd3572ab..ab00a216f9 100644 --- a/src/internal_modules/roc_rtp/populator.h +++ b/src/internal_modules/roc_rtp/populator.h @@ -26,7 +26,8 @@ class Populator : public packet::IReader, public core::NonCopyable<> { //! Initialize. Populator(packet::IReader& reader, audio::IFrameDecoder& decoder, - const audio::SampleSpec& sample_spec); + const audio::SampleSpec& sample_spec, + const bool set_recovered); //! Read next packet. virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&); @@ -35,6 +36,7 @@ class Populator : public packet::IReader, public core::NonCopyable<> { packet::IReader& reader_; audio::IFrameDecoder& decoder_; const audio::SampleSpec sample_spec_; + const bool set_recovered_; }; } // namespace rtp diff --git a/src/internal_modules/roc_rtp/stream_stats_monitor.cpp b/src/internal_modules/roc_rtp/stream_stats_monitor.cpp new file mode 100644 index 0000000000..0ca531d9ec --- /dev/null +++ b/src/internal_modules/roc_rtp/stream_stats_monitor.cpp @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "stream_stats_monitor.h" +#include "roc_status/status_code.h" + +namespace roc { +namespace rtp { + +StreamStatsMonitor::StreamStatsMonitor(packet::IReader& reader, + core::IArena& arena, + const audio::SampleSpec& sample_spec, + const StreamStatsConfig &config) + : reader_(reader) + , arena_(arena) + , config_(config) + , sample_spec_(sample_spec) +{} + +status::StatusCode StreamStatsMonitor::read(packet::PacketPtr& packet) +{ + status::StatusCode result = reader_.read(packet); + if (result == status::StatusOK){ + + } + + return result; +} + +} // namespace packet +} // namespace roc \ No newline at end of file diff --git a/src/internal_modules/roc_rtp/stream_stats_monitor.h b/src/internal_modules/roc_rtp/stream_stats_monitor.h new file mode 100644 index 0000000000..bc72f34d30 --- /dev/null +++ b/src/internal_modules/roc_rtp/stream_stats_monitor.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! @file roc_rtp/stream_stats_monitor.h +//! @brief Calculates basic network stream statistics. + + +#ifndef ROC_PACKET_STREAMSTATSMONITOR_H_ +#define ROC_PACKET_STREAMSTATSMONITOR_H_ + +#include "roc_core/noncopyable.h" +#include "roc_packet/ireader.h" +#include "roc_core/time.h" +#include "roc_audio/sample_spec.h" +#include "roc_status/status_code.h" + +namespace roc { +namespace rtp { + +struct StreamStatsConfig { + core::nanoseconds_t window_duration; + core::nanoseconds_t window_overlap; +}; + +struct StreamStats { + core::nanoseconds_t windowed_max_jitter; + + size_t windowed_npackets; + size_t windowed_lost_packets; + size_t windowed_recovered_packets; + + float packet_loss_rate; +}; + +class StreamStatsMonitor : public packet::IReader, public core::NonCopyable<> { +public: + StreamStatsMonitor(packet::IReader& reader, core::IArena& arena, + const audio::SampleSpec& sample_spec, + const StreamStatsConfig &config); + + virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr& packet); +private: + packet::IReader& reader_; + core::IArena& arena_; + const StreamStatsConfig config_; + const audio::SampleSpec sample_spec_; + +}; + +} // namespace packet +} // namespace roc + +#endif // ROC_PACKET_STREAMSTATSMONITOR_H_ diff --git a/src/tests/roc_core/test_mov_stats.cpp b/src/tests/roc_core/test_mov_stats.cpp new file mode 100644 index 0000000000..9fbab23836 --- /dev/null +++ b/src/tests/roc_core/test_mov_stats.cpp @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2015 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include + +#include "roc_core/array.h" +#include "roc_core/heap_arena.h" + +namespace roc { +namespace core { + +namespace { + +enum { NumObjects = 10, EmbeddedCap = 5 }; + +struct Object { + static long n_objects; + + size_t value; + + Object(size_t v = 0) + : value(v) { + n_objects++; + } + + Object(const Object& other) + : value(other.value) { + n_objects++; + } + + ~Object() { + n_objects--; + } +}; + +long Object::n_objects = 0; + +} // namespace + +TEST_GROUP(array) { + HeapArena arena; +}; + +TEST(array, empty) { + Array array(arena); + + LONGS_EQUAL(0, array.capacity()); + LONGS_EQUAL(0, array.size()); + LONGS_EQUAL(0, Object::n_objects); +} + +TEST(array, grow) { + Array array(arena); + + CHECK(array.grow(3)); + + LONGS_EQUAL(3, array.capacity()); + LONGS_EQUAL(0, array.size()); + LONGS_EQUAL(0, Object::n_objects); + + CHECK(array.grow(1)); + + LONGS_EQUAL(3, array.capacity()); + LONGS_EQUAL(0, array.size()); + LONGS_EQUAL(0, Object::n_objects); +} + +TEST(array, grow_exp) { + Array array(arena); + + CHECK(array.grow_exp(3)); + + LONGS_EQUAL(4, array.capacity()); + LONGS_EQUAL(0, array.size()); + LONGS_EQUAL(0, Object::n_objects); + + CHECK(array.grow_exp(1)); + + LONGS_EQUAL(4, array.capacity()); + LONGS_EQUAL(0, array.size()); + LONGS_EQUAL(0, Object::n_objects); + + CHECK(array.grow_exp(4)); + + LONGS_EQUAL(4, array.capacity()); + LONGS_EQUAL(0, array.size()); + LONGS_EQUAL(0, Object::n_objects); + + CHECK(array.grow_exp(5)); + + LONGS_EQUAL(8, array.capacity()); + LONGS_EQUAL(0, array.size()); + LONGS_EQUAL(0, Object::n_objects); +} + +TEST(array, resize) { + Array array(arena); + + CHECK(array.resize(3)); + + LONGS_EQUAL(3, array.capacity()); + LONGS_EQUAL(3, array.size()); + LONGS_EQUAL(3, Object::n_objects); + + CHECK(array.resize(1)); + + LONGS_EQUAL(3, array.capacity()); + LONGS_EQUAL(1, array.size()); + LONGS_EQUAL(1, Object::n_objects); +} + +TEST(array, push_back) { + Array array(arena); + + CHECK(array.grow(NumObjects)); + + for (size_t n = 0; n < NumObjects; n++) { + CHECK(array.push_back(Object(n))); + + LONGS_EQUAL(NumObjects, array.capacity()); + LONGS_EQUAL(n + 1, array.size()); + LONGS_EQUAL(n + 1, Object::n_objects); + } + + for (size_t n = 0; n < NumObjects; n++) { + LONGS_EQUAL(n, array[n].value); + } +} + +TEST(array, data) { + Array array(arena); + + CHECK(array.data() == NULL); + + CHECK(array.resize(NumObjects)); + + CHECK(array.data() != NULL); + + for (size_t n = 0; n < NumObjects; n++) { + POINTERS_EQUAL(&array[n], array.data() + n); + } +} + +TEST(array, embedding) { + Array array(arena); + + CHECK(array.resize(EmbeddedCap)); + + LONGS_EQUAL(0, arena.num_allocations()); + + // data is inside of the array + CHECK((char*)array.data() >= (char*)&array + && (char*)(array.data() + EmbeddedCap) <= (char*)&array + sizeof(array)); + + CHECK(array.resize(NumObjects)); + + LONGS_EQUAL(1, arena.num_allocations()); + + // data is outside of the array + CHECK((char*)(array.data() + EmbeddedCap) < (char*)&array + || (char*)array.data() > (char*)&array + sizeof(array)); +} + +TEST(array, constructor_destructor) { + LONGS_EQUAL(0, arena.num_allocations()); + + { + Array array(arena); + + CHECK(array.grow(3)); + + CHECK(array.push_back(Object(1))); + CHECK(array.push_back(Object(2))); + CHECK(array.push_back(Object(3))); + + LONGS_EQUAL(0, arena.num_allocations()); + LONGS_EQUAL(3, Object::n_objects); + + CHECK(array.grow(7)); + + LONGS_EQUAL(1, arena.num_allocations()); + LONGS_EQUAL(3, Object::n_objects); + + CHECK(array.push_back(Object(4))); + CHECK(array.push_back(Object(5))); + + LONGS_EQUAL(1, arena.num_allocations()); + LONGS_EQUAL(5, Object::n_objects); + } + + LONGS_EQUAL(0, arena.num_allocations()); + LONGS_EQUAL(0, Object::n_objects); +} + +} // namespace core +} // namespace roc diff --git a/src/tests/roc_rtp/test_populator.cpp b/src/tests/roc_rtp/test_populator.cpp index abeaf71bdd..72f79f036b 100644 --- a/src/tests/roc_rtp/test_populator.cpp +++ b/src/tests/roc_rtp/test_populator.cpp @@ -64,7 +64,7 @@ TEST(populator, failed_to_read_packet) { for (unsigned n = 0; n < ROC_ARRAY_SIZE(codes); ++n) { test::StatusReader reader(codes[n]); audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(reader, decoder, SampleSpec); + Populator populator(reader, decoder, SampleSpec, false); packet::PacketPtr pp; LONGS_EQUAL(codes[n], populator.read(pp)); @@ -75,7 +75,7 @@ TEST(populator, failed_to_read_packet) { TEST(populator, empty_duration) { packet::Queue queue; audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(queue, decoder, SampleSpec); + Populator populator(queue, decoder, SampleSpec, false); const packet::stream_timestamp_t packet_duration = 0; const packet::stream_timestamp_t expected_duration = 32; @@ -89,12 +89,13 @@ TEST(populator, empty_duration) { CHECK(wp == rp); LONGS_EQUAL(expected_duration, rp->rtp()->duration); + CHECK_EQUAL(false, rp->rtp()->fec_recovered); } TEST(populator, non_empty_duration) { packet::Queue queue; audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(queue, decoder, SampleSpec); + Populator populator(queue, decoder, SampleSpec, false); const packet::stream_timestamp_t duration = 100; @@ -108,6 +109,27 @@ TEST(populator, non_empty_duration) { CHECK(rp); CHECK(wp == rp); LONGS_EQUAL(duration, rp->rtp()->duration); + CHECK_EQUAL(false, rp->rtp()->fec_recovered); +} + +TEST(populator, non_empty_duration_recovered) { + packet::Queue queue; + audio::PcmDecoder decoder(PcmFmt, SampleSpec); + Populator populator(queue, decoder, SampleSpec, true); + + const packet::stream_timestamp_t duration = 100; + + core::Slice buffer = buffer_factory.new_buffer(); + CHECK(buffer); + packet::PacketPtr wp = new_packet(duration); + queue.write(wp); + + packet::PacketPtr rp; + LONGS_EQUAL(0, populator.read(rp)); + CHECK(rp); + CHECK(wp == rp); + LONGS_EQUAL(duration, rp->rtp()->duration); + CHECK_EQUAL(true, rp->rtp()->fec_recovered); } } // namespace rtp