Skip to content

Commit

Permalink
tests: Fix API tests hangs and failures on valgrind
Browse files Browse the repository at this point in the history
TEST(loopback_sender_2_receiver, metrics_measurements) sometimes hangs
because e2e latency remains zero.

This commit adds support for 2 new flags for test::Proxy:
FlagDeliveryDelay and FlagDeliveryJitter.

metrics_measurements now uses a proxy with this flags, to ensure that
e2e latency and jitter are never zero in test environment.

Test timeout is increased for valgrind because metrics_measurements is
still sometimes quite slow.

Also latency is increased for plugin_plc test because startup time is
sometimes quite slow on valgrind.
  • Loading branch information
gavv committed Dec 19, 2024
1 parent 73869b0 commit daa0309
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 70 deletions.
4 changes: 2 additions & 2 deletions scripts/ci_checks/linux-checks/valgrind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ scons -Q \
find bin/x86_64-pc-linux-gnu -name 'roc-test-*' |\
while read tst
do
python3 scripts/scons_helpers/timeout-run.py 300 \
python3 scripts/scons_helpers/timeout-run.py 3000 \
valgrind \
--max-stackframe=10475520 \
--error-exitcode=1 --exit-on-first-error=yes \
Expand All @@ -34,7 +34,7 @@ scons -Q \
find bin/x86_64-pc-linux-gnu -name 'roc-test-*' |\
while read tst
do
python3 scripts/scons_helpers/timeout-run.py 300 \
python3 scripts/scons_helpers/timeout-run.py 3000 \
valgrind \
--max-stackframe=10475520 \
--error-exitcode=1 --exit-on-first-error=yes \
Expand Down
6 changes: 2 additions & 4 deletions src/internal_modules/roc_packet/concurrent_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ status::StatusCode ConcurrentQueue::init_status() const {
}

status::StatusCode ConcurrentQueue::write(const PacketPtr& packet) {
if (!packet) {
roc_panic("concurrent queue: packet is null");
if (packet) {
queue_.push_back(*packet);
}

queue_.push_back(*packet);

if (write_sem_) {
write_sem_->post();
}
Expand Down
60 changes: 58 additions & 2 deletions src/tests/public_api/test_helpers/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

#include "roc_address/socket_addr.h"
#include "roc_core/atomic.h"
#include "roc_core/fast_random.h"
#include "roc_core/heap_arena.h"
#include "roc_core/thread.h"
#include "roc_netio/network_loop.h"
#include "roc_packet/concurrent_queue.h"
#include "roc_packet/fifo_queue.h"
#include "roc_packet/packet_factory.h"
#include "roc_status/status_code.h"
Expand All @@ -27,7 +30,7 @@ namespace roc {
namespace api {
namespace test {

class Proxy : private packet::IWriter {
class Proxy : public core::Thread, private packet::IWriter {
public:
Proxy(const roc_endpoint* receiver_source_endp,
const roc_endpoint* receiver_repair_endp,
Expand All @@ -36,6 +39,7 @@ class Proxy : private packet::IWriter {
unsigned flags)
: packet_pool_("proxy_packet_pool", arena_)
, buffer_pool_("proxy_buffer_pool", arena_, 2000)
, queue_(packet::ConcurrentQueue::Blocking)
, net_loop_(packet_pool_, buffer_pool_, arena_)
, n_source_packets_(n_source_packets)
, n_repair_packets_(n_repair_packets)
Expand Down Expand Up @@ -132,7 +136,44 @@ class Proxy : private packet::IWriter {
return n_dropped_packets_;
}

void stop_and_join() {
stopped_ = true;
LONGS_EQUAL(status::StatusOK, queue_.write(NULL));
join();
}

private:
virtual void run() {
bool first_packet = true;

for (;;) {
packet::PacketPtr pp;
const status::StatusCode code = queue_.read(pp, packet::ModeFetch);
if (code == status::StatusDrain) {
break;
}
CHECK(code == status::StatusOK);
CHECK(pp);

if (stopped_) {
break;
}

if (first_packet) {
first_packet = false;
if (flags_ & test::FlagDeliveryDelay) {
delivery_delay_();
}
}

if (flags_ & test::FlagDeliveryJitter) {
delivery_jitter_();
}

LONGS_EQUAL(status::StatusOK, writer_->write(pp));
}
}

virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& pp) {
pp->udp()->src_addr = send_config_.bind_address;

Expand Down Expand Up @@ -178,11 +219,23 @@ class Proxy : private packet::IWriter {
if (drop) {
n_dropped_packets_++;
} else {
LONGS_EQUAL(status::StatusOK, writer_->write(pp));
LONGS_EQUAL(status::StatusOK, queue_.write(pp));
}
return true;
}

void delivery_delay_() {
core::sleep_for(core::ClockMonotonic,
(core::nanoseconds_t)core::fast_random_range(
core::Millisecond * 2, core::Millisecond * 5));
}

void delivery_jitter_() {
core::sleep_for(core::ClockMonotonic,
(core::nanoseconds_t)core::fast_random_range(
core::Microsecond * 1, core::Microsecond * 5));
}

core::HeapArena arena_;

core::SlabPool<packet::Packet> packet_pool_;
Expand All @@ -201,6 +254,7 @@ class Proxy : private packet::IWriter {
packet::FifoQueue source_queue_;
packet::FifoQueue repair_queue_;

packet::ConcurrentQueue queue_;
packet::IWriter* writer_;

netio::NetworkLoop net_loop_;
Expand All @@ -211,6 +265,8 @@ class Proxy : private packet::IWriter {

const unsigned flags_;
size_t pos_;

core::Atomic<int> stopped_;
};

} // namespace test
Expand Down
3 changes: 2 additions & 1 deletion src/tests/public_api/test_helpers/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ class Receiver : public core::Thread {
return conn_metrics_[n];
}

void stop() {
void stop_and_join() {
stopped_ = true;
join();
}

private:
Expand Down
3 changes: 2 additions & 1 deletion src/tests/public_api/test_helpers/sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ class Sender : public core::Thread {
return conn_metrics_[n];
}

void stop() {
void stop_and_join() {
stopped_ = true;
join();
}

private:
Expand Down
2 changes: 2 additions & 0 deletions src/tests/public_api/test_helpers/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ enum {
FlagInfinite = (1 << 5),
FlagLoseSomePkts = (1 << 6),
FlagLoseAllRepairPkts = (1 << 7),
FlagDeliveryDelay = (1 << 8),
FlagDeliveryJitter = (1 << 9),
};

inline float increment_sample_value(float sample_value, float sample_step) {
Expand Down
Loading

0 comments on commit daa0309

Please sign in to comment.