Skip to content

Commit

Permalink
:WIP rtp stream calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv committed Dec 29, 2023
1 parent b98c7ab commit a984021
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 9 deletions.
131 changes: 131 additions & 0 deletions src/internal_modules/roc_core/mov_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2019 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/mov_stats.h
//! @brief Profiler.

#ifndef ROC_TOOLKIT_MOV_STATS_H
#define ROC_TOOLKIT_MOV_STATS_H

#include "roc_core/array.h"
#include "roc_core/iarena.h"
#include "roc_core/panic.h"

namespace roc {
namespace core {

//! Rolling window moving average and variance.
//!
//! Efficiently implements moving average and variance based on approach
//! described in https://www.dsprelated.com/showthread/comp.dsp/97276-1.php
//!
//! @tparam T defines a sample type.
template <typename T>
class MovStats {
public:
//! Initialize.
MovStats(IArena& arena, const size_t win_len)
: buffer_(arena)
, buffer2_(arena)
, win_len_(win_len)
, buffer_i_(0)
, movsum_(T(0))
, movsum2_(T(0))
, mov_var_(T(0))
, full_(false)
{
if(!buffer_.resize(win_len)){
roc_panic("MovStats: can't allocate storage for the ring buffer");
}
if(!buffer2_.resize(win_len)){
roc_panic("MovStats: can't allocate storage for the ring buffer");
}
memset(buffer_.data(), 0, sizeof(T)*buffer_.size());
memset(buffer2_.data(), 0, sizeof(T)*buffer2_.size());
}

//! Shift rolling window by one sample x.
void add(const T& x)
{
const T x2 = x*x;
const T x_old = buffer_[buffer_i_];
buffer_[buffer_i_] = x;
const T x2_old = buffer2_[buffer_i_];
buffer2_[buffer_i_] = x2;

movsum_ += x - x_old;
movsum2_ += x2 - x2_old;

buffer_i_++;
if (buffer_i_ == win_len_) {
buffer_i_ = 0;
full_ = true;
}
}

//! Get moving average value.
T mov_avg() const
{
const T n = full_ ? T(win_len_) : T(buffer_i_ + 1);
return movsum_ / n;
}

//! Get variance.
T mov_var() const
{
const T n = full_ ? T(win_len_) : T(buffer_i_+1);
if (n == 1) {
return (T)sqrt(movsum2_ - movsum_ * movsum_);
} else {
return (T)sqrt((n*movsum2_ - movsum_ * movsum_) / (n * n));
}
}

//! Extend rolling window length.
//! @remarks
//! Potentially could cause a gap in the estimated values as
//! decreases effective window size by dropping samples to the right from
//! the cursor in the ring buffers:
//! buffer_i_ win_len_ old win_len_ new
//! ↓ ↓ ↓
//! [■■■■■■■■■■□□□□□□□□□□□□□□□□□□□□□____________________]
//! ↑ ↑ ↑
//! Dropped samples.
void extend_win(const size_t new_win)
{
if (new_win <= win_len_) {
roc_panic("MovStats: the window length can only grow");
}
if (!buffer_.resize(new_win)) {
roc_panic("MovStats: can not increase storage");
}

movsum_ = 0;
movsum2_ = 0;
for (size_t i = 0; i < buffer_i_; i++) {
movsum_ += buffer_[i];
movsum2_ += buffer2_[i];
}
full_ = false;
}

private:
Array<T> buffer_;
Array<T> buffer2_;
const size_t win_len_;
size_t buffer_i_;
T movsum_;
T movsum2_;
T mov_var_;
bool full_;
};

} // namespace core
} // namespace roc

#endif // ROC_TOOLKIT_MOV_STATS_H
3 changes: 2 additions & 1 deletion src/internal_modules/roc_packet/rtp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ RTP::RTP()
, duration(0)
, capture_timestamp(0)
, marker(false)
, payload_type(0) {
, payload_type(0)
, fec_recovered(false) {
}

int RTP::compare(const RTP& other) const {
Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_packet/rtp.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ struct RTP {
//! Packet payload type ("pt").
unsigned int payload_type;

//! Internal flag, valid for receiver side only.
//! Signals if this packet was recovered by FEC.
bool fec_recovered;

//! Packet header.
core::Slice<uint8_t> header;

Expand Down
6 changes: 6 additions & 0 deletions src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ struct UDP {

//! Sender request state.
uv_udp_send_t request;

//! When the packet was put into jitter-buffer queue.
//! It points to a moment when the packet was transferred to a sink-thread, that
//! "consumes" this packet. The reason to have it separate is that this allows
//! us to account additional jitter introduced by thread-switch time.
core::nanoseconds_t enqueue_ts;
};

} // namespace packet
Expand Down
5 changes: 3 additions & 2 deletions src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ ReceiverSession::ReceiverSession(
preader = validator_.get();

populator_.reset(new (populator_) rtp::Populator(*preader, *payload_decoder_,
encoding->sample_spec));
encoding->sample_spec, false));
if (!populator_) {
return;
}
Expand Down Expand Up @@ -115,7 +115,7 @@ ReceiverSession::ReceiverSession(
preader = fec_validator_.get();

fec_populator_.reset(new (fec_populator_) rtp::Populator(
*preader, *payload_decoder_, encoding->sample_spec));
*preader, *payload_decoder_, encoding->sample_spec, true));
if (!fec_populator_) {
return;
}
Expand Down Expand Up @@ -221,6 +221,7 @@ status::StatusCode ReceiverSession::route(const packet::PacketPtr& packet) {
roc_panic_if(!is_valid());

packet::UDP* udp = packet->udp();
packet->udp()->enqueue_ts = core::timestamp(core::ClockUnix);
if (!udp) {
// TODO(gh-183): return StatusNoRoute
return status::StatusUnknown;
Expand Down
8 changes: 6 additions & 2 deletions src/internal_modules/roc_rtp/populator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ namespace rtp {

Populator::Populator(packet::IReader& reader,
audio::IFrameDecoder& decoder,
const audio::SampleSpec& sample_spec)
const audio::SampleSpec& sample_spec,
const bool set_recovered)
: reader_(reader)
, decoder_(decoder)
, sample_spec_(sample_spec) {
, sample_spec_(sample_spec)
, set_recovered_(set_recovered) {
}

status::StatusCode Populator::read(packet::PacketPtr& packet) {
Expand All @@ -36,6 +38,8 @@ status::StatusCode Populator::read(packet::PacketPtr& packet) {
packet->rtp()->payload.data(), packet->rtp()->payload.size());
}

packet->rtp()->fec_recovered = set_recovered_;

return status::StatusOK;
}

Expand Down
4 changes: 3 additions & 1 deletion src/internal_modules/roc_rtp/populator.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class Populator : public packet::IReader, public core::NonCopyable<> {
//! Initialize.
Populator(packet::IReader& reader,
audio::IFrameDecoder& decoder,
const audio::SampleSpec& sample_spec);
const audio::SampleSpec& sample_spec,
const bool set_recovered);

//! Read next packet.
virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&);
Expand All @@ -35,6 +36,7 @@ class Populator : public packet::IReader, public core::NonCopyable<> {
packet::IReader& reader_;
audio::IFrameDecoder& decoder_;
const audio::SampleSpec sample_spec_;
const bool set_recovered_;
};

} // namespace rtp
Expand Down
36 changes: 36 additions & 0 deletions src/internal_modules/roc_rtp/stream_stats_monitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "stream_stats_monitor.h"
#include "roc_status/status_code.h"

namespace roc {
namespace rtp {

StreamStatsMonitor::StreamStatsMonitor(packet::IReader& reader,
core::IArena& arena,
const audio::SampleSpec& sample_spec,
const StreamStatsConfig &config)
: reader_(reader)
, arena_(arena)
, config_(config)
, sample_spec_(sample_spec)
{}

status::StatusCode StreamStatsMonitor::read(packet::PacketPtr& packet)
{
status::StatusCode result = reader_.read(packet);
if (result == status::StatusOK){

}

return result;
}

} // namespace packet
} // namespace roc
58 changes: 58 additions & 0 deletions src/internal_modules/roc_rtp/stream_stats_monitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2023 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_rtp/stream_stats_monitor.h
//! @brief Calculates basic network stream statistics.


#ifndef ROC_PACKET_STREAMSTATSMONITOR_H_
#define ROC_PACKET_STREAMSTATSMONITOR_H_

#include "roc_core/noncopyable.h"
#include "roc_packet/ireader.h"
#include "roc_core/time.h"
#include "roc_audio/sample_spec.h"
#include "roc_status/status_code.h"

namespace roc {
namespace rtp {

struct StreamStatsConfig {
core::nanoseconds_t window_duration;
core::nanoseconds_t window_overlap;
};

struct StreamStats {
core::nanoseconds_t windowed_max_jitter;

size_t windowed_npackets;
size_t windowed_lost_packets;
size_t windowed_recovered_packets;

float packet_loss_rate;
};

class StreamStatsMonitor : public packet::IReader, public core::NonCopyable<> {
public:
StreamStatsMonitor(packet::IReader& reader, core::IArena& arena,
const audio::SampleSpec& sample_spec,
const StreamStatsConfig &config);

virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr& packet);
private:
packet::IReader& reader_;
core::IArena& arena_;
const StreamStatsConfig config_;
const audio::SampleSpec sample_spec_;

};

} // namespace packet
} // namespace roc

#endif // ROC_PACKET_STREAMSTATSMONITOR_H_
Loading

0 comments on commit a984021

Please sign in to comment.