diff --git a/src/internal_modules/roc_packet/rtp.cpp b/src/internal_modules/roc_packet/rtp.cpp index 3b1c835c2e..ddf3113353 100644 --- a/src/internal_modules/roc_packet/rtp.cpp +++ b/src/internal_modules/roc_packet/rtp.cpp @@ -18,7 +18,8 @@ RTP::RTP() , duration(0) , capture_timestamp(0) , marker(false) - , payload_type(0) { + , payload_type(0) + , fec_recovered(false) { } int RTP::compare(const RTP& other) const { diff --git a/src/internal_modules/roc_packet/rtp.h b/src/internal_modules/roc_packet/rtp.h index 00401d2ed9..321b94267a 100644 --- a/src/internal_modules/roc_packet/rtp.h +++ b/src/internal_modules/roc_packet/rtp.h @@ -75,6 +75,10 @@ struct RTP { //! Packet payload type ("pt"). unsigned int payload_type; + //! Internal flag, valid for receiver side only. + //! Signals if this packet was recovered by FEC. + bool fec_recovered; + //! Packet header. core::Slice header; diff --git a/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h b/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h index 58291cad5e..776c508a58 100644 --- a/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h +++ b/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h @@ -31,6 +31,12 @@ struct UDP { //! Sender request state. uv_udp_send_t request; + + //! When the packet was put into jitter-buffer queue. + //! It points to a moment when the packet was transferred to a sink-thread, that + //! "consumes" this packet. The reason to have it separate is that this allows + //! us to account additional jitter introduced by thread-switch time. + core::nanoseconds_t enqueue_ts; }; } // namespace packet diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 73de147945..e0e3bf112f 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -65,7 +65,7 @@ ReceiverSession::ReceiverSession( preader = validator_.get(); populator_.reset(new (populator_) rtp::Populator(*preader, *payload_decoder_, - encoding->sample_spec)); + encoding->sample_spec, false)); if (!populator_) { return; } @@ -115,7 +115,7 @@ ReceiverSession::ReceiverSession( preader = fec_validator_.get(); fec_populator_.reset(new (fec_populator_) rtp::Populator( - *preader, *payload_decoder_, encoding->sample_spec)); + *preader, *payload_decoder_, encoding->sample_spec, true)); if (!fec_populator_) { return; } @@ -221,6 +221,7 @@ status::StatusCode ReceiverSession::route(const packet::PacketPtr& packet) { roc_panic_if(!is_valid()); packet::UDP* udp = packet->udp(); + packet->udp()->enqueue_ts = core::timestamp(core::ClockUnix); if (!udp) { // TODO(gh-183): return StatusNoRoute return status::StatusUnknown; diff --git a/src/internal_modules/roc_rtp/populator.cpp b/src/internal_modules/roc_rtp/populator.cpp index f574a55082..50c1a85918 100644 --- a/src/internal_modules/roc_rtp/populator.cpp +++ b/src/internal_modules/roc_rtp/populator.cpp @@ -14,10 +14,12 @@ namespace rtp { Populator::Populator(packet::IReader& reader, audio::IFrameDecoder& decoder, - const audio::SampleSpec& sample_spec) + const audio::SampleSpec& sample_spec, + const bool set_recovered) : reader_(reader) , decoder_(decoder) - , sample_spec_(sample_spec) { + , sample_spec_(sample_spec) + , set_recovered_(set_recovered) { } status::StatusCode Populator::read(packet::PacketPtr& packet) { @@ -36,6 +38,8 @@ status::StatusCode Populator::read(packet::PacketPtr& packet) { packet->rtp()->payload.data(), packet->rtp()->payload.size()); } + packet->rtp()->fec_recovered = set_recovered_; + return status::StatusOK; } diff --git a/src/internal_modules/roc_rtp/populator.h b/src/internal_modules/roc_rtp/populator.h index 40dd3572ab..ab00a216f9 100644 --- a/src/internal_modules/roc_rtp/populator.h +++ b/src/internal_modules/roc_rtp/populator.h @@ -26,7 +26,8 @@ class Populator : public packet::IReader, public core::NonCopyable<> { //! Initialize. Populator(packet::IReader& reader, audio::IFrameDecoder& decoder, - const audio::SampleSpec& sample_spec); + const audio::SampleSpec& sample_spec, + const bool set_recovered); //! Read next packet. virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&); @@ -35,6 +36,7 @@ class Populator : public packet::IReader, public core::NonCopyable<> { packet::IReader& reader_; audio::IFrameDecoder& decoder_; const audio::SampleSpec sample_spec_; + const bool set_recovered_; }; } // namespace rtp diff --git a/src/internal_modules/roc_rtp/stream_stats_monitor.cpp b/src/internal_modules/roc_rtp/stream_stats_monitor.cpp new file mode 100644 index 0000000000..a44e3d5fab --- /dev/null +++ b/src/internal_modules/roc_rtp/stream_stats_monitor.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "stream_stats_monitor.h" +#include "roc_status/status_code.h" +#include + +namespace roc { +namespace rtp { + +StreamStatsMonitor::StreamStatsMonitor(packet::IReader& reader, + core::IArena& arena, + const audio::SampleSpec& sample_spec, + core::nanoseconds_t default_packet_length, + const StreamStatsConfig &config) + : reader_(reader) + , arena_(arena) + , config_(config) + , packet_length_est_(default_packet_length) + , sample_spec_(sample_spec) + , npack_win_(ceil(config_.window_duration/default_packet_length)) + , enq_ts_buff_(arena) + , enq_ts_buff_i_(0) + , enq_ts_accum_(0) + , enq_ts_buff_ready_(false) +{ + resize_buffers_(); +} + +status::StatusCode StreamStatsMonitor::read(packet::PacketPtr& packet) { + status::StatusCode result = reader_.read(packet); + if (result == status::StatusOK) { + enq_ts_accum_ -= enq_ts_buff_[enq_ts_buff_i_]; + enq_ts_buff_[enq_ts_buff_i_] = packet->udp()->enqueue_ts; + enq_ts_accum_ += enq_ts_buff_[enq_ts_buff_i_]; + + if (++enq_ts_buff_i_ >= npack_win_) { + enq_ts_buff_ready_ = true; + enq_ts_buff_i_ = 0; + } + } + + return result; +} + +void StreamStatsMonitor::resize_buffers_() { + npack_win_ = ceil(config_.window_duration/packet_length_est_); + enq_ts_buff_.resize(npack_win_); +} + +core::nanoseconds_t StreamStatsMonitor::enq_ts_variance_(core::nanoseconds_t mean) const { + core::nanoseconds_t var = 0; + for (size_t i = 0; i < npack_win_; ++i) { + var += enq_ts_buff_[i]-mean; + } + return 0; +} + +} // namespace packet +} // namespace roc \ No newline at end of file diff --git a/src/internal_modules/roc_rtp/stream_stats_monitor.h b/src/internal_modules/roc_rtp/stream_stats_monitor.h new file mode 100644 index 0000000000..e104c6790e --- /dev/null +++ b/src/internal_modules/roc_rtp/stream_stats_monitor.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! @file roc_rtp/stream_stats_monitor.h +//! @brief Calculates basic network stream statistics. + + +#ifndef ROC_PACKET_STREAMSTATSMONITOR_H_ +#define ROC_PACKET_STREAMSTATSMONITOR_H_ + +#include "roc_core/noncopyable.h" +#include "roc_packet/ireader.h" +#include "roc_core/time.h" +#include "roc_audio/sample_spec.h" +#include "roc_status/status_code.h" +#include "roc_packet/ireader.h" +#include "roc_core/array.h" + +namespace roc { +namespace rtp { + +struct StreamStatsConfig { + core::nanoseconds_t window_duration; + core::nanoseconds_t window_overlap; +}; + +struct StreamStats { + core::nanoseconds_t windowed_max_jitter; + + size_t windowed_npackets; + size_t windowed_lost_packets; + size_t windowed_recovered_packets; + + float packet_loss_rate; +}; + +class StreamStatsMonitor : public packet::IReader, public core::NonCopyable<> { +public: + StreamStatsMonitor(packet::IReader& reader, core::IArena& arena, + const audio::SampleSpec& sample_spec, + core::nanoseconds_t default_packet_length, + const StreamStatsConfig &config); + + virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr& packet); + +private: + void resize_buffers_(); + core::nanoseconds_t enq_ts_variance_(core::nanoseconds_t mean) const; + + packet::IReader& reader_; + core::IArena& arena_; + const StreamStatsConfig config_; + const audio::SampleSpec sample_spec_; + + core::nanoseconds_t packet_length_est_; + size_t npack_win_; + + core::Array enq_ts_buff_; + size_t enq_ts_buff_i_; + core::nanoseconds_t enq_ts_accum_; + bool enq_ts_buff_ready_; +}; + +} // namespace packet +} // namespace roc + +#endif // ROC_PACKET_STREAMSTATSMONITOR_H_ diff --git a/src/tests/roc_rtp/test_populator.cpp b/src/tests/roc_rtp/test_populator.cpp index 8ebfde60ec..4ec693e8c0 100644 --- a/src/tests/roc_rtp/test_populator.cpp +++ b/src/tests/roc_rtp/test_populator.cpp @@ -64,7 +64,7 @@ TEST(populator, failed_to_read_packet) { for (unsigned n = 0; n < ROC_ARRAY_SIZE(codes); ++n) { test::StatusReader reader(codes[n]); audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(reader, decoder, SampleSpec); + Populator populator(reader, decoder, SampleSpec, false); packet::PacketPtr pp; LONGS_EQUAL(codes[n], populator.read(pp)); @@ -75,7 +75,7 @@ TEST(populator, failed_to_read_packet) { TEST(populator, empty_duration) { packet::Queue queue; audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(queue, decoder, SampleSpec); + Populator populator(queue, decoder, SampleSpec, false); const packet::stream_timestamp_t packet_duration = 0; const packet::stream_timestamp_t expected_duration = 32; @@ -89,12 +89,13 @@ TEST(populator, empty_duration) { CHECK(wp == rp); LONGS_EQUAL(expected_duration, rp->rtp()->duration); + CHECK_EQUAL(false, rp->rtp()->fec_recovered); } TEST(populator, non_empty_duration) { packet::Queue queue; audio::PcmDecoder decoder(PcmFmt, SampleSpec); - Populator populator(queue, decoder, SampleSpec); + Populator populator(queue, decoder, SampleSpec, false); const packet::stream_timestamp_t duration = 100; @@ -108,6 +109,27 @@ TEST(populator, non_empty_duration) { CHECK(rp); CHECK(wp == rp); LONGS_EQUAL(duration, rp->rtp()->duration); + CHECK_EQUAL(false, rp->rtp()->fec_recovered); +} + +TEST(populator, non_empty_duration_recovered) { + packet::Queue queue; + audio::PcmDecoder decoder(PcmFmt, SampleSpec); + Populator populator(queue, decoder, SampleSpec, true); + + const packet::stream_timestamp_t duration = 100; + + core::Slice buffer = buffer_factory.new_buffer(); + CHECK(buffer); + packet::PacketPtr wp = new_packet(duration); + queue.write(wp); + + packet::PacketPtr rp; + LONGS_EQUAL(0, populator.read(rp)); + CHECK(rp); + CHECK(wp == rp); + LONGS_EQUAL(duration, rp->rtp()->duration); + CHECK_EQUAL(true, rp->rtp()->fec_recovered); } } // namespace rtp