diff --git a/src/internal_modules/roc_netio/target_libuv/roc_netio/udp_receiver_port.cpp b/src/internal_modules/roc_netio/target_libuv/roc_netio/udp_receiver_port.cpp index 5f8d5a3e01..61e7b8f18d 100644 --- a/src/internal_modules/roc_netio/target_libuv/roc_netio/udp_receiver_port.cpp +++ b/src/internal_modules/roc_netio/target_libuv/roc_netio/udp_receiver_port.cpp @@ -287,6 +287,7 @@ void UdpReceiverPort::recv_cb_(uv_udp_t* handle, pp->udp()->src_addr = src_addr; pp->udp()->dst_addr = self.config_.bind_address; + pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix); pp->set_data(core::Slice(*bp, 0, (size_t)nread)); diff --git a/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h b/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h index 58291cad5e..c8a2b70cf6 100644 --- a/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h +++ b/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h @@ -31,6 +31,9 @@ struct UDP { //! Sender request state. uv_udp_send_t request; + + //! Received Timestamp. + core::nanoseconds_t receive_timestamp; }; } // namespace packet diff --git a/src/internal_modules/roc_pipeline/config.h b/src/internal_modules/roc_pipeline/config.h index 535f87219e..ed592ed449 100644 --- a/src/internal_modules/roc_pipeline/config.h +++ b/src/internal_modules/roc_pipeline/config.h @@ -162,6 +162,9 @@ struct ReceiverSessionConfig { //! Target latency, nanoseconds. core::nanoseconds_t target_latency; + //! Packet prebuffer length, nanoseconds. + core::nanoseconds_t prebuf_len; + //! Packet payload type. unsigned int payload_type; @@ -188,6 +191,7 @@ struct ReceiverSessionConfig { ReceiverSessionConfig() : target_latency(DefaultLatency) + , prebuf_len(DefaultLatency) , payload_type(0) , resampler_backend(audio::ResamplerBackend_Default) , resampler_profile(audio::ResamplerProfile_Medium) { @@ -228,6 +232,9 @@ struct ReceiverCommonConfig { //! Insert weird beeps instead of silence on packet loss. bool enable_beeping; + //! Maximum number of packets per session. + size_t max_session_packets; + ReceiverCommonConfig() : output_sample_spec(DefaultSampleSpec) , enable_timing(false) diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.cpp b/src/internal_modules/roc_pipeline/receiver_session_group.cpp index b49899abe6..1b3ef6e80d 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session_group.cpp @@ -166,6 +166,7 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) { } if (!can_create_session_(packet)) { + enqueue_prebuf_packet_(packet); // TODO(gh-183): return status return status::StatusOK; } @@ -173,6 +174,48 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) { return create_session_(packet); } +void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet_ptr) { + prebuf_packets_.push_back(*packet_ptr.get()); + + core::nanoseconds_t now = core::timestamp(core::ClockMonotonic); + + while (prebuf_packets_.size() > 0) { + core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp; + if (now - received > receiver_config_.default_session.prebuf_len) { + prebuf_packets_.remove(*prebuf_packets_.front()); + } else { + break; + } + } +} + +void ReceiverSessionGroup::dequeue_prebuf_packets_(ReceiverSession& sess) { + packet::PacketPtr curr, next; + + if (prebuf_packets_.size() == 0) { + return; + } + + core::nanoseconds_t now = core::timestamp(core::ClockMonotonic); + + for (curr = prebuf_packets_.front(); curr; curr = next) { + next = prebuf_packets_.nextof(*curr); + + // if packet is too old, remove it from the queue + core::nanoseconds_t received = curr->udp()->receive_timestamp; + if (now - received > receiver_config_.default_session.prebuf_len) { + prebuf_packets_.remove(*curr); + continue; + } + + // if session handles the packet, remove it from the queue + const status::StatusCode code = sess.route(curr); + if (code == status::StatusOK) { + prebuf_packets_.remove(*curr); + } + } +} + status::StatusCode ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet) { if (!rtcp_composer_) { @@ -252,6 +295,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) { receiver_state_.add_sessions(+1); + dequeue_prebuf_packets_(*sess); + return status::StatusOK; } diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.h b/src/internal_modules/roc_pipeline/receiver_session_group.h index 4f73db2bd1..d14b8e8812 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.h +++ b/src/internal_modules/roc_pipeline/receiver_session_group.h @@ -82,6 +82,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IReceiver status::StatusCode route_transport_packet_(const packet::PacketPtr& packet); status::StatusCode route_control_packet_(const packet::PacketPtr& packet); + void enqueue_prebuf_packet_(const packet::PacketPtr& packet); + void dequeue_prebuf_packets_(ReceiverSession& sess); bool can_create_session_(const packet::PacketPtr& packet); @@ -108,6 +110,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IReceiver core::Optional rtcp_session_; core::List sessions_; + core::List prebuf_packets_; }; } // namespace pipeline diff --git a/src/public_api/include/roc/config.h b/src/public_api/include/roc/config.h index 1765f6df3c..6efcac42da 100644 --- a/src/public_api/include/roc/config.h +++ b/src/public_api/include/roc/config.h @@ -781,6 +781,14 @@ typedef struct roc_receiver_config { * If zero, default value is used. If negative, the timeout is disabled. */ long long choppy_playback_timeout; + + /** Packet prebuffer length, in nanoseconds. + * Packets received for sessions that have not yet been created + * will be buffered. Any packets older than the prebuf_len + * will be discarded. + * If zero, default value is used. + */ + unsigned long long prebuf_len; } roc_receiver_config; /** Interface configuration. diff --git a/src/tests/roc_pipeline/test_receiver_source.cpp b/src/tests/roc_pipeline/test_receiver_source.cpp index ca62e1ed35..e5ddbfca81 100644 --- a/src/tests/roc_pipeline/test_receiver_source.cpp +++ b/src/tests/roc_pipeline/test_receiver_source.cpp @@ -1762,6 +1762,62 @@ IGNORE_TEST(receiver_source, timestamp_mapping_remixing) { CHECK(first_ts); } +TEST(receiver_source, packet_buffer) { + enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 }; + + init(Rate, Chans, Rate, Chans); + + ReceiverConfig config = make_config(); + config.default_session.prebuf_len = 0; + ReceiverSource receiver(config, format_map, packet_factory, byte_buffer_factory, + sample_buffer_factory, arena); + CHECK(receiver.is_valid()); + + ReceiverSlot* slot = create_slot(receiver); + CHECK(slot); + + packet::Queue queue; + packet::Queue source_queue; + packet::Queue repair_queue; + + packet::IWriter* source_endpoint_writer = + create_endpoint(slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source); + CHECK(source_endpoint_writer); + + packet::IWriter* repair_endpoint_writer = + create_endpoint(slot, address::Iface_AudioRepair, address::Proto_RS8M_Repair); + CHECK(repair_endpoint_writer); + + fec::WriterConfig fec_config; + + test::PacketWriter packet_writer( + arena, queue, queue, format_map, packet_factory, byte_buffer_factory, src1, dst1, + dst2, PayloadType_Ch2, packet::FEC_ReedSolomon_M8, fec_config); + + // setup reader + test::FrameReader frame_reader(receiver, sample_buffer_factory); + + packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket, + output_sample_spec); + + for (int i = 0; i < ManyPackets; ++i) { + packet::PacketPtr pp; + UNSIGNED_LONGS_EQUAL(status::StatusOK, queue.read(pp)); + CHECK(pp); + + if (pp->flags() & packet::Packet::FlagAudio) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, source_queue.write(pp)); + } + if (pp->flags() & packet::Packet::FlagRepair) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, repair_queue.write(pp)); + } + } + + receiver.refresh(frame_reader.refresh_ts()); + frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec); + UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions()); +} + TEST(receiver_source, metrics_sessions) { enum { Rate = SampleRate, Chans = Chans_Stereo, MaxSess = 10 }; diff --git a/src/tools/roc_recv/cmdline.ggo b/src/tools/roc_recv/cmdline.ggo index 684db2eb48..73a1117cd7 100644 --- a/src/tools/roc_recv/cmdline.ggo +++ b/src/tools/roc_recv/cmdline.ggo @@ -41,6 +41,9 @@ section "Options" option "no-play-timeout" - "No playback timeout, TIME units" string optional + option "prebuf-len" - "Length of packet prebuffer, TIME units" + string optional + option "choppy-play-timeout" - "Choppy playback timeout, TIME units" string optional diff --git a/src/tools/roc_recv/main.cpp b/src/tools/roc_recv/main.cpp index fe94b80d55..a37459d9cb 100644 --- a/src/tools/roc_recv/main.cpp +++ b/src/tools/roc_recv/main.cpp @@ -155,6 +155,19 @@ int main(int argc, char** argv) { } } + if (args.prebuf_len_given) { + core::nanoseconds_t prebuf_len = 0; + if (!core::parse_duration(args.prebuf_len_arg, prebuf_len)) { + roc_log(LogError, "invalid --prebuf-len"); + return 1; + } + receiver_config.default_session.prebuf_len = + (core::nanoseconds_t)args.prebuf_len_arg; + } else { + receiver_config.default_session.prebuf_len = + receiver_config.default_session.target_latency; + } + if (args.choppy_play_timeout_given) { if (!core::parse_duration( args.choppy_play_timeout_arg,