From 08be26d50ed548917de3109ea614cee8794df888 Mon Sep 17 00:00:00 2001 From: Steven Zhu Date: Mon, 7 Dec 2020 03:24:31 -0600 Subject: [PATCH] prevent task starvation (#434) --- src/modules/roc_ctl/task_queue.cpp | 19 +++++-- src/modules/roc_ctl/task_queue.h | 1 + src/tests/roc_ctl/test_task_queue.cpp | 77 ++++++++++++++++++++++++++- 3 files changed, 92 insertions(+), 5 deletions(-) diff --git a/src/modules/roc_ctl/task_queue.cpp b/src/modules/roc_ctl/task_queue.cpp index b3587b9a8..4c80c74fb 100644 --- a/src/modules/roc_ctl/task_queue.cpp +++ b/src/modules/roc_ctl/task_queue.cpp @@ -49,6 +49,7 @@ TaskQueue::ICompletionHandler::~ICompletionHandler() { TaskQueue::TaskQueue() : started_(false) , stop_(false) + , fetch_ready_(true) , ready_queue_size_(0) { started_ = Thread::start(); } @@ -159,10 +160,22 @@ bool TaskQueue::process_tasks_() { core::Mutex::Lock lock(task_mutex_); for (;;) { - Task* task = fetch_ready_task_(); - - if (!task) { + Task* task = NULL; + + if (fetch_ready_) { + task = fetch_ready_task_(); + if (!task) { + task = fetch_sleeping_task_(); + } else { + fetch_ready_ = !fetch_ready_; + } + } else { task = fetch_sleeping_task_(); + if (!task) { + task = fetch_ready_task_(); + } else { + fetch_ready_ = !fetch_ready_; + } } if (!task) { diff --git a/src/modules/roc_ctl/task_queue.h b/src/modules/roc_ctl/task_queue.h index 5b9e2b67a..31e06d338 100644 --- a/src/modules/roc_ctl/task_queue.h +++ b/src/modules/roc_ctl/task_queue.h @@ -282,6 +282,7 @@ class TaskQueue : private core::Thread { bool started_; core::Atomic stop_; + bool fetch_ready_; core::Atomic ready_queue_size_; core::MpscQueue ready_queue_; diff --git a/src/tests/roc_ctl/test_task_queue.cpp b/src/tests/roc_ctl/test_task_queue.cpp index c61fb9726..3433e0288 100644 --- a/src/tests/roc_ctl/test_task_queue.cpp +++ b/src/tests/roc_ctl/test_task_queue.cpp @@ -747,9 +747,9 @@ TEST(task_queue, schedule_at_and_schedule) { const core::nanoseconds_t now = core::timestamp(); tq.schedule(tasks[0], &handler); - tq.schedule_at(tasks[1], now + core::Millisecond * 3, &handler); + tq.schedule_at(tasks[1], now + core::Millisecond * 7, &handler); tq.schedule(tasks[2], &handler); - tq.schedule_at(tasks[3], now + core::Millisecond * 1, &handler); + tq.schedule_at(tasks[3], now + core::Millisecond * 5, &handler); tq.unblock_one(); CHECK(handler.wait_called() == &tasks[0]); @@ -1290,5 +1290,78 @@ TEST(task_queue, reschedule_cancelled) { UNSIGNED_LONGS_EQUAL(1, tq.num_tasks()); } +TEST(task_queue, no_starvation) { + TestTaskQueue tq; + CHECK(tq.valid()); + + enum { NumTasks = 6 }; + + UNSIGNED_LONGS_EQUAL(0, tq.num_tasks()); + + TestHandler handler; + handler.expect_success(true); + handler.expect_n_calls(NumTasks); + + TestTaskQueue::Task tasks[NumTasks]; + + tq.block(); + + const core::nanoseconds_t now = core::timestamp(); + const core::nanoseconds_t WaitTime = core::Millisecond; + + tq.schedule_at(tasks[0], now + WaitTime, &handler); + tq.schedule_at(tasks[1], now + WaitTime * 2, &handler); + tq.schedule_at(tasks[2], now + WaitTime * 3, &handler); + tq.schedule(tasks[3], &handler); + tq.schedule(tasks[4], &handler); + tq.schedule(tasks[5], &handler); + + for (size_t i = 0; i < NumTasks; i++) { + tq.set_nth_result(i, true); + } + + // wait for sleeping task to sync + core::sleep_for(WaitTime * (NumTasks / 2)); + + // check that the tasks are fetched from alternating queues + tq.unblock_one(); + TaskQueue::Task* temp = handler.wait_called(); + CHECK(temp == &tasks[0] || temp == &tasks[3]); + UNSIGNED_LONGS_EQUAL(1, tq.num_tasks()); + CHECK(tasks[0].success() || tasks[3].success()); + + tq.unblock_one(); + temp = handler.wait_called(); + CHECK(temp == &tasks[0] || temp == &tasks[3]); + UNSIGNED_LONGS_EQUAL(2, tq.num_tasks()); + CHECK(tasks[0].success() && tasks[3].success()); + + tq.unblock_one(); + temp = handler.wait_called(); + CHECK(temp == &tasks[1] || temp == &tasks[4]); + UNSIGNED_LONGS_EQUAL(3, tq.num_tasks()); + CHECK(tasks[1].success() || tasks[4].success()); + + tq.unblock_one(); + temp = handler.wait_called(); + CHECK(temp == &tasks[1] || temp == &tasks[4]); + UNSIGNED_LONGS_EQUAL(4, tq.num_tasks()); + CHECK(tasks[1].success() && tasks[4].success()); + + tq.unblock_one(); + temp = handler.wait_called(); + CHECK(temp == &tasks[2] || temp == &tasks[5]); + UNSIGNED_LONGS_EQUAL(5, tq.num_tasks()); + CHECK(tasks[2].success() || tasks[5].success()); + + tq.unblock_one(); + temp = handler.wait_called(); + CHECK(temp == &tasks[2] || temp == &tasks[5]); + UNSIGNED_LONGS_EQUAL(6, tq.num_tasks()); + CHECK(tasks[2].success() && tasks[5].success()); + + tq.check_all_unblocked(); +} + } // namespace ctl } // namespace roc