Skip to content

Commit

Permalink
Process pipeline tasks in-place if issued from I/O thread
Browse files Browse the repository at this point in the history
This commit eliminates unwanted delays in task processing,
when I/O thread schedules task before processing frame.
  • Loading branch information
gavv committed Sep 12, 2023
1 parent 78148af commit 2e8a0cb
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 0 deletions.
23 changes: 23 additions & 0 deletions src/internal_modules/roc_pipeline/pipeline_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ PipelineLoop::PipelineLoop(IPipelineTaskScheduler& scheduler,
, pending_tasks_(0)
, pending_frames_(0)
, processing_state_(ProcNotScheduled)
, frame_processing_tid_(0)
, next_frame_deadline_(0)
, subframe_tasks_deadline_(0)
, samples_processed_(0)
Expand Down Expand Up @@ -261,6 +262,8 @@ bool PipelineLoop::process_subframes_and_tasks_precise_(audio::Frame& frame) {

report_stats_();

frame_processing_tid_.exclusive_store(tid_imp());

pipeline_mutex_.unlock();

if (--pending_frames_ == 0 && pending_tasks_ != 0) {
Expand Down Expand Up @@ -411,9 +414,29 @@ PipelineLoop::update_next_frame_deadline_(core::nanoseconds_t frame_start_time,
bool PipelineLoop::interframe_task_processing_allowed_(
core::nanoseconds_t next_frame_deadline) const {
if (!config_.enable_precise_task_scheduling) {
// task scheduling disabled, so we just process all task in-place
return true;
}

uint64_t frame_tid = 0;
if (frame_processing_tid_.try_load(frame_tid)) {
if (frame_tid == 0) {
// no frames were ever processed yet
// until the very first frame, we allow processing all tasks in-place
return true;
}
if (frame_tid == tid_imp()) {
// last frame was processed at current thread
// we assume that frames are usually processed at the same thread, and
// hence allow processing tasks in-place on that thread, because likely
// it will anyway wait for task completion before proceeding to frame
return true;
}
}

// this task is scheduled not from the thread that processes frames
// if there is enough time until next frame, we allow processing task in-place,
// otherwise the task should be queued to avoid blocking frame processing
const core::nanoseconds_t now = timestamp_imp();

return now < (next_frame_deadline - no_task_proc_half_interval_)
Expand Down
6 changes: 6 additions & 0 deletions src/internal_modules/roc_pipeline/pipeline_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ class PipelineLoop : public core::NonCopyable<> {
//! Get current time.
virtual core::nanoseconds_t timestamp_imp() const = 0;

//! Get current thread id.
virtual uint64_t tid_imp() const = 0;

//! Process subframe.
virtual bool process_subframe_imp(audio::Frame& frame) = 0;

Expand Down Expand Up @@ -318,6 +321,9 @@ class PipelineLoop : public core::NonCopyable<> {
// asynchronous processing state
core::Atomic<int> processing_state_;

// tid of last thread that performed frame processing
core::Seqlock<uint64_t> frame_processing_tid_;

// when next frame is expected to be started
core::Seqlock<core::nanoseconds_t> next_frame_deadline_;

Expand Down
5 changes: 5 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_core/shared_ptr.h"
#include "roc_core/thread.h"

namespace roc {
namespace pipeline {
Expand Down Expand Up @@ -233,6 +234,10 @@ core::nanoseconds_t ReceiverLoop::timestamp_imp() const {
return core::timestamp(core::ClockMonotonic);
}

uint64_t ReceiverLoop::tid_imp() const {
return core::Thread::get_tid();
}

bool ReceiverLoop::process_subframe_imp(audio::Frame& frame) {
// TODO: handle returned deadline and schedule refresh
source_.refresh(core::timestamp(core::ClockUnix));
Expand Down
1 change: 1 addition & 0 deletions src/internal_modules/roc_pipeline/receiver_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class ReceiverLoop : public PipelineLoop, private sndio::ISource {

// Methods of PipelineLoop
virtual core::nanoseconds_t timestamp_imp() const;
virtual uint64_t tid_imp() const;
virtual bool process_subframe_imp(audio::Frame& frame);
virtual bool process_task_imp(PipelineTask& task);

Expand Down
5 changes: 5 additions & 0 deletions src/internal_modules/roc_pipeline/sender_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_audio/resampler_map.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_core/thread.h"

namespace roc {
namespace pipeline {
Expand Down Expand Up @@ -223,6 +224,10 @@ core::nanoseconds_t SenderLoop::timestamp_imp() const {
return core::timestamp(core::ClockMonotonic);
}

uint64_t SenderLoop::tid_imp() const {
return core::Thread::get_tid();
}

bool SenderLoop::process_subframe_imp(audio::Frame& frame) {
sink_.write(frame);

Expand Down
1 change: 1 addition & 0 deletions src/internal_modules/roc_pipeline/sender_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class SenderLoop : public PipelineLoop, private sndio::ISink {

// Methods of PipelineLoop
virtual core::nanoseconds_t timestamp_imp() const;
virtual uint64_t tid_imp() const;
virtual bool process_subframe_imp(audio::Frame&);
virtual bool process_task_imp(PipelineTask&);

Expand Down
4 changes: 4 additions & 0 deletions src/tests/roc_pipeline/bench_pipeline_loop_contention.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ class NoopPipeline : public PipelineLoop,
return core::timestamp(core::ClockMonotonic);
}

virtual uint64_t tid_imp() const {
return 0;
}

virtual bool process_subframe_imp(audio::Frame&) {
return true;
}
Expand Down
4 changes: 4 additions & 0 deletions src/tests/roc_pipeline/bench_pipeline_loop_peak_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ class TestPipeline : public PipelineLoop,
return core::timestamp(core::ClockMonotonic);
}

virtual uint64_t tid_imp() const {
return 0;
}

virtual bool process_subframe_imp(audio::Frame&) {
stats_.frame_processing_started();
busy_wait(FrameProcessingDuration);
Expand Down
88 changes: 88 additions & 0 deletions src/tests/roc_pipeline/test_pipeline_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const core::nanoseconds_t StartTime = 10000000 * core::Second;

const core::nanoseconds_t FrameProcessingTime = 50 * core::Microsecond;

const uint64_t DefaultThread = 1, ProcessingThread = 2, BackgroundThread = 3;

const float Epsilon = 1e6f;

const audio::SampleSpec SampleSpecs(SampleRate, audio::ChanLayout_Surround, Chans);
Expand All @@ -58,6 +60,7 @@ class TestPipeline : public PipelineLoop, private IPipelineTaskScheduler {
, frame_allow_counter_(999999)
, task_allow_counter_(999999)
, time_(StartTime)
, tid_(DefaultThread)
, exp_frame_val_(0)
, exp_frame_sz_(0)
, exp_frame_flags_(0)
Expand All @@ -74,6 +77,11 @@ class TestPipeline : public PipelineLoop, private IPipelineTaskScheduler {
time_ = t;
}

void set_tid(uint64_t t) {
core::Mutex::Lock lock(mutex_);
tid_ = t;
}

void block_frames() {
core::Mutex::Lock lock(mutex_);
frame_allow_counter_ = 0;
Expand Down Expand Up @@ -195,6 +203,11 @@ class TestPipeline : public PipelineLoop, private IPipelineTaskScheduler {
return time_;
}

virtual uint64_t tid_imp() const {
core::Mutex::Lock lock(mutex_);
return tid_;
}

virtual bool process_subframe_imp(audio::Frame& frame) {
core::Mutex::Lock lock(mutex_);
bool first_iter = true;
Expand Down Expand Up @@ -267,6 +280,7 @@ class TestPipeline : public PipelineLoop, private IPipelineTaskScheduler {
int task_allow_counter_;

core::nanoseconds_t time_;
uint64_t tid_;

audio::sample_t exp_frame_val_;
size_t exp_frame_sz_;
Expand Down Expand Up @@ -487,12 +501,18 @@ TEST(pipeline_loop, schedule_when_can_process_tasks) {

pipeline.set_time(StartTime);

// next call is done from "processing thread"
pipeline.set_tid(ProcessingThread);

// process_subframes_and_tasks() should allow task processing
// until (StartTime + FrameSize * core::Microsecond - NoTaskProcessingGap / 2)
CHECK(pipeline.process_subframes_and_tasks(frame));

UNSIGNED_LONGS_EQUAL(1, pipeline.num_processed_frames());

// further calls are done from "background thread"
pipeline.set_tid(BackgroundThread);

TestCompleter completer(pipeline);
TestPipeline::Task task;

Expand All @@ -518,6 +538,50 @@ TEST(pipeline_loop, schedule_when_can_process_tasks) {
UNSIGNED_LONGS_EQUAL(0, pipeline.num_preemptions());
}

TEST(pipeline_loop, schedule_when_cant_process_tasks_but_from_processing_thread) {
TestPipeline pipeline(config);

audio::Frame frame1(samples, FrameSize);
fill_frame(frame1, 0.1f, 0, FrameSize);
pipeline.expect_frame(0.1f, FrameSize);

pipeline.set_time(StartTime);

// next call is done from "processing thread"
pipeline.set_tid(ProcessingThread);

// process_subframes_and_tasks() should allow task processing
// until (StartTime + FrameSize * core::Microsecond - NoTaskProcessingGap / 2)
CHECK(pipeline.process_subframes_and_tasks(frame1));

UNSIGNED_LONGS_EQUAL(1, pipeline.num_processed_frames());

TestCompleter completer(pipeline);
TestPipeline::Task task;

// deadline expired
pipeline.set_time(StartTime + FrameSize * core::Microsecond
- NoTaskProcessingGap / 2);

// schedule() should process task in-place even when deadline expired,
// because we're still on "processing thread"
pipeline.schedule(task, completer);

POINTERS_EQUAL(&task, completer.get_task());

UNSIGNED_LONGS_EQUAL(0, pipeline.num_pending_tasks());
UNSIGNED_LONGS_EQUAL(1, pipeline.num_processed_tasks());

UNSIGNED_LONGS_EQUAL(1, pipeline.num_tasks_processed_in_sched());
UNSIGNED_LONGS_EQUAL(0, pipeline.num_tasks_processed_in_frame());
UNSIGNED_LONGS_EQUAL(0, pipeline.num_tasks_processed_in_proc());

UNSIGNED_LONGS_EQUAL(0, pipeline.num_sched_calls());
UNSIGNED_LONGS_EQUAL(0, pipeline.num_sched_cancellations());

UNSIGNED_LONGS_EQUAL(0, pipeline.num_preemptions());
}

TEST(pipeline_loop, schedule_when_cant_process_tasks_then_process_frame) {
TestPipeline pipeline(config);

Expand All @@ -527,12 +591,18 @@ TEST(pipeline_loop, schedule_when_cant_process_tasks_then_process_frame) {

pipeline.set_time(StartTime);

// next call is done from "processing thread"
pipeline.set_tid(ProcessingThread);

// process_subframes_and_tasks() should allow task processing
// until (StartTime + FrameSize * core::Microsecond - NoTaskProcessingGap / 2)
CHECK(pipeline.process_subframes_and_tasks(frame1));

UNSIGNED_LONGS_EQUAL(1, pipeline.num_processed_frames());

// further calls are done from "background thread"
pipeline.set_tid(BackgroundThread);

TestCompleter completer(pipeline);
TestPipeline::Task task;

Expand Down Expand Up @@ -565,12 +635,18 @@ TEST(pipeline_loop, schedule_when_cant_process_tasks_then_process_frame) {

pipeline.set_time(StartTime + FrameSize * core::Microsecond);

// next call is done from "processing thread"
pipeline.set_tid(ProcessingThread);

// process_subframes_and_tasks() should call cancel_task_processing() and
// process the task from the queue
CHECK(pipeline.process_subframes_and_tasks(frame2));

UNSIGNED_LONGS_EQUAL(2, pipeline.num_processed_frames());

// further calls are done from "background thread"
pipeline.set_tid(BackgroundThread);

POINTERS_EQUAL(&task, completer.get_task());

UNSIGNED_LONGS_EQUAL(0, pipeline.num_pending_tasks());
Expand All @@ -595,12 +671,18 @@ TEST(pipeline_loop, schedule_when_cant_process_tasks_then_process_tasks) {

pipeline.set_time(StartTime);

// next call is done from "processing thread"
pipeline.set_tid(ProcessingThread);

// process_subframes_and_tasks() should allow task processing
// until (StartTime + FrameSize * core::Microsecond - NoTaskProcessingGap / 2)
CHECK(pipeline.process_subframes_and_tasks(frame1));

UNSIGNED_LONGS_EQUAL(1, pipeline.num_processed_frames());

// further calls are done from "background thread"
pipeline.set_tid(BackgroundThread);

TestCompleter completer(pipeline);
TestPipeline::Task task;

Expand Down Expand Up @@ -1141,9 +1223,15 @@ TEST(pipeline_loop, process_tasks_interframe_deadline) {
fill_frame(frame, 0.1f, 0, FrameSize);
pipeline.expect_frame(0.1f, FrameSize);

// next call is done from "processing thread"
pipeline.set_tid(ProcessingThread);

// process frame and set inter-frame task processing deadline
CHECK(pipeline.process_subframes_and_tasks(frame));

// further calls are done from "background thread"
pipeline.set_tid(BackgroundThread);

// next process_task_imp() call will block
pipeline.block_tasks();

Expand Down

0 comments on commit 2e8a0cb

Please sign in to comment.