Skip to content

Commit

Permalink
:WIP rtp stream calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv committed Jan 2, 2024
1 parent b98c7ab commit 09d5909
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 9 deletions.
155 changes: 155 additions & 0 deletions src/internal_modules/roc_core/mov_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright (c) 2019 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/mov_stats.h
//! @brief Profiler.

#ifndef ROC_TOOLKIT_MOV_STATS_H
#define ROC_TOOLKIT_MOV_STATS_H

#include "roc_core/array.h"
#include "roc_core/iarena.h"
#include "roc_core/panic.h"

namespace roc {
namespace core {

//! Rolling window moving average and variance.
//!
//! Efficiently implements moving average and variance based on approach
//! described in https://www.dsprelated.com/showthread/comp.dsp/97276-1.php
//!
//! @tparam T defines a sample type.
template <typename T>
class MovStats {
public:
//! Initialize.
MovStats(IArena& arena, const size_t win_len)
: buffer_(arena)
, buffer2_(arena)
, win_len_(win_len)
, buffer_i_(0)
, movsum_(T(0))
, movsum2_(T(0))
, mov_var_(T(0))
, full_(false)
, mov_max_cntr_(0)
, first_(true)
{
if(!buffer_.resize(win_len)){
roc_panic("MovStats: can't allocate storage for the ring buffer");
}
if(!buffer2_.resize(win_len)){
roc_panic("MovStats: can't allocate storage for the ring buffer");
}
memset(buffer_.data(), 0, sizeof(T)*buffer_.size());
memset(buffer2_.data(), 0, sizeof(T)*buffer2_.size());
}

//! Shift rolling window by one sample x.
void add(const T& x)
{
const T x2 = x*x;
const T x_old = buffer_[buffer_i_];
buffer_[buffer_i_] = x;
const T x2_old = buffer2_[buffer_i_];
buffer2_[buffer_i_] = x2;

movsum_ += x - x_old;
movsum2_ += x2 - x2_old;

if (first_) {
first_ = false;
mov_max_ = x;
mov_max_cntr_++;
} else {
if (x > mov_max_) {
mov_max_ = x;
mov_max_cntr_ = 1;
} else if (x == mov_max_) {
mov_max_cntr_++;
}

if (mov_max_ == x_old) {
mov_
}
}

buffer_i_++;
if (buffer_i_ == win_len_) {
buffer_i_ = 0;
full_ = true;
}

}

//! Get moving average value.
T mov_avg() const
{
const T n = full_ ? T(win_len_) : T(buffer_i_ + 1);
return movsum_ / n;
}

//! Get variance.
T mov_var() const
{
const T n = full_ ? T(win_len_) : T(buffer_i_+1);
if (n == 1) {
return (T)sqrt(movsum2_ - movsum_ * movsum_);
} else {
return (T)sqrt((n*movsum2_ - movsum_ * movsum_) / (n * n));
}
}

//! Extend rolling window length.
//! @remarks
//! Potentially could cause a gap in the estimated values as
//! decreases effective window size by dropping samples to the right from
//! the cursor in the ring buffers:
//! buffer_i_ win_len_ old win_len_ new
//! ↓ ↓ ↓
//! [■■■■■■■■■■□□□□□□□□□□□□□□□□□□□□□--------------------]
//! ↑ ↑ ↑
//! Dropped samples.
void extend_win(const size_t new_win)
{
if (new_win <= win_len_) {
roc_panic("MovStats: the window length can only grow");
}
if (!buffer_.resize(new_win)) {
roc_panic("MovStats: can not increase storage");
}

movsum_ = 0;
movsum2_ = 0;
for (size_t i = 0; i < buffer_i_; i++) {
movsum_ += buffer_[i];
movsum2_ += buffer2_[i];
}
full_ = false;
}

private:
Array<T> buffer_;
Array<T> buffer2_;
const size_t win_len_;
size_t buffer_i_;
T movsum_;
T movsum2_;
T mov_var_;
T mov_max_;
size_t mov_max_cntr_;

bool full_;
bool first_;
};

} // namespace core
} // namespace roc

#endif // ROC_TOOLKIT_MOV_STATS_H
3 changes: 2 additions & 1 deletion src/internal_modules/roc_packet/rtp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ RTP::RTP()
, duration(0)
, capture_timestamp(0)
, marker(false)
, payload_type(0) {
, payload_type(0)
, fec_recovered(false) {
}

int RTP::compare(const RTP& other) const {
Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_packet/rtp.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ struct RTP {
//! Packet payload type ("pt").
unsigned int payload_type;

//! Internal flag, valid for receiver side only.
//! Signals if this packet was recovered by FEC.
bool fec_recovered;

//! Packet header.
core::Slice<uint8_t> header;

Expand Down
6 changes: 6 additions & 0 deletions src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ struct UDP {

//! Sender request state.
uv_udp_send_t request;

//! When the packet was put into jitter-buffer queue.
//! It points to a moment when the packet was transferred to a sink-thread, that
//! "consumes" this packet. The reason to have it separate is that this allows
//! us to account additional jitter introduced by thread-switch time.
core::nanoseconds_t enqueue_ts;
};

} // namespace packet
Expand Down
5 changes: 3 additions & 2 deletions src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ ReceiverSession::ReceiverSession(
preader = validator_.get();

populator_.reset(new (populator_) rtp::Populator(*preader, *payload_decoder_,
encoding->sample_spec));
encoding->sample_spec, false));
if (!populator_) {
return;
}
Expand Down Expand Up @@ -115,7 +115,7 @@ ReceiverSession::ReceiverSession(
preader = fec_validator_.get();

fec_populator_.reset(new (fec_populator_) rtp::Populator(
*preader, *payload_decoder_, encoding->sample_spec));
*preader, *payload_decoder_, encoding->sample_spec, true));
if (!fec_populator_) {
return;
}
Expand Down Expand Up @@ -221,6 +221,7 @@ status::StatusCode ReceiverSession::route(const packet::PacketPtr& packet) {
roc_panic_if(!is_valid());

packet::UDP* udp = packet->udp();
packet->udp()->enqueue_ts = core::timestamp(core::ClockUnix);
if (!udp) {
// TODO(gh-183): return StatusNoRoute
return status::StatusUnknown;
Expand Down
8 changes: 6 additions & 2 deletions src/internal_modules/roc_rtp/populator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ namespace rtp {

Populator::Populator(packet::IReader& reader,
audio::IFrameDecoder& decoder,
const audio::SampleSpec& sample_spec)
const audio::SampleSpec& sample_spec,
const bool set_recovered)
: reader_(reader)
, decoder_(decoder)
, sample_spec_(sample_spec) {
, sample_spec_(sample_spec)
, set_recovered_(set_recovered) {
}

status::StatusCode Populator::read(packet::PacketPtr& packet) {
Expand All @@ -36,6 +38,8 @@ status::StatusCode Populator::read(packet::PacketPtr& packet) {
packet->rtp()->payload.data(), packet->rtp()->payload.size());
}

packet->rtp()->fec_recovered = set_recovered_;

return status::StatusOK;
}

Expand Down
4 changes: 3 additions & 1 deletion src/internal_modules/roc_rtp/populator.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class Populator : public packet::IReader, public core::NonCopyable<> {
//! Initialize.
Populator(packet::IReader& reader,
audio::IFrameDecoder& decoder,
const audio::SampleSpec& sample_spec);
const audio::SampleSpec& sample_spec,
const bool set_recovered);

//! Read next packet.
virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&);
Expand All @@ -35,6 +36,7 @@ class Populator : public packet::IReader, public core::NonCopyable<> {
packet::IReader& reader_;
audio::IFrameDecoder& decoder_;
const audio::SampleSpec sample_spec_;
const bool set_recovered_;
};

} // namespace rtp
Expand Down
91 changes: 91 additions & 0 deletions src/internal_modules/roc_rtp/stream_stats_monitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2023 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "stream_stats_monitor.h"
#include "roc_status/status_code.h"

namespace roc {
namespace rtp {

StreamStatsMonitor::StreamStatsMonitor(packet::IReader& reader,
core::IArena& arena,
const audio::SampleSpec& sample_spec,
const StreamStatsConfig &config)
: reader_(reader)
, arena_(arena)
, config_(config)
, sample_spec_(sample_spec)
, rtp_(arena, config)
{}

status::StatusCode StreamStatsMonitor::read(packet::PacketPtr& packet)
{
status::StatusCode result = reader_.read(packet);
if (result == status::StatusOK){
if (packet->rtp()) {
rtp_.process(packet);
}
}

return result;
}
StreamStats StreamStatsMonitor::stats() const {
StreamStats result;
result
return StreamStats();
}

StreamStatsMonitor::RTPStats::RTPStats(core::IArena& arena, const StreamStatsConfig &config)
: lost_(0)
, jitter_processed_(0)
, prev_packet_enq_ts_(-1)
, prev_seqnum_(0)
, packet_jitter_stats_(arena, config.window_npackets)
{}

bool StreamStatsMonitor::RTPStats::process(const packet::PacketPtr& packet) {
if (prev_packet_enq_ts_ == -1) {
prev_packet_enq_ts_ = packet->udp()->enqueue_ts;

} else {
const size_t gap = gap_(packet);
// Compute jitter only on consequential packets.
if (gap == 0){
const core::nanoseconds_t d_enq_ts = packet->udp()->enqueue_ts - prev_packet_enq_ts_;
const core::nanoseconds_t d_capt_ts = packet->rtp()->capture_timestamp - prev_capt_ts;
packet_jitter_stats_.add(d_enq_ts - d_capt_ts);
jitter_processed_++;
} else {
lost_ += gap;
}
}

prev_packet_enq_ts_ = packet->udp()->enqueue_ts;
prev_seqnum_ = packet->rtp()->seqnum;
prev_stream_timestamp = packet->rtp()->stream_timestamp;
prev_capt_ts = packet->rtp()->capture_timestamp;
return false;
}

size_t StreamStatsMonitor::RTPStats::gap_(const packet::PacketPtr& packet) const {
if (prev_packet_enq_ts_ == -1) {
roc_panic("RTPStats: attempt to detect gap on the first received packet");
}

return (size_t)abs(packet::seqnum_diff( packet->rtp()->seqnum, prev_seqnum_ + 1));
}

core::nanoseconds_t StreamStatsMonitor::RTPStats::mean_jitter() const {
return packet_jitter_stats_.mov_avg();
}

core::nanoseconds_t StreamStatsMonitor::RTPStats::var_jitter() const {
return packet_jitter_stats_.mov_var();
}
} // namespace packet
} // namespace roc
Loading

0 comments on commit 09d5909

Please sign in to comment.