Skip to content

Commit

Permalink
prevent task starvation (roc-streaming#434)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzhu94 authored and gavv committed Mar 5, 2021
1 parent 9b577df commit 08be26d
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
19 changes: 16 additions & 3 deletions src/modules/roc_ctl/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ TaskQueue::ICompletionHandler::~ICompletionHandler() {
TaskQueue::TaskQueue()
: started_(false)
, stop_(false)
, fetch_ready_(true)
, ready_queue_size_(0) {
started_ = Thread::start();
}
Expand Down Expand Up @@ -159,10 +160,22 @@ bool TaskQueue::process_tasks_() {
core::Mutex::Lock lock(task_mutex_);

for (;;) {
Task* task = fetch_ready_task_();

if (!task) {
Task* task = NULL;

if (fetch_ready_) {
task = fetch_ready_task_();
if (!task) {
task = fetch_sleeping_task_();
} else {
fetch_ready_ = !fetch_ready_;
}
} else {
task = fetch_sleeping_task_();
if (!task) {
task = fetch_ready_task_();
} else {
fetch_ready_ = !fetch_ready_;
}
}

if (!task) {
Expand Down
1 change: 1 addition & 0 deletions src/modules/roc_ctl/task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class TaskQueue : private core::Thread {

bool started_;
core::Atomic<int> stop_;
bool fetch_ready_;

core::Atomic<int> ready_queue_size_;
core::MpscQueue<Task, core::NoOwnership> ready_queue_;
Expand Down
77 changes: 75 additions & 2 deletions src/tests/roc_ctl/test_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,9 +747,9 @@ TEST(task_queue, schedule_at_and_schedule) {
const core::nanoseconds_t now = core::timestamp();

tq.schedule(tasks[0], &handler);
tq.schedule_at(tasks[1], now + core::Millisecond * 3, &handler);
tq.schedule_at(tasks[1], now + core::Millisecond * 7, &handler);
tq.schedule(tasks[2], &handler);
tq.schedule_at(tasks[3], now + core::Millisecond * 1, &handler);
tq.schedule_at(tasks[3], now + core::Millisecond * 5, &handler);

tq.unblock_one();
CHECK(handler.wait_called() == &tasks[0]);
Expand Down Expand Up @@ -1290,5 +1290,78 @@ TEST(task_queue, reschedule_cancelled) {
UNSIGNED_LONGS_EQUAL(1, tq.num_tasks());
}

TEST(task_queue, no_starvation) {
TestTaskQueue tq;
CHECK(tq.valid());

enum { NumTasks = 6 };

UNSIGNED_LONGS_EQUAL(0, tq.num_tasks());

TestHandler handler;
handler.expect_success(true);
handler.expect_n_calls(NumTasks);

TestTaskQueue::Task tasks[NumTasks];

tq.block();

const core::nanoseconds_t now = core::timestamp();
const core::nanoseconds_t WaitTime = core::Millisecond;

tq.schedule_at(tasks[0], now + WaitTime, &handler);
tq.schedule_at(tasks[1], now + WaitTime * 2, &handler);
tq.schedule_at(tasks[2], now + WaitTime * 3, &handler);
tq.schedule(tasks[3], &handler);
tq.schedule(tasks[4], &handler);
tq.schedule(tasks[5], &handler);

for (size_t i = 0; i < NumTasks; i++) {
tq.set_nth_result(i, true);
}

// wait for sleeping task to sync
core::sleep_for(WaitTime * (NumTasks / 2));

// check that the tasks are fetched from alternating queues
tq.unblock_one();
TaskQueue::Task* temp = handler.wait_called();
CHECK(temp == &tasks[0] || temp == &tasks[3]);
UNSIGNED_LONGS_EQUAL(1, tq.num_tasks());
CHECK(tasks[0].success() || tasks[3].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[0] || temp == &tasks[3]);
UNSIGNED_LONGS_EQUAL(2, tq.num_tasks());
CHECK(tasks[0].success() && tasks[3].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[1] || temp == &tasks[4]);
UNSIGNED_LONGS_EQUAL(3, tq.num_tasks());
CHECK(tasks[1].success() || tasks[4].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[1] || temp == &tasks[4]);
UNSIGNED_LONGS_EQUAL(4, tq.num_tasks());
CHECK(tasks[1].success() && tasks[4].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[2] || temp == &tasks[5]);
UNSIGNED_LONGS_EQUAL(5, tq.num_tasks());
CHECK(tasks[2].success() || tasks[5].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[2] || temp == &tasks[5]);
UNSIGNED_LONGS_EQUAL(6, tq.num_tasks());
CHECK(tasks[2].success() && tasks[5].success());

tq.check_all_unblocked();
}

} // namespace ctl
} // namespace roc

0 comments on commit 08be26d

Please sign in to comment.