Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stream recreation crashes and thread starvation caused by MAVLink-related thread leak #330

Merged
merged 27 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
205f1e5
src: stream: Post EOS from a new thread
joaoantoniocardoso Nov 22, 2023
e54073d
src: stream: gst: Change to WeakRef<Element>
joaoantoniocardoso Jan 5, 2024
89caadf
src: stream: pipeline: Use WeakRef<Element>
joaoantoniocardoso Jan 5, 2024
5788e2a
src: stream: gst: impl wait_for_element_state_async
joaoantoniocardoso Jan 5, 2024
fb5de01
src: stream: pipeline: Save dot file before removing the sink
joaoantoniocardoso Jan 5, 2024
5655fa0
src: stream: pipeline: clean format style
joaoantoniocardoso Jan 5, 2024
efef2f9
src: pipeline: Make PipelineRunner async
joaoantoniocardoso Jan 5, 2024
2dc5757
src: settings: Move to RWLock
joaoantoniocardoso Jan 5, 2024
ba40440
src: stream: Move Stream to RWLock
joaoantoniocardoso Jan 5, 2024
a4b7a81
src: stream: Add debug for when a stream is started
joaoantoniocardoso Jan 5, 2024
3086b74
src: stream: Make add_stream_and_start and start_default async
joaoantoniocardoso Jan 5, 2024
562d272
src: main, server: await for add_stream_and_start and start_default
joaoantoniocardoso Jan 5, 2024
eb59a5d
src: stream: sink: Remove duplicated sync
joaoantoniocardoso Jan 5, 2024
6783a8e
src: stream: sink: Fix typo
joaoantoniocardoso Jan 5, 2024
322f558
src: mavlink: Refactor MAVLink reconnection
joaoantoniocardoso Jan 5, 2024
61d1748
src: mavlink: rename structs
joaoantoniocardoso Jan 5, 2024
842d68d
src: mavlink: Move MavlinkCamera to async
joaoantoniocardoso Jan 5, 2024
3a7e253
src: mavlink: Implement Drop for MavinkCamera
joaoantoniocardoso Jan 5, 2024
fdba2ef
src: stream: update MAVLink struct names
joaoantoniocardoso Jan 5, 2024
ff0fdbb
src: stream: Move Manager to RWLock
joaoantoniocardoso Jan 5, 2024
cbc7df7
src: mavlink: Move Stream and watcher to async
joaoantoniocardoso Jan 5, 2024
7a5da8d
src: mavlink: Reimplement Drop for Stream
joaoantoniocardoso Jan 5, 2024
70b327b
src: mavlink: Rewrite match into a block
joaoantoniocardoso Jan 5, 2024
5836632
src: stream: Only acquire state Write lock when needed
joaoantoniocardoso Jan 5, 2024
87e80d0
src: stream: Finish MAVLink tasks before start the stream recreation …
joaoantoniocardoso Jan 5, 2024
74d5f62
src: stream: Only create the MavlinkCamera when MAVLink is not disabled
joaoantoniocardoso Jan 6, 2024
eb5a916
src: Change main runtime to explicitly Tokio
joaoantoniocardoso Jan 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod stream;
mod video;
mod video_stream;

#[actix_web::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<(), std::io::Error> {
// CLI should be started before logger to allow control over verbosity
cli::manager::init();
Expand All @@ -42,11 +42,13 @@ async fn main() -> Result<(), std::io::Error> {
helper::develop::start_check_tasks_on_webrtc_reconnects();
}

stream::webrtc::signalling_server::SignallingServer::default();
let _signalling_server = stream::webrtc::signalling_server::SignallingServer::default();

if let Err(error) = stream::manager::start_default() {
if let Err(error) = stream::manager::start_default().await {
error!("Failed to start default streams. Reason: {error:?}")
}

server::manager::run(&cli::manager::server_address()).await
server::manager::run(&cli::manager::server_address()).await?;

Ok(())
}
38 changes: 25 additions & 13 deletions src/mavlink/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Manager {

struct Connection {
address: String,
connection: Box<dyn MavConnection<MavMessage> + Sync + Send>,
connection: Option<Box<dyn MavConnection<MavMessage> + Sync + Send>>,
sender: broadcast::Sender<Message>,
}

Expand All @@ -36,14 +36,12 @@ impl Default for Manager {
let address =
settings::manager::mavlink_endpoint().expect("No configured mavlink endpoint");

let connection = Connection::connect(&address);

let (sender, _receiver) = broadcast::channel(100);

let this = Self {
connection: Arc::new(RwLock::new(Connection {
address,
connection,
connection: None,
sender,
})),
ids: Arc::new(RwLock::new(vec![])),
Expand Down Expand Up @@ -78,8 +76,15 @@ impl Manager {
loop {
std::thread::sleep(std::time::Duration::from_millis(10));

let Ok(inner_guard) = inner.read() else {
break; // Break to trigger reconnection
};
let Some(mavlink) = inner_guard.connection.as_deref() else {
break; // Break to trigger reconnection
};

// Receive from the Mavlink network
let (header, message) = match inner.read().unwrap().connection.recv() {
let (header, message) = match mavlink.recv() {
Ok(message) => message,
Err(error) => {
trace!("Failed receiving from mavlink: {error:?}");
Expand All @@ -99,9 +104,7 @@ impl Manager {
trace!("Message received: {header:?}, {message:?}");

// Send the received message to the cameras
if let Err(error) = inner
.read()
.unwrap()
if let Err(error) = inner_guard
.sender
.send(Message::Received((header, message)))
{
Expand All @@ -113,7 +116,8 @@ impl Manager {
// Reconnects
{
let mut inner = inner.write().unwrap();
inner.connection = Connection::connect(&inner.address);
let address = inner.address.clone();
inner.connection.replace(Connection::connect(&address));
}

std::thread::sleep(std::time::Duration::from_millis(500));
Expand Down Expand Up @@ -141,20 +145,28 @@ impl Manager {
_ => continue,
};

let Ok(inner_guard) = inner.read() else {
break; // Break to trigger reconnection
};
let Some(mavlink) = inner_guard.connection.as_deref() else {
break; // Break to trigger reconnection
};

// Send the response from the cameras to the Mavlink network
if let Err(error) = inner.read().unwrap().connection.send(&header, &message) {
if let Err(error) = mavlink.send(&header, &message) {
error!("Failed sending message to Mavlink Connection: {error:?}");

break; // Break to trigger reconnection
}

trace!("Message sent: {header:?}, {message:?}");
debug!("Message sent: {header:?}, {message:?}");
}

// Reconnects
{
let mut inner = inner.write().unwrap();
inner.connection = Connection::connect(&inner.address);
let address = inner.address.clone();
inner.connection.replace(Connection::connect(&address));
}

std::thread::sleep(std::time::Duration::from_millis(500));
Expand Down Expand Up @@ -213,7 +225,7 @@ impl Connection {
return connection;
}
Err(error) => {
error!("Failed to connect, trying again in one second. Reason: {error:#?}.");
error!("Failed to connect, trying again in one second. Reason: {error:?}.");
}
}
}
Expand Down
140 changes: 82 additions & 58 deletions src/mavlink/mavlink_camera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,61 @@ use super::manager::Message;
use super::utils::*;

#[derive(Debug)]
pub struct MavlinkCameraHandle {
inner: Arc<MavlinkCamera>,
_runtime: tokio::runtime::Runtime,
heartbeat_handle: tokio::task::JoinHandle<()>,
messages_handle: tokio::task::JoinHandle<()>,
pub struct MavlinkCamera {
inner: Arc<MavlinkCameraInner>,
heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
messages_handle: Option<tokio::task::JoinHandle<()>>,
}

#[derive(Debug, Clone)]
struct MavlinkCamera {
struct MavlinkCameraInner {
component: MavlinkCameraComponent,
mavlink_stream_type: mavlink::common::VideoStreamType,
video_stream_uri: Url,
video_stream_name: String,
video_source_type: VideoSourceType,
}

impl MavlinkCameraHandle {
impl MavlinkCamera {
#[instrument(level = "debug")]
pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
let inner = Arc::new(MavlinkCamera::try_new(video_and_stream_information)?);
pub async fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
let inner = Arc::new(MavlinkCameraInner::try_new(video_and_stream_information)?);

let sender = crate::mavlink::manager::Manager::get_sender();

let runtime = tokio::runtime::Builder::new_multi_thread()
.on_thread_start(|| debug!("Thread started"))
.on_thread_stop(|| debug!("Thread stopped"))
.thread_name_fn(|| {
static ATOMIC_ID: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
format!("MavlinkCamera-{id}")
})
.worker_threads(2)
.enable_all()
.build()
.expect("Failed building a new tokio runtime");

let heartbeat_handle =
runtime.spawn(MavlinkCamera::heartbeat_loop(inner.clone(), sender.clone()));
let messages_handle =
runtime.spawn(MavlinkCamera::messages_loop(inner.clone(), sender.clone()));
debug!("Starting MAVLink HeartBeat task...");

let inner_cloned = inner.clone();
let sender_cloned = sender.clone();
let heartbeat_handle = Some(tokio::spawn(async move {
debug!("MAVLink HeartBeat task started!");
match MavlinkCameraInner::heartbeat_loop(inner_cloned, sender_cloned).await {
Ok(_) => debug!("MAVLink HeartBeat task eneded with no errors"),
Err(error) => warn!("MAVLink HeartBeat task ended with error: {error:#?}"),
};
}));

debug!("Starting MAVLink Message task...");

let inner_cloned = inner.clone();
let sender_cloned = sender.clone();
let messages_handle = Some(tokio::spawn(async move {
debug!("MAVLink Message task started!");
match MavlinkCameraInner::messages_loop(inner_cloned, sender_cloned).await {
Ok(_) => debug!("MAVLink Message task eneded with no errors"),
Err(error) => warn!("MAVLink Message task ended with error: {error:#?}"),
};
}));

Ok(Self {
inner,
_runtime: runtime,
heartbeat_handle,
messages_handle,
})
}
}

impl MavlinkCamera {
impl MavlinkCameraInner {
#[instrument(level = "debug")]
pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
let video_stream_uri = video_and_stream_information
Expand Down Expand Up @@ -128,7 +131,10 @@ impl MavlinkCamera {

#[instrument(level = "trace", skip(sender))]
#[instrument(level = "debug", skip_all, fields(component_id = camera.component.component_id))]
pub async fn heartbeat_loop(camera: Arc<MavlinkCamera>, sender: broadcast::Sender<Message>) {
pub async fn heartbeat_loop(
camera: Arc<MavlinkCameraInner>,
sender: broadcast::Sender<Message>,
) -> Result<()> {
let component_id = camera.component.component_id;
let system_id = camera.component.system_id;

Expand All @@ -154,42 +160,33 @@ impl MavlinkCamera {
error!("Failed to send message: {error:?}");
continue;
}

debug!("Heartbeat sent");
}
}

#[instrument(level = "trace", skip(sender))]
#[instrument(level = "debug", skip_all, fields(component_id = camera.component.component_id))]
pub async fn messages_loop(camera: Arc<MavlinkCamera>, sender: broadcast::Sender<Message>) {
pub async fn messages_loop(
camera: Arc<MavlinkCameraInner>,
sender: broadcast::Sender<Message>,
) -> Result<()> {
let mut receiver = sender.subscribe();
use crate::mavlink::mavlink_camera::Message::Received;

loop {
let (header, message) = match receiver.recv().await {
Ok(Message::Received(message)) => message,
Err(broadcast::error::RecvError::Closed) => {
unreachable!(
"Closed channel: This should never happen, this channel is static!"
);
}
Ok(Message::ToBeSent(_)) | Err(broadcast::error::RecvError::Lagged(_)) => continue,
};
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

trace!("Message received: {header:?}, {message:?}");
if let Ok(Received((header, message))) = receiver.recv().await {
trace!("Message received: {header:?}, {message:?}");

tokio::spawn(Self::handle_message(
camera.clone(),
sender.clone(),
header,
message,
));
Self::handle_message(camera.clone(), sender.clone(), header, message).await;
}
}
}

#[instrument(level = "trace", skip(sender))]
#[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))]
async fn handle_message(
camera: Arc<MavlinkCamera>,
camera: Arc<MavlinkCameraInner>,
sender: broadcast::Sender<Message>,
header: MavHeader,
message: MavMessage,
Expand Down Expand Up @@ -225,7 +222,7 @@ impl MavlinkCamera {
#[instrument(level = "trace", skip(sender))]
#[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))]
async fn handle_command_long(
camera: &MavlinkCamera,
camera: &MavlinkCameraInner,
sender: broadcast::Sender<Message>,
their_header: &MavHeader,
data: &mavlink::common::COMMAND_LONG_DATA,
Expand Down Expand Up @@ -449,7 +446,7 @@ impl MavlinkCamera {
#[instrument(level = "trace", skip(sender))]
#[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))]
async fn handle_param_ext_set(
camera: &MavlinkCamera,
camera: &MavlinkCameraInner,
sender: broadcast::Sender<Message>,
header: &MavHeader,
data: &mavlink::common::PARAM_EXT_SET_DATA,
Expand Down Expand Up @@ -510,7 +507,7 @@ impl MavlinkCamera {
#[instrument(level = "trace", skip(sender))]
#[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))]
async fn handle_param_ext_request_read(
camera: &MavlinkCamera,
camera: &MavlinkCameraInner,
sender: broadcast::Sender<Message>,
header: &MavHeader,
data: &mavlink::common::PARAM_EXT_REQUEST_READ_DATA,
Expand Down Expand Up @@ -561,7 +558,7 @@ impl MavlinkCamera {
#[instrument(level = "trace", skip(sender))]
#[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))]
async fn handle_param_ext_request_list(
camera: &MavlinkCamera,
camera: &MavlinkCameraInner,
sender: broadcast::Sender<Message>,
header: &MavHeader,
data: &mavlink::common::PARAM_EXT_REQUEST_LIST_DATA,
Expand Down Expand Up @@ -611,10 +608,37 @@ impl MavlinkCamera {
}
}

impl Drop for MavlinkCameraHandle {
impl Drop for MavlinkCamera {
#[instrument(level = "debug", skip(self))]
fn drop(&mut self) {
self.heartbeat_handle.abort();
self.messages_handle.abort();
super::manager::Manager::drop_id(self.inner.component.component_id)
debug!("Dropping MavlinkCameraHandle...");

if let Some(handle) = self.heartbeat_handle.take() {
if !handle.is_finished() {
handle.abort();
tokio::spawn(async move {
let _ = handle.await;
debug!("Mavlink Heartbeat task aborted");
});
} else {
debug!("Mavlink Heartbeat task nicely finished!");
}
}

if let Some(handle) = self.messages_handle.take() {
if !handle.is_finished() {
handle.abort();
tokio::spawn(async move {
let _ = handle.await;
debug!("Mavlink Message task aborted");
});
} else {
debug!("Mavlink Message task nicely finished!");
}
}

super::manager::Manager::drop_id(self.inner.component.component_id);

debug!("MavlinkCameraHandle Dropped!");
}
}
8 changes: 5 additions & 3 deletions src/server/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ pub fn v4l_post(json: web::Json<V4lControl>) -> HttpResponse {
pub async fn reset_settings(query: web::Query<ResetSettings>) -> HttpResponse {
if query.all.unwrap_or_default() {
settings::manager::reset();
if let Err(error) = stream_manager::start_default() {
if let Err(error) = stream_manager::start_default().await {
return HttpResponse::InternalServerError()
.content_type("text/plain")
.body(format!("{error:#?}"));
Expand Down Expand Up @@ -269,7 +269,7 @@ pub async fn streams() -> HttpResponse {

#[api_v2_operation]
/// Create a video stream
pub fn streams_post(json: web::Json<PostStream>) -> HttpResponse {
pub async fn streams_post(json: web::Json<PostStream>) -> HttpResponse {
let json = json.into_inner();

let video_source = match video_source::get_video_source(&json.source) {
Expand All @@ -285,7 +285,9 @@ pub fn streams_post(json: web::Json<PostStream>) -> HttpResponse {
name: json.name,
stream_information: json.stream_information,
video_source,
}) {
})
.await
{
return HttpResponse::NotAcceptable()
.content_type("text/plain")
.body(format!("{error:#?}"));
Expand Down
Loading
Loading