Skip to content

Commit

Permalink
Rebase ring buffer branch to develop
Browse files Browse the repository at this point in the history
  • Loading branch information
ForeverASilver authored and gavv committed Dec 22, 2023
1 parent b98c7ab commit 96bd70b
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ void UdpReceiverPort::recv_cb_(uv_udp_t* handle,

pp->udp()->src_addr = src_addr;
pp->udp()->dst_addr = self.config_.bind_address;
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);

pp->set_data(core::Slice<uint8_t>(*bp, 0, (size_t)nread));

Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ struct UDP {

//! Sender request state.
uv_udp_send_t request;

//! Received Timestamp.
core::nanoseconds_t receive_timestamp;
};

} // namespace packet
Expand Down
7 changes: 7 additions & 0 deletions src/internal_modules/roc_pipeline/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ struct ReceiverSessionConfig {
//! Target latency, nanoseconds.
core::nanoseconds_t target_latency;

//! Packet prebuffer length, nanoseconds.
core::nanoseconds_t prebuf_len;

//! Packet payload type.
unsigned int payload_type;

Expand All @@ -185,6 +188,7 @@ struct ReceiverSessionConfig {

ReceiverSessionConfig()
: target_latency(DefaultLatency)
, prebuf_len(DefaultLatency)
, payload_type(0)
, resampler_backend(audio::ResamplerBackend_Default)
, resampler_profile(audio::ResamplerProfile_Medium) {
Expand Down Expand Up @@ -225,6 +229,9 @@ struct ReceiverCommonConfig {
//! Insert weird beeps instead of silence on packet loss.
bool enable_beeping;

//! Maximum number of packets per session.
size_t max_session_packets;

ReceiverCommonConfig()
: output_sample_spec(DefaultSampleSpec)
, enable_timing(false)
Expand Down
45 changes: 45 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,56 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
}

if (!can_create_session_(packet)) {
enqueue_prebuf_packet_(packet);
// TODO(gh-183): return status
return status::StatusOK;
}

return create_session_(packet);
}

void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet_ptr) {
prebuf_packets_.push_back(*packet_ptr.get());

core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);

while (prebuf_packets_.size() > 0) {
core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp;
if (now - received > receiver_config_.default_session.prebuf_len) {
prebuf_packets_.remove(*prebuf_packets_.front());
} else {
break;
}
}
}

void ReceiverSessionGroup::dequeue_prebuf_packets_(ReceiverSession& sess) {
packet::PacketPtr curr, next;

if (prebuf_packets_.size() == 0) {
return;
}

core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);

for (curr = prebuf_packets_.front(); curr; curr = next) {
next = prebuf_packets_.nextof(*curr);

// if packet is too old, remove it from the queue
core::nanoseconds_t received = curr->udp()->receive_timestamp;
if (now - received > receiver_config_.default_session.prebuf_len) {
prebuf_packets_.remove(*curr);
continue;
}

// if session handles the packet, remove it from the queue
const status::StatusCode code = sess.route(curr);
if (code == status::StatusOK) {
prebuf_packets_.remove(*curr);
}
}
}

status::StatusCode
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet) {
if (!rtcp_composer_) {
Expand Down Expand Up @@ -252,6 +295,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) {

receiver_state_.add_sessions(+1);

dequeue_prebuf_packets_(*sess);

return status::StatusOK;
}

Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IReceiver

status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);
void enqueue_prebuf_packet_(const packet::PacketPtr& packet);
void dequeue_prebuf_packets_(ReceiverSession& sess);

bool can_create_session_(const packet::PacketPtr& packet);

Expand All @@ -108,6 +110,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IReceiver
core::Optional<rtcp::Session> rtcp_session_;

core::List<ReceiverSession> sessions_;
core::List<packet::Packet> prebuf_packets_;
};

} // namespace pipeline
Expand Down
8 changes: 8 additions & 0 deletions src/public_api/include/roc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,14 @@ typedef struct roc_receiver_config {
* If zero, default value is used. If negative, the timeout is disabled.
*/
long long choppy_playback_timeout;

/** Packet prebuffer length, in nanoseconds.
* Packets received for sessions that have not yet been created
* will be buffered. Any packets older than the prebuf_len
* will be discarded.
* If zero, default value is used.
*/
unsigned long long prebuf_len;
} roc_receiver_config;

/** Interface configuration.
Expand Down
56 changes: 56 additions & 0 deletions src/tests/roc_pipeline/test_receiver_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,62 @@ TEST(receiver_source, recv_timestamp_mapping_remixing) {
CHECK(first_ts);
}

TEST(receiver_source, packet_buffer) {
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 };

init(Rate, Chans, Rate, Chans);

ReceiverConfig config = make_config();
config.default_session.prebuf_len = 0;
ReceiverSource receiver(config, format_map, packet_factory, byte_buffer_factory,
sample_buffer_factory, arena);
CHECK(receiver.is_valid());

ReceiverSlot* slot = create_slot(receiver);
CHECK(slot);

packet::Queue queue;
packet::Queue source_queue;
packet::Queue repair_queue;

packet::IWriter* source_endpoint_writer =
create_endpoint(slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source);
CHECK(source_endpoint_writer);

packet::IWriter* repair_endpoint_writer =
create_endpoint(slot, address::Iface_AudioRepair, address::Proto_RS8M_Repair);
CHECK(repair_endpoint_writer);

fec::WriterConfig fec_config;

test::PacketWriter packet_writer(
arena, queue, queue, format_map, packet_factory, byte_buffer_factory, src1, dst1,
dst2, PayloadType_Ch2, packet::FEC_ReedSolomon_M8, fec_config);

// setup reader
test::FrameReader frame_reader(receiver, sample_buffer_factory);

packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
output_sample_spec);

for (int i = 0; i < ManyPackets; ++i) {
packet::PacketPtr pp;
UNSIGNED_LONGS_EQUAL(status::StatusOK, queue.read(pp));
CHECK(pp);

if (pp->flags() & packet::Packet::FlagAudio) {
UNSIGNED_LONGS_EQUAL(status::StatusOK, source_queue.write(pp));
}
if (pp->flags() & packet::Packet::FlagRepair) {
UNSIGNED_LONGS_EQUAL(status::StatusOK, repair_queue.write(pp));
}
}

receiver.refresh(frame_reader.refresh_ts());
frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
}

TEST(receiver_source, metrics_sessions) {
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxSess = 10 };

Expand Down
3 changes: 3 additions & 0 deletions src/tools/roc_recv/cmdline.ggo
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ section "Options"
option "no-play-timeout" - "No playback timeout, TIME units"
string optional

option "prebuf-len" - "Length of packet prebuffer, TIME units"
string optional

option "choppy-play-timeout" - "Choppy playback timeout, TIME units"
string optional

Expand Down
13 changes: 13 additions & 0 deletions src/tools/roc_recv/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,19 @@ int main(int argc, char** argv) {
receiver_config.default_session.target_latency);
}

if (args.prebuf_len_given) {
core::nanoseconds_t prebuf_len = 0;
if (!core::parse_duration(args.prebuf_len_arg, prebuf_len)) {
roc_log(LogError, "invalid --prebuf-len");
return 1;
}
receiver_config.default_session.prebuf_len =
(core::nanoseconds_t)args.prebuf_len_arg;
} else {
receiver_config.default_session.prebuf_len =
receiver_config.default_session.target_latency;
}

if (args.choppy_play_timeout_given) {
if (!core::parse_duration(
args.choppy_play_timeout_arg,
Expand Down

0 comments on commit 96bd70b

Please sign in to comment.