From 205f1e596a018c2701814a6a11444bda40ae6c30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Wed, 22 Nov 2023 13:55:54 -0300 Subject: [PATCH 01/27] src: stream: Post EOS from a new thread --- src/stream/mod.rs | 10 +++++++--- src/stream/sink/image_sink.rs | 10 +++++++--- src/stream/sink/udp_sink.rs | 10 +++++++--- src/stream/sink/webrtc_sink.rs | 10 +++++++--- 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 0e5dc85a..6225d0e0 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -287,9 +287,13 @@ impl Drop for StreamState { let pipeline_state = self.pipeline.inner_state_as_ref(); let pipeline = &pipeline_state.pipeline; - if let Err(error) = pipeline.post_message(::gst::message::Eos::new()) { - error!("Failed posting Eos message into Pipeline bus. Reason: {error:?}"); - } + let pipeline_weak = pipeline.downgrade(); + std::thread::spawn(move || { + let pipeline = pipeline_weak.upgrade().unwrap(); + if let Err(error) = pipeline.post_message(::gst::message::Eos::new()) { + error!("Failed posting Eos message into Pipeline bus. Reason: {error:?}"); + } + }); if let Err(error) = pipeline.set_state(::gst::State::Null) { error!("Failed setting Pipeline state to Null. Reason: {error:?}"); diff --git a/src/stream/sink/image_sink.rs b/src/stream/sink/image_sink.rs index 9c0bf4bd..99e3ede8 100644 --- a/src/stream/sink/image_sink.rs +++ b/src/stream/sink/image_sink.rs @@ -284,9 +284,13 @@ impl SinkInterface for ImageSink { #[instrument(level = "debug", skip(self))] fn eos(&self) { - if let Err(error) = self.pipeline.post_message(gst::message::Eos::new()) { - error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); - } + let pipeline_weak = self.pipeline.downgrade(); + std::thread::spawn(move || { + let pipeline = pipeline_weak.upgrade().unwrap(); + if let Err(error) = pipeline.post_message(gst::message::Eos::new()) { + error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); + } + }); } } diff --git a/src/stream/sink/udp_sink.rs b/src/stream/sink/udp_sink.rs index cc09ee83..456ed55b 100644 --- a/src/stream/sink/udp_sink.rs +++ b/src/stream/sink/udp_sink.rs @@ -264,9 +264,13 @@ impl SinkInterface for UdpSink { #[instrument(level = "debug", skip(self))] fn eos(&self) { - if let Err(error) = self.pipeline.post_message(gst::message::Eos::new()) { - error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); - } + let pipeline_weak = self.pipeline.downgrade(); + std::thread::spawn(move || { + let pipeline = pipeline_weak.upgrade().unwrap(); + if let Err(error) = pipeline.post_message(gst::message::Eos::new()) { + error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); + } + }); } } diff --git a/src/stream/sink/webrtc_sink.rs b/src/stream/sink/webrtc_sink.rs index ff21fbfe..85523db5 100644 --- a/src/stream/sink/webrtc_sink.rs +++ b/src/stream/sink/webrtc_sink.rs @@ -293,9 +293,13 @@ impl SinkInterface for WebRTCSink { #[instrument(level = "debug", skip(self))] fn eos(&self) { - if let Err(error) = self.webrtcbin.post_message(gst::message::Eos::new()) { - error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); - } + let webrtcbin_weak = self.webrtcbin.downgrade(); + std::thread::spawn(move || { + let webrtcbin = webrtcbin_weak.upgrade().unwrap(); + if let Err(error) = webrtcbin.post_message(gst::message::Eos::new()) { + error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); + } + }); } } From e54073d0659d6efc3e1700dbaf271cbf4dad4f6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:17:41 -0300 Subject: [PATCH 02/27] src: stream: gst: Change to WeakRef --- src/stream/gst/utils.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/stream/gst/utils.rs b/src/stream/gst/utils.rs index 0c7d6208..c1ed54ae 100644 --- a/src/stream/gst/utils.rs +++ b/src/stream/gst/utils.rs @@ -41,15 +41,24 @@ pub fn set_plugin_rank(plugin_name: &str, rank: gst::Rank) -> Result<()> { } pub fn wait_for_element_state( - element: &gst::Element, + element_weak: gst::glib::WeakRef, state: gst::State, polling_time_millis: u64, timeout_time_secs: u64, ) -> Result<()> { let mut trials = 1000 * timeout_time_secs / polling_time_millis; - while element.current_state() != state { + loop { std::thread::sleep(std::time::Duration::from_millis(polling_time_millis)); + + let Some(element) = element_weak.upgrade() else { + return Err(anyhow!("Cannot access Element")); + }; + + if element.current_state() == state { + break; + } + trials -= 1; if trials == 0 { return Err(anyhow!( From 89caadfc75ccc12180c9933126c0a0d1455b177e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:19:35 -0300 Subject: [PATCH 03/27] src: stream: pipeline: Use WeakRef --- src/stream/mod.rs | 9 +++------ src/stream/pipeline/mod.rs | 9 +++------ 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 6225d0e0..0a960787 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -298,12 +298,9 @@ impl Drop for StreamState { if let Err(error) = pipeline.set_state(::gst::State::Null) { error!("Failed setting Pipeline state to Null. Reason: {error:?}"); } - if let Err(error) = wait_for_element_state( - pipeline.upcast_ref::<::gst::Element>(), - ::gst::State::Null, - 100, - 10, - ) { + if let Err(error) = + wait_for_element_state(pipeline.downgrade(), ::gst::State::Null, 100, 10) + { let _ = pipeline.set_state(::gst::State::Null); error!("Failed setting Pipeline state to Null. Reason: {error:?}"); } diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index 260dbd77..736b8c59 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -165,12 +165,9 @@ impl PipelineState { } } - if let Err(error) = wait_for_element_state( - pipeline.upcast_ref::(), - gst::State::Playing, - 100, - 2, - ) { + if let Err(error) = + wait_for_element_state(pipeline.downgrade(), gst::State::Playing, 100, 2) + { let _ = pipeline.set_state(gst::State::Null); sink.unlink(pipeline, pipeline_id)?; return Err(anyhow!( From 5788e2a7dc798798e7cc8183e8373f59d1a4fcc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:19:01 -0300 Subject: [PATCH 04/27] src: stream: gst: impl wait_for_element_state_async --- src/stream/gst/utils.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/stream/gst/utils.rs b/src/stream/gst/utils.rs index c1ed54ae..7c8a62a0 100644 --- a/src/stream/gst/utils.rs +++ b/src/stream/gst/utils.rs @@ -69,3 +69,33 @@ pub fn wait_for_element_state( Ok(()) } + +pub async fn wait_for_element_state_async( + element_weak: gst::glib::WeakRef, + state: gst::State, + polling_time_millis: u64, + timeout_time_secs: u64, +) -> Result<()> { + let mut trials = 1000 * timeout_time_secs / polling_time_millis; + + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(polling_time_millis)).await; + + let Some(element) = element_weak.upgrade() else { + return Err(anyhow!("Cannot access Element")); + }; + + if element.current_state() == state { + break; + } + + trials -= 1; + if trials == 0 { + return Err(anyhow!( + "set state timed-out ({timeout_time_secs:?} seconds)" + )); + } + } + + Ok(()) +} From fb5de01f8519dfa5a936cac1c460996e47feb443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:21:18 -0300 Subject: [PATCH 05/27] src: stream: pipeline: Save dot file before removing the sink --- src/stream/pipeline/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index 736b8c59..f7b6f709 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -213,9 +213,6 @@ impl PipelineState { #[instrument(level = "info", skip(self))] pub fn remove_sink(&mut self, sink_id: &uuid::Uuid) -> Result<()> { let pipeline_id = &self.pipeline_id; - let sink = self.sinks.remove(sink_id).context(format!( - "Failed to remove sink {sink_id} from Sinks of the Pipeline {pipeline_id}" - ))?; let pipeline = &self.pipeline; pipeline.debug_to_dot_file_with_ts( @@ -223,6 +220,10 @@ impl PipelineState { format!("pipeline-{pipeline_id}-sink-{sink_id}-before-removing"), ); + let sink = self.sinks.remove(sink_id).context(format!( + "Failed to remove sink {sink_id} from Sinks of the Pipeline {pipeline_id}" + ))?; + // Terminate the Sink sink.eos(); From 5655fa0659a765f8ed01afa5d7ef014eebea8bde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:22:13 -0300 Subject: [PATCH 06/27] src: stream: pipeline: clean format style --- src/stream/pipeline/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index f7b6f709..56de85b6 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -198,7 +198,7 @@ impl PipelineState { // added. if !matches!(&sink, Sink::Image(..)) { if let Err(error) = pipeline.sync_children_states() { - error!("Failed to syncronize children states. Reason: {:?}", error); + error!("Failed to syncronize children states. Reason: {error:?}"); } } From efef2f906e3389b1fd328caa689cb3cf6f900612 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:20:18 -0300 Subject: [PATCH 07/27] src: pipeline: Make PipelineRunner async --- src/stream/pipeline/runner.rs | 342 ++++++++++++++++++++-------------- 1 file changed, 206 insertions(+), 136 deletions(-) diff --git a/src/stream/pipeline/runner.rs b/src/stream/pipeline/runner.rs index adc0f0eb..1eb5d836 100644 --- a/src/stream/pipeline/runner.rs +++ b/src/stream/pipeline/runner.rs @@ -1,19 +1,34 @@ -use std::sync::{Arc, Mutex}; - -use gst::prelude::*; - use anyhow::{anyhow, Context, Result}; - +use gst::prelude::*; use tracing::*; -use crate::stream::gst::utils::wait_for_element_state; +use crate::stream::gst::utils::wait_for_element_state_async; #[derive(Debug)] -#[allow(dead_code)] pub struct PipelineRunner { - pipeline_weak: gst::glib::WeakRef, - start: Arc>, - watcher_thread_handle: std::thread::JoinHandle<()>, + start: tokio::sync::mpsc::Sender<()>, + handle: Option>, +} + +impl Drop for PipelineRunner { + #[instrument(level = "debug", skip(self))] + fn drop(&mut self) { + debug!("Dropping PipelineRunner..."); + + if let Some(handle) = self.handle.take() { + if !handle.is_finished() { + handle.abort(); + tokio::spawn(async move { + let _ = handle.await; + debug!("PipelineRunner task aborted"); + }); + } else { + debug!("PipelineRunner task nicely finished!"); + } + } + + debug!("PipelineRunner Dropped!"); + } } impl PipelineRunner { @@ -26,46 +41,52 @@ impl PipelineRunner { let pipeline_weak = pipeline.downgrade(); let pipeline_id = *pipeline_id; - let start = Arc::new(Mutex::new(false)); + let (start_tx, start_rx) = tokio::sync::mpsc::channel(1); + + debug!("Starting PipelineRunner task..."); Ok(Self { - pipeline_weak: pipeline_weak.clone(), - start: start.clone(), - watcher_thread_handle: std::thread::Builder::new() - .name(format!("PipelineRunner-{pipeline_id}")) - .spawn(move || { - if let Err(error) = - PipelineRunner::runner(pipeline_weak, &pipeline_id, start, allow_block) - { - error!("PipelineWatcher ended with error: {error}"); - } else { - info!("PipelineWatcher ended normally."); - } - }) - .context(format!( - "Failed when spawing PipelineRunner thread for Pipeline {pipeline_id:#?}" - ))?, + start: start_tx, + handle: Some(tokio::spawn(async move { + debug!("PipelineRunner task started!"); + match Self::runner(pipeline_weak, pipeline_id, start_rx, allow_block).await { + Ok(_) => debug!("PipelineRunner task eneded with no errors"), + Err(error) => warn!("PipelineRunner task ended with error: {error:#?}"), + }; + })), }) } #[instrument(level = "debug", skip(self))] pub fn start(&self) -> Result<()> { - *self.start.lock().unwrap() = true; + let start = self.start.clone(); + tokio::spawn(async move { + debug!("Pipeline Start task started!"); + if let Err(error) = start.send(()).await { + error!("Failed to send start command: {error:#?}"); + } + debug!("Pipeline Start task ended"); + }); + Ok(()) } #[instrument(level = "debug", skip(self))] pub fn is_running(&self) -> bool { - !self.watcher_thread_handle.is_finished() + self.handle + .as_ref() + .map(|handle| !handle.is_finished()) + .unwrap_or(false) } - #[instrument(level = "debug")] - fn runner( + #[instrument(level = "debug", skip(pipeline_weak, pipeline_id, start))] + async fn runner( pipeline_weak: gst::glib::WeakRef, - pipeline_id: &uuid::Uuid, - start: Arc>, + pipeline_id: uuid::Uuid, + mut start: tokio::sync::mpsc::Receiver<()>, allow_block: bool, ) -> Result<()> { + let (finsh_tx, mut finish) = tokio::sync::mpsc::channel(1); let pipeline = pipeline_weak .upgrade() .context("Unable to access the Pipeline from its weak reference")?; @@ -74,6 +95,131 @@ impl PipelineRunner { .bus() .context("Unable to access the pipeline bus")?; + // Send our bus messages via a futures channel to be handled asynchronously + let pipeline_weak_cloned = pipeline_weak.clone(); + let (bus_tx, mut bus_rx) = tokio::sync::mpsc::unbounded_channel::(); + let bus_tx = std::sync::Mutex::new(bus_tx); + bus.set_sync_handler(move |_, msg| { + let _ = bus_tx.lock().unwrap().send(msg.to_owned()); + gst::BusSyncReply::Drop + }); + + /* Iterate messages on the bus until an error or EOS occurs, + * although in this example the only error we'll hopefully + * get is if the user closes the output window */ + debug!("Starting BusWatcher task..."); + tokio::spawn(async move { + debug!("BusWatcher task started!"); + while let Some(message) = bus_rx.recv().await { + use gst::MessageView; + + let Some(pipeline) = pipeline_weak_cloned.upgrade() else { + break; + }; + + match message.view() { + MessageView::Eos(eos) => { + pipeline.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + format!("pipeline-{pipeline_id}-eos"), + ); + let msg = format!("Received EndOfStream: {eos:?}"); + trace!(msg); + let _ = finsh_tx.send(msg).await; + break; + } + MessageView::Error(error) => { + let msg = format!( + "Error from {:?}: {} ({:?})", + error.src().map(|s| s.path_string()), + error.error(), + error.debug() + ); + pipeline.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + format!("pipeline-{pipeline_id}-error"), + ); + trace!(msg); + let _ = finsh_tx.send(msg).await; + break; + } + MessageView::StateChanged(state) => { + pipeline.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + format!( + "pipeline-{pipeline_id}-{:?}-to-{:?}", + state.old(), + state.current() + ), + ); + + trace!( + "State changed from {:?}: {:?} to {:?} ({:?})", + state.src().map(|s| s.path_string()), + state.old(), + state.current(), + state.pending() + ); + } + MessageView::Latency(latency) => { + let current_latency = pipeline.latency(); + trace!("Latency message: {latency:?}. Current latency: {latency:?}",); + if let Err(error) = pipeline.recalculate_latency() { + warn!("Failed to recalculate latency: {error:?}"); + } + let new_latency = pipeline.latency(); + if current_latency != new_latency { + debug!("New latency: {new_latency:?}"); + } + } + other_message => trace!("{other_message:#?}"), + } + } + + debug!("BusWatcher task ended!"); + }); + + // Wait until start receive the signal + debug!("PipelineRunner waiting for start command..."); + loop { + tokio::select! { + reason = finish.recv() => { + return Err(anyhow!("{reason:?}")); + } + _ = start.recv() => { + debug!("PipelineRunner received start command"); + + let pipeline = pipeline_weak + .upgrade() + .context("Unable to access the Pipeline from its weak reference")?; + + if pipeline.current_state() != gst::State::Playing { + if let Err(error) = pipeline.set_state(gst::State::Playing) { + error!( + "Failed setting Pipeline {} to Playing state. Reason: {:?}", + pipeline_id, error + ); + continue; + } + } + + if let Err(error) = wait_for_element_state_async( + pipeline_weak.clone(), + gst::State::Playing, + 100, + 5, + ).await { + + return Err(anyhow!("{error:?}")); + } + + break; + } + }; + } + + debug!("PipelineRunner started!"); + // Check if we need to break external loop. // Some cameras have a duplicated timestamp when starting. // to avoid restarting the camera once and once again, @@ -82,120 +228,44 @@ impl PipelineRunner { let mut lost_timestamps: usize = 0; let max_lost_timestamps: usize = 30; - 'outer: loop { - std::thread::sleep(std::time::Duration::from_millis(100)); + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Wait the signal to start - if *start.lock().unwrap() && pipeline.current_state() != gst::State::Playing { - if let Err(error) = pipeline.set_state(gst::State::Playing) { - return Err(anyhow!( - "Failed setting Pipeline {pipeline_id} to Playing state. Reason: {error:?}" - )); - } - if let Err(error) = wait_for_element_state( - pipeline.upcast_ref::(), - gst::State::Playing, - 100, - 5, - ) { - return Err(anyhow!( - "Failed setting Pipeline {pipeline_id} to Playing state. Reason: {error:?}" - )); - } + if let Some(reason) = finish.recv().await { + return Err(anyhow!("{reason:?}")); } - 'inner: loop { + if !allow_block { // Restart pipeline if pipeline position do not change, // occur if usb connection is lost and gst do not detect it - if !allow_block { - if let Some(position) = pipeline.query_position::() { - previous_position = match previous_position { - Some(current_previous_position) => { - if current_previous_position.nseconds() != 0 - && current_previous_position == position - { - lost_timestamps += 1; - } else if lost_timestamps > 0 { - // We are back in track, erase lost timestamps - warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps"); - lost_timestamps = 0; - } - if lost_timestamps == 1 { - warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated"); - } else if lost_timestamps > max_lost_timestamps { - return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})")); - } - - Some(position) - } - None => Some(position), - } - } - } + let pipeline = pipeline_weak + .upgrade() + .context("Unable to access the Pipeline from its weak reference")?; - /* Iterate messages on the bus until an error or EOS occurs, - * although in this example the only error we'll hopefully - * get is if the user closes the output window */ - while let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(100)) { - use gst::MessageView; - - match msg.view() { - MessageView::Eos(eos) => { - debug!("Received EndOfStream: {eos:?}"); - pipeline.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - format!("pipeline-{pipeline_id}-eos"), - ); - break 'outer; - } - MessageView::Error(error) => { - error!( - "Error from {:?}: {} ({:?})", - error.src().map(|s| s.path_string()), - error.error(), - error.debug() - ); - pipeline.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - format!("pipeline-{pipeline_id}-error"), - ); - break 'inner; - } - MessageView::StateChanged(state) => { - pipeline.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - format!( - "pipeline-{pipeline_id}-{:?}-to-{:?}", - state.old(), - state.current() - ), - ); - - trace!( - "State changed from {:?}: {:?} to {:?} ({:?})", - state.src().map(|s| s.path_string()), - state.old(), - state.current(), - state.pending() - ); - } - MessageView::Latency(latency) => { - let current_latency = pipeline.latency(); - trace!("Latency message: {latency:?}. Current latency: {latency:?}",); - if let Err(error) = pipeline.recalculate_latency() { - warn!("Failed to recalculate latency: {error:?}"); + if let Some(position) = pipeline.query_position::() { + previous_position = match previous_position { + Some(current_previous_position) => { + if current_previous_position.nseconds() != 0 + && current_previous_position == position + { + lost_timestamps += 1; + } else if lost_timestamps > 0 { + // We are back in track, erase lost timestamps + warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps"); + lost_timestamps = 0; } - let new_latency = pipeline.latency(); - if current_latency != new_latency { - debug!("New latency: {new_latency:?}"); + if lost_timestamps == 1 { + warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated"); + } else if lost_timestamps > max_lost_timestamps { + return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})")); } + + Some(position) } - other_message => trace!("{other_message:#?}"), - }; + None => Some(position), + } } } } - - Ok(()) } } From 2dc5757581c450bccc8fd18720bac732701a30e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:23:04 -0300 Subject: [PATCH 08/27] src: settings: Move to RWLock --- src/settings/manager.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/settings/manager.rs b/src/settings/manager.rs index 5310e5e9..93b34a9f 100644 --- a/src/settings/manager.rs +++ b/src/settings/manager.rs @@ -3,7 +3,7 @@ use directories::ProjectDirs; use serde::{Deserialize, Serialize}; use std::io::prelude::*; use std::path::Path; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use tracing::*; use crate::cli; @@ -34,7 +34,7 @@ struct Manager { } lazy_static! { - static ref MANAGER: Arc> = Arc::new(Mutex::new(Manager { content: None })); + static ref MANAGER: Arc> = Arc::new(RwLock::new(Manager { content: None })); } impl Default for SettingsStruct { @@ -101,7 +101,7 @@ impl Manager { // Init settings manager with the desired settings file, // will be created if does not exist pub fn init(file_name: Option<&str>) { - let mut manager = MANAGER.lock().unwrap(); + let mut manager = MANAGER.write().unwrap(); let file_name = file_name.unwrap_or("settings.json"); manager.content = Some(Manager::with(file_name)); } @@ -153,7 +153,7 @@ fn save_settings_to_file(file_name: &str, content: &SettingsStruct) -> Result<() // Save the latest state of the settings pub fn save() { - let manager = MANAGER.lock().unwrap(); + let manager = MANAGER.read().unwrap(); //TODO: deal com save problems here if let Some(content) = &manager.content { if let Err(error) = save_settings_to_file(&content.file_name, &content.config) { @@ -171,12 +171,12 @@ pub fn save() { #[allow(dead_code)] pub fn header() -> HeaderSettingsFile { - let manager = MANAGER.lock().unwrap(); + let manager = MANAGER.read().unwrap(); manager.content.as_ref().unwrap().config.header.clone() } pub fn mavlink_endpoint() -> Option { - let manager = MANAGER.lock().unwrap(); + let manager = MANAGER.read().unwrap(); return manager .content .as_ref() @@ -189,7 +189,7 @@ pub fn mavlink_endpoint() -> Option { pub fn set_mavlink_endpoint(endpoint: &str) { //TODO: make content more easy to access { - let mut manager = MANAGER.lock().unwrap(); + let mut manager = MANAGER.write().unwrap(); let mut content = manager.content.as_mut(); content.as_mut().unwrap().config.mavlink_endpoint = Some(endpoint.into()); } @@ -197,15 +197,15 @@ pub fn set_mavlink_endpoint(endpoint: &str) { } pub fn streams() -> Vec { - let manager = MANAGER.lock().unwrap(); + let manager = MANAGER.read().unwrap(); let content = manager.content.as_ref(); content.unwrap().config.streams.clone() } pub fn set_streams(streams: &[VideoAndStreamInformation]) { - // Take care of scope mutex + // Take care of scope RwLock { - let mut manager = MANAGER.lock().unwrap(); + let mut manager = MANAGER.write().unwrap(); let mut content = manager.content.as_mut(); content.as_mut().unwrap().config.streams.clear(); content @@ -219,9 +219,9 @@ pub fn set_streams(streams: &[VideoAndStreamInformation]) { } pub fn reset() { - // Take care of scope mutex + // Take care of scope RwLock { - let mut manager = MANAGER.lock().unwrap(); + let mut manager = MANAGER.write().unwrap(); manager.content.as_mut().unwrap().config = SettingsStruct::default(); } save(); @@ -254,7 +254,7 @@ mod tests { #[test] fn test_no_aboslute_path() { init(None); - let manager = MANAGER.lock().unwrap(); + let manager = MANAGER.read().unwrap(); let file_name = &manager.content.as_ref().unwrap().file_name; assert!( std::path::Path::new(&file_name).exists(), From ba404409d8f57257d34909905a2c40061dc513d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:25:52 -0300 Subject: [PATCH 09/27] src: stream: Move Stream to RWLock --- src/stream/manager.rs | 49 +++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/src/stream/manager.rs b/src/stream/manager.rs index c9dffe2e..4ffe07f7 100644 --- a/src/stream/manager.rs +++ b/src/stream/manager.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - sync::{Arc, Mutex}, + sync::{Arc, RwLock}, }; use crate::{ @@ -39,7 +39,7 @@ pub struct Manager { } lazy_static! { - static ref MANAGER: Arc> = Default::default(); + static ref MANAGER: Arc> = Default::default(); } impl Manager { @@ -48,7 +48,7 @@ impl Manager { let video_and_stream_informations = self .streams .values() - .filter_map(|stream| match stream.state.lock() { + .filter_map(|stream| match stream.state.read() { Ok(guard) => Some(guard.video_and_stream_information.clone()), Err(error) => { error!("Failed locking a Mutex. Reason: {error}"); @@ -92,7 +92,7 @@ pub fn remove_all_streams() -> Result<()> { let keys = { let mut ids = vec![]; - match MANAGER.lock() { + match MANAGER.read() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), } @@ -202,13 +202,13 @@ pub fn streams() -> Result> { #[instrument(level = "debug")] #[cached(time = 1)] pub fn get_first_sdp_from_source(source: String) -> ClonableResult { - let manager = match MANAGER.lock() { + let manager = match MANAGER.read() { Ok(guard) => guard, Err(error) => return Err(Arc::new(anyhow!("Failed locking a Mutex. Reason: {error}"))), }; let Some(result) = manager.streams.values().find_map(|stream| { - let state = match stream.state.lock() { + let state = match stream.state.read() { Ok(guard) => guard, Err(error) => { error!("Failed locking a Mutex. Reason: {error}"); @@ -284,7 +284,7 @@ pub async fn get_jpeg_thumbnail_from_source( .expect("Failed building a new tokio runtime") .block_on(async move { let res = async move { - let manager = match MANAGER.lock() { + let manager = match MANAGER.read() { Ok(guard) => guard, Err(error) => { return Some(Err(Arc::new(anyhow!( @@ -293,7 +293,7 @@ pub async fn get_jpeg_thumbnail_from_source( } }; let Some(stream) = manager.streams.values().find(|stream| { - let state = match stream.state.lock() { + let state = match stream.state.read() { Ok(guard) => guard, Err(error) => { error!("Failed locking a Mutex. Reason: {error}"); @@ -311,7 +311,7 @@ pub async fn get_jpeg_thumbnail_from_source( return None; }; - let state = match stream.state.lock() { + let state = match stream.state.read() { Ok(guard) => guard, Err(error) => { error!("Failed locking a Mutex. Reason: {error}"); @@ -374,12 +374,12 @@ pub fn add_stream_and_start(video_and_stream_information: VideoAndStreamInformat #[instrument(level = "debug")] pub fn remove_stream_by_name(stream_name: &str) -> Result<()> { - let manager = match MANAGER.lock() { + let manager = match MANAGER.read() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; if let Some(stream_id) = &manager.streams.iter().find_map(|(id, stream)| { - let state = match stream.state.lock() { + let state = match stream.state.read() { Ok(guard) => guard, Err(error) => { error!("Failed locking a Mutex. Reason: {error}"); @@ -406,7 +406,7 @@ impl WebRTCSessionManagementInterface for Manager { bind: &webrtc::signalling_protocol::BindOffer, sender: tokio::sync::mpsc::UnboundedSender>, ) -> Result { - let mut manager = match MANAGER.lock() { + let mut manager = match MANAGER.write() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; @@ -426,7 +426,7 @@ impl WebRTCSessionManagementInterface for Manager { }; let sink = Sink::WebRTC(WebRTCSink::try_new(bind, sender)?); - stream.state.lock().unwrap().pipeline.add_sink(sink)?; + stream.state.write().unwrap().pipeline.add_sink(sink)?; debug!("WebRTC session created: {session_id:?}"); Ok(session_id) @@ -437,7 +437,7 @@ impl WebRTCSessionManagementInterface for Manager { bind: &webrtc::signalling_protocol::BindAnswer, _reason: String, ) -> Result<()> { - let mut manager = match MANAGER.lock() { + let mut manager = match MANAGER.write() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; @@ -447,14 +447,13 @@ impl WebRTCSessionManagementInterface for Manager { .get_mut(&bind.producer_id) .context(format!("Producer {:?} not found", bind.producer_id))?; - let mut state = match stream.state.lock() { + let mut state = match stream.state.write() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; state .pipeline - .inner_state_mut() .remove_sink(&bind.session_id) .context(format!("Cannot remove session {:?}", bind.session_id))?; @@ -468,7 +467,7 @@ impl WebRTCSessionManagementInterface for Manager { bind: &webrtc::signalling_protocol::BindAnswer, sdp: &webrtc::signalling_protocol::RTCSessionDescription, ) -> Result<()> { - let manager = match MANAGER.lock() { + let manager = match MANAGER.read() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; @@ -478,7 +477,7 @@ impl WebRTCSessionManagementInterface for Manager { .get(&bind.producer_id) .context(format!("Producer {:?} not found", bind.producer_id))? .state - .lock() + .read() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), @@ -519,7 +518,7 @@ impl WebRTCSessionManagementInterface for Manager { sdp_m_line_index: u32, candidate: &str, ) -> Result<()> { - let manager = match MANAGER.lock() { + let manager = match MANAGER.read() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; @@ -529,7 +528,7 @@ impl WebRTCSessionManagementInterface for Manager { .get(&bind.producer_id) .context(format!("Producer {:?} not found", bind.producer_id))? .state - .lock() + .read() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), @@ -557,12 +556,12 @@ impl WebRTCSessionManagementInterface for Manager { impl StreamManagementInterface for Manager { #[instrument(level = "debug")] fn add_stream(stream: Stream) -> Result<()> { - let mut manager = match MANAGER.lock() { + let mut manager = match MANAGER.write() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; - let stream_id = match stream.state.lock() { + let stream_id = match stream.state.read() { Ok(guard) => guard.pipeline_id, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; @@ -577,7 +576,7 @@ impl StreamManagementInterface for Manager { #[instrument(level = "debug")] fn remove_stream(stream_id: &webrtc::signalling_protocol::PeerId) -> Result<()> { - let mut manager = match MANAGER.lock() { + let mut manager = match MANAGER.write() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; @@ -600,7 +599,7 @@ impl StreamManagementInterface for Manager { #[instrument(level = "debug")] fn streams_information() -> Result> { - let manager = match MANAGER.lock() { + let manager = match MANAGER.read() { Ok(guard) => guard, Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; @@ -609,7 +608,7 @@ impl StreamManagementInterface for Manager { .streams .values() .filter_map(|stream| { - let state = match stream.state.lock() { + let state = match stream.state.read() { Ok(guard) => guard, Err(error) => { error!("Failed locking a Mutex. Reason: {error}"); From a4b7a8178f05ff29556a5f3cb6eed46ccc58030f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:26:33 -0300 Subject: [PATCH 10/27] src: stream: Add debug for when a stream is started --- src/stream/manager.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stream/manager.rs b/src/stream/manager.rs index 4ffe07f7..0e22a192 100644 --- a/src/stream/manager.rs +++ b/src/stream/manager.rs @@ -571,6 +571,8 @@ impl StreamManagementInterface for Manager { } manager.update_settings(); + info!("Stream {stream_id} successfully added!"); + Ok(()) } From 3086b7471cfcec261253f411bf979f01354637da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:28:15 -0300 Subject: [PATCH 11/27] src: stream: Make add_stream_and_start and start_default async --- src/stream/manager.rs | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/src/stream/manager.rs b/src/stream/manager.rs index 0e22a192..29faa0d5 100644 --- a/src/stream/manager.rs +++ b/src/stream/manager.rs @@ -115,7 +115,7 @@ pub fn remove_all_streams() -> Result<()> { } #[instrument(level = "debug")] -pub fn start_default() -> Result<()> { +pub async fn start_default() -> Result<()> { // Get streams from default settings, this needs to be done first because // remove_all_streams will modify the settings as its using the stream manager // to remove the streams, and the stream manager will save the state after @@ -138,9 +138,9 @@ pub fn start_default() -> Result<()> { debug!("Streams: {streams:#?}"); for stream in streams { - add_stream_and_start(stream).unwrap_or_else(|error| { + if let Err(error) = add_stream_and_start(stream).await { error!("Not possible to start stream: {error:?}"); - }); + }; } Ok(()) @@ -347,26 +347,29 @@ pub async fn get_jpeg_thumbnail_from_source( } #[instrument(level = "debug")] -pub fn add_stream_and_start(video_and_stream_information: VideoAndStreamInformation) -> Result<()> { - let manager = match MANAGER.lock() { - Ok(guard) => guard, - Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), - }; - for stream in manager.streams.values() { - let state = match stream.state.lock() { +pub async fn add_stream_and_start( + video_and_stream_information: VideoAndStreamInformation, +) -> Result<()> { + { + let manager = match MANAGER.read() { Ok(guard) => guard, - Err(error) => { - return Err(anyhow!("Failed locking a Mutex. Reason: {error}")); - } + Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")), }; + for stream in manager.streams.values() { + let state = match stream.state.read() { + Ok(guard) => guard, + Err(error) => { + return Err(anyhow!("Failed locking a Mutex. Reason: {error}")); + } + }; - state - .video_and_stream_information - .conflicts_with(&video_and_stream_information)?; + state + .video_and_stream_information + .conflicts_with(&video_and_stream_information)?; + } } - drop(manager); - let stream = Stream::try_new(&video_and_stream_information)?; + let stream = Stream::try_new(&video_and_stream_information).await?; Manager::add_stream(stream)?; Ok(()) From 562d27246b0c7872d2db9354c47df291db0febfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:29:10 -0300 Subject: [PATCH 12/27] src: main, server: await for add_stream_and_start and start_default --- src/main.rs | 2 +- src/server/pages.rs | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 036fc0ed..d7ec69ff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,7 +44,7 @@ async fn main() -> Result<(), std::io::Error> { stream::webrtc::signalling_server::SignallingServer::default(); - if let Err(error) = stream::manager::start_default() { + if let Err(error) = stream::manager::start_default().await { error!("Failed to start default streams. Reason: {error:?}") } diff --git a/src/server/pages.rs b/src/server/pages.rs index e4d7598c..305826d1 100644 --- a/src/server/pages.rs +++ b/src/server/pages.rs @@ -232,7 +232,7 @@ pub fn v4l_post(json: web::Json) -> HttpResponse { pub async fn reset_settings(query: web::Query) -> HttpResponse { if query.all.unwrap_or_default() { settings::manager::reset(); - if let Err(error) = stream_manager::start_default() { + if let Err(error) = stream_manager::start_default().await { return HttpResponse::InternalServerError() .content_type("text/plain") .body(format!("{error:#?}")); @@ -269,7 +269,7 @@ pub async fn streams() -> HttpResponse { #[api_v2_operation] /// Create a video stream -pub fn streams_post(json: web::Json) -> HttpResponse { +pub async fn streams_post(json: web::Json) -> HttpResponse { let json = json.into_inner(); let video_source = match video_source::get_video_source(&json.source) { @@ -285,7 +285,9 @@ pub fn streams_post(json: web::Json) -> HttpResponse { name: json.name, stream_information: json.stream_information, video_source, - }) { + }) + .await + { return HttpResponse::NotAcceptable() .content_type("text/plain") .body(format!("{error:#?}")); From eb59a5d589f5384375e42e997ef45b9fbecabe54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:31:56 -0300 Subject: [PATCH 13/27] src: stream: sink: Remove duplicated sync --- src/stream/sink/image_sink.rs | 5 ----- src/stream/sink/udp_sink.rs | 5 ----- 2 files changed, 10 deletions(-) diff --git a/src/stream/sink/image_sink.rs b/src/stream/sink/image_sink.rs index 99e3ede8..49ccc724 100644 --- a/src/stream/sink/image_sink.rs +++ b/src/stream/sink/image_sink.rs @@ -196,11 +196,6 @@ impl SinkInterface for ImageSink { return Err(anyhow!(msg)); } - // Syncronize SinkPipeline - if let Err(sync_err) = self.pipeline.sync_children_states() { - error!("Failed to syncronize children states: {sync_err:?}"); - } - // Unblock data to go through this added Tee src pad tee_src_pad.remove_probe(tee_src_pad_data_blocker); diff --git a/src/stream/sink/udp_sink.rs b/src/stream/sink/udp_sink.rs index 456ed55b..4830f15c 100644 --- a/src/stream/sink/udp_sink.rs +++ b/src/stream/sink/udp_sink.rs @@ -144,11 +144,6 @@ impl SinkInterface for UdpSink { return Err(anyhow!(msg)); } - // Syncronize SinkPipeline - if let Err(sync_err) = self.pipeline.sync_children_states() { - error!("Failed to syncronize children states: {sync_err:?}"); - } - // Unblock data to go through this added Tee src pad tee_src_pad.remove_probe(tee_src_pad_data_blocker); From 6783a8ee2a97c356ef6530a5d4afd7aa1a336251 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:32:14 -0300 Subject: [PATCH 14/27] src: stream: sink: Fix typo --- src/stream/sink/rtsp_sink.rs | 2 +- src/stream/sink/udp_sink.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/stream/sink/rtsp_sink.rs b/src/stream/sink/rtsp_sink.rs index 50c713d1..2f5faf18 100644 --- a/src/stream/sink/rtsp_sink.rs +++ b/src/stream/sink/rtsp_sink.rs @@ -60,7 +60,7 @@ impl SinkInterface for RtspSink { // Add the Sink elements to the Pipeline let elements = &[&self.queue, &self.sink]; if let Err(add_err) = pipeline.add_many(elements) { - let msg = format!("Failed to add WebRTCSink's elements to the Pipeline: {add_err:?}"); + let msg = format!("Failed to add RTSP's elements to the Pipeline: {add_err:?}"); error!(msg); if let Some(parent) = tee_src_pad.parent_element() { diff --git a/src/stream/sink/udp_sink.rs b/src/stream/sink/udp_sink.rs index 4830f15c..774ee6b0 100644 --- a/src/stream/sink/udp_sink.rs +++ b/src/stream/sink/udp_sink.rs @@ -82,7 +82,7 @@ impl SinkInterface for UdpSink { .expect("No sink pad found on ProxySink"); if let Err(link_err) = queue_src_pad.link(proxysink_sink_pad) { let msg = - format!("Failed to link Queue's src pad with WebRTCBin's sink pad: {link_err:?}"); + format!("Failed to link Queue's src pad with ProxySink's sink pad: {link_err:?}"); error!(msg); if let Some(parent) = tee_src_pad.parent_element() { @@ -337,15 +337,15 @@ impl UdpSink { // Add Sink elements to the Sink's Pipeline let elements = [&_proxysrc, &_udpsink]; - if let Err(add_err) = pipeline.add_many(&elements) { + if let Err(add_err) = pipeline.add_many(elements) { return Err(anyhow!( "Failed adding UdpSink's elements to Sink Pipeline: {add_err:?}" )); } // Link Sink's elements - if let Err(link_err) = gst::Element::link_many(&elements) { - if let Err(remove_err) = pipeline.remove_many(&elements) { + if let Err(link_err) = gst::Element::link_many(elements) { + if let Err(remove_err) = pipeline.remove_many(elements) { warn!("Failed removing elements from UdpSink Pipeline: {remove_err:?}") }; return Err(anyhow!("Failed linking UdpSink's elements: {link_err:?}")); From 322f558cf5dca903037ff1166b8af72dea851ae9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 18:33:12 -0300 Subject: [PATCH 15/27] src: mavlink: Refactor MAVLink reconnection --- src/mavlink/manager.rs | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/mavlink/manager.rs b/src/mavlink/manager.rs index 998bb262..cd2a7f13 100644 --- a/src/mavlink/manager.rs +++ b/src/mavlink/manager.rs @@ -20,7 +20,7 @@ pub struct Manager { struct Connection { address: String, - connection: Box + Sync + Send>, + connection: Option + Sync + Send>>, sender: broadcast::Sender, } @@ -36,14 +36,12 @@ impl Default for Manager { let address = settings::manager::mavlink_endpoint().expect("No configured mavlink endpoint"); - let connection = Connection::connect(&address); - let (sender, _receiver) = broadcast::channel(100); let this = Self { connection: Arc::new(RwLock::new(Connection { address, - connection, + connection: None, sender, })), ids: Arc::new(RwLock::new(vec![])), @@ -78,8 +76,15 @@ impl Manager { loop { std::thread::sleep(std::time::Duration::from_millis(10)); + let Ok(inner_guard) = inner.read() else { + break; // Break to trigger reconnection + }; + let Some(mavlink) = inner_guard.connection.as_deref() else { + break; // Break to trigger reconnection + }; + // Receive from the Mavlink network - let (header, message) = match inner.read().unwrap().connection.recv() { + let (header, message) = match mavlink.recv() { Ok(message) => message, Err(error) => { trace!("Failed receiving from mavlink: {error:?}"); @@ -99,9 +104,7 @@ impl Manager { trace!("Message received: {header:?}, {message:?}"); // Send the received message to the cameras - if let Err(error) = inner - .read() - .unwrap() + if let Err(error) = inner_guard .sender .send(Message::Received((header, message))) { @@ -113,7 +116,8 @@ impl Manager { // Reconnects { let mut inner = inner.write().unwrap(); - inner.connection = Connection::connect(&inner.address); + let address = inner.address.clone(); + inner.connection.replace(Connection::connect(&address)); } std::thread::sleep(std::time::Duration::from_millis(500)); @@ -141,20 +145,28 @@ impl Manager { _ => continue, }; + let Ok(inner_guard) = inner.read() else { + break; // Break to trigger reconnection + }; + let Some(mavlink) = inner_guard.connection.as_deref() else { + break; // Break to trigger reconnection + }; + // Send the response from the cameras to the Mavlink network - if let Err(error) = inner.read().unwrap().connection.send(&header, &message) { + if let Err(error) = mavlink.send(&header, &message) { error!("Failed sending message to Mavlink Connection: {error:?}"); break; // Break to trigger reconnection } - trace!("Message sent: {header:?}, {message:?}"); + debug!("Message sent: {header:?}, {message:?}"); } // Reconnects { let mut inner = inner.write().unwrap(); - inner.connection = Connection::connect(&inner.address); + let address = inner.address.clone(); + inner.connection.replace(Connection::connect(&address)); } std::thread::sleep(std::time::Duration::from_millis(500)); @@ -213,7 +225,7 @@ impl Connection { return connection; } Err(error) => { - error!("Failed to connect, trying again in one second. Reason: {error:#?}."); + error!("Failed to connect, trying again in one second. Reason: {error:?}."); } } } From 61d174868842b79ae51fe8fcaf686a203ec96479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:25:52 -0300 Subject: [PATCH 16/27] src: mavlink: rename structs --- src/mavlink/mavlink_camera.rs | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/src/mavlink/mavlink_camera.rs b/src/mavlink/mavlink_camera.rs index a45736bf..d55b24e3 100644 --- a/src/mavlink/mavlink_camera.rs +++ b/src/mavlink/mavlink_camera.rs @@ -16,15 +16,15 @@ use super::manager::Message; use super::utils::*; #[derive(Debug)] -pub struct MavlinkCameraHandle { - inner: Arc, +pub struct MavlinkCamera { + inner: Arc, _runtime: tokio::runtime::Runtime, heartbeat_handle: tokio::task::JoinHandle<()>, messages_handle: tokio::task::JoinHandle<()>, } #[derive(Debug, Clone)] -struct MavlinkCamera { +struct MavlinkCameraInner { component: MavlinkCameraComponent, mavlink_stream_type: mavlink::common::VideoStreamType, video_stream_uri: Url, @@ -32,10 +32,10 @@ struct MavlinkCamera { video_source_type: VideoSourceType, } -impl MavlinkCameraHandle { +impl MavlinkCamera { #[instrument(level = "debug")] pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result { - let inner = Arc::new(MavlinkCamera::try_new(video_and_stream_information)?); + let inner = Arc::new(MavlinkCameraInner::try_new(video_and_stream_information)?); let sender = crate::mavlink::manager::Manager::get_sender(); @@ -67,7 +67,7 @@ impl MavlinkCameraHandle { } } -impl MavlinkCamera { +impl MavlinkCameraInner { #[instrument(level = "debug")] pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result { let video_stream_uri = video_and_stream_information @@ -128,7 +128,10 @@ impl MavlinkCamera { #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip_all, fields(component_id = camera.component.component_id))] - pub async fn heartbeat_loop(camera: Arc, sender: broadcast::Sender) { + pub async fn heartbeat_loop( + camera: Arc, + sender: broadcast::Sender, + ) { let component_id = camera.component.component_id; let system_id = camera.component.system_id; @@ -161,7 +164,10 @@ impl MavlinkCamera { #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip_all, fields(component_id = camera.component.component_id))] - pub async fn messages_loop(camera: Arc, sender: broadcast::Sender) { + pub async fn messages_loop( + camera: Arc, + sender: broadcast::Sender, + ) { let mut receiver = sender.subscribe(); loop { @@ -189,7 +195,7 @@ impl MavlinkCamera { #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))] async fn handle_message( - camera: Arc, + camera: Arc, sender: broadcast::Sender, header: MavHeader, message: MavMessage, @@ -225,7 +231,7 @@ impl MavlinkCamera { #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))] async fn handle_command_long( - camera: &MavlinkCamera, + camera: &MavlinkCameraInner, sender: broadcast::Sender, their_header: &MavHeader, data: &mavlink::common::COMMAND_LONG_DATA, @@ -449,7 +455,7 @@ impl MavlinkCamera { #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))] async fn handle_param_ext_set( - camera: &MavlinkCamera, + camera: &MavlinkCameraInner, sender: broadcast::Sender, header: &MavHeader, data: &mavlink::common::PARAM_EXT_SET_DATA, @@ -510,7 +516,7 @@ impl MavlinkCamera { #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))] async fn handle_param_ext_request_read( - camera: &MavlinkCamera, + camera: &MavlinkCameraInner, sender: broadcast::Sender, header: &MavHeader, data: &mavlink::common::PARAM_EXT_REQUEST_READ_DATA, @@ -561,7 +567,7 @@ impl MavlinkCamera { #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip(sender, camera), fields(component_id = camera.component.component_id))] async fn handle_param_ext_request_list( - camera: &MavlinkCamera, + camera: &MavlinkCameraInner, sender: broadcast::Sender, header: &MavHeader, data: &mavlink::common::PARAM_EXT_REQUEST_LIST_DATA, @@ -611,7 +617,7 @@ impl MavlinkCamera { } } -impl Drop for MavlinkCameraHandle { +impl Drop for MavlinkCamera { fn drop(&mut self) { self.heartbeat_handle.abort(); self.messages_handle.abort(); From 842d68d9aa3fcb565470d4ed5747bdbcb560680a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:27:30 -0300 Subject: [PATCH 17/27] src: mavlink: Move MavlinkCamera to async --- src/mavlink/mavlink_camera.rs | 77 ++++++++++++++++------------------- 1 file changed, 34 insertions(+), 43 deletions(-) diff --git a/src/mavlink/mavlink_camera.rs b/src/mavlink/mavlink_camera.rs index d55b24e3..2dd69534 100644 --- a/src/mavlink/mavlink_camera.rs +++ b/src/mavlink/mavlink_camera.rs @@ -18,9 +18,8 @@ use super::utils::*; #[derive(Debug)] pub struct MavlinkCamera { inner: Arc, - _runtime: tokio::runtime::Runtime, - heartbeat_handle: tokio::task::JoinHandle<()>, - messages_handle: tokio::task::JoinHandle<()>, + heartbeat_handle: Option>, + messages_handle: Option>, } #[derive(Debug, Clone)] @@ -34,33 +33,37 @@ struct MavlinkCameraInner { impl MavlinkCamera { #[instrument(level = "debug")] - pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result { + pub async fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result { let inner = Arc::new(MavlinkCameraInner::try_new(video_and_stream_information)?); let sender = crate::mavlink::manager::Manager::get_sender(); - let runtime = tokio::runtime::Builder::new_multi_thread() - .on_thread_start(|| debug!("Thread started")) - .on_thread_stop(|| debug!("Thread stopped")) - .thread_name_fn(|| { - static ATOMIC_ID: std::sync::atomic::AtomicUsize = - std::sync::atomic::AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - format!("MavlinkCamera-{id}") - }) - .worker_threads(2) - .enable_all() - .build() - .expect("Failed building a new tokio runtime"); - - let heartbeat_handle = - runtime.spawn(MavlinkCamera::heartbeat_loop(inner.clone(), sender.clone())); - let messages_handle = - runtime.spawn(MavlinkCamera::messages_loop(inner.clone(), sender.clone())); + debug!("Starting MAVLink HeartBeat task..."); + + let inner_cloned = inner.clone(); + let sender_cloned = sender.clone(); + let heartbeat_handle = Some(tokio::spawn(async move { + debug!("MAVLink HeartBeat task started!"); + match MavlinkCameraInner::heartbeat_loop(inner_cloned, sender_cloned).await { + Ok(_) => debug!("MAVLink HeartBeat task eneded with no errors"), + Err(error) => warn!("MAVLink HeartBeat task ended with error: {error:#?}"), + }; + })); + + debug!("Starting MAVLink Message task..."); + + let inner_cloned = inner.clone(); + let sender_cloned = sender.clone(); + let messages_handle = Some(tokio::spawn(async move { + debug!("MAVLink Message task started!"); + match MavlinkCameraInner::messages_loop(inner_cloned, sender_cloned).await { + Ok(_) => debug!("MAVLink Message task eneded with no errors"), + Err(error) => warn!("MAVLink Message task ended with error: {error:#?}"), + }; + })); Ok(Self { inner, - _runtime: runtime, heartbeat_handle, messages_handle, }) @@ -131,7 +134,7 @@ impl MavlinkCameraInner { pub async fn heartbeat_loop( camera: Arc, sender: broadcast::Sender, - ) { + ) -> Result<()> { let component_id = camera.component.component_id; let system_id = camera.component.system_id; @@ -157,8 +160,6 @@ impl MavlinkCameraInner { error!("Failed to send message: {error:?}"); continue; } - - debug!("Heartbeat sent"); } } @@ -167,28 +168,18 @@ impl MavlinkCameraInner { pub async fn messages_loop( camera: Arc, sender: broadcast::Sender, - ) { + ) -> Result<()> { let mut receiver = sender.subscribe(); + use crate::mavlink::mavlink_camera::Message::Received; loop { - let (header, message) = match receiver.recv().await { - Ok(Message::Received(message)) => message, - Err(broadcast::error::RecvError::Closed) => { - unreachable!( - "Closed channel: This should never happen, this channel is static!" - ); - } - Ok(Message::ToBeSent(_)) | Err(broadcast::error::RecvError::Lagged(_)) => continue, - }; + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - trace!("Message received: {header:?}, {message:?}"); + if let Ok(Received((header, message))) = receiver.recv().await { + trace!("Message received: {header:?}, {message:?}"); - tokio::spawn(Self::handle_message( - camera.clone(), - sender.clone(), - header, - message, - )); + Self::handle_message(camera.clone(), sender.clone(), header, message).await; + } } } From 3a7e25325c1681e1d980a9c0af2c66715bb0be0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:27:58 -0300 Subject: [PATCH 18/27] src: mavlink: Implement Drop for MavinkCamera --- src/mavlink/mavlink_camera.rs | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/src/mavlink/mavlink_camera.rs b/src/mavlink/mavlink_camera.rs index 2dd69534..d3b58162 100644 --- a/src/mavlink/mavlink_camera.rs +++ b/src/mavlink/mavlink_camera.rs @@ -609,9 +609,36 @@ impl MavlinkCameraInner { } impl Drop for MavlinkCamera { + #[instrument(level = "debug", skip(self))] fn drop(&mut self) { - self.heartbeat_handle.abort(); - self.messages_handle.abort(); - super::manager::Manager::drop_id(self.inner.component.component_id) + debug!("Dropping MavlinkCameraHandle..."); + + if let Some(handle) = self.heartbeat_handle.take() { + if !handle.is_finished() { + handle.abort(); + tokio::spawn(async move { + let _ = handle.await; + debug!("Mavlink Heartbeat task aborted"); + }); + } else { + debug!("Mavlink Heartbeat task nicely finished!"); + } + } + + if let Some(handle) = self.messages_handle.take() { + if !handle.is_finished() { + handle.abort(); + tokio::spawn(async move { + let _ = handle.await; + debug!("Mavlink Message task aborted"); + }); + } else { + debug!("Mavlink Message task nicely finished!"); + } + } + + super::manager::Manager::drop_id(self.inner.component.component_id); + + debug!("MavlinkCameraHandle Dropped!"); } } From fdba2efda104c7d300444aaf27348f3123add0f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:29:12 -0300 Subject: [PATCH 19/27] src: stream: update MAVLink struct names --- src/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 0a960787..eefbb8f8 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -8,7 +8,7 @@ pub mod webrtc; use std::sync::{Arc, Mutex}; -use crate::mavlink::mavlink_camera::MavlinkCameraHandle; +use crate::mavlink::mavlink_camera::MavlinkCamera; use crate::video::types::{VideoEncodeType, VideoSourceType}; use crate::video::video_source::cameras_available; use crate::video_stream::types::VideoAndStreamInformation; @@ -42,7 +42,7 @@ pub struct StreamState { pub pipeline_id: PeerId, pub pipeline: Pipeline, pub video_and_stream_information: VideoAndStreamInformation, - pub mavlink_camera: Option, + pub mavlink_camera: Option, } impl Stream { From ff0fdbbf3ac5c2a780b91437dd8b209998264247 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:34:35 -0300 Subject: [PATCH 20/27] src: stream: Move Manager to RWLock --- src/stream/mod.rs | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index eefbb8f8..7e38f752 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -6,7 +6,7 @@ pub mod sink; pub mod types; pub mod webrtc; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use crate::mavlink::mavlink_camera::MavlinkCamera; use crate::video::types::{VideoEncodeType, VideoSourceType}; @@ -32,8 +32,8 @@ use ::gst::prelude::*; #[derive(Debug)] pub struct Stream { - state: Arc>, - terminated: Arc>, + state: Arc>, + terminated: Arc>, _watcher_thread_handle: std::thread::JoinHandle<()>, } @@ -50,12 +50,11 @@ impl Stream { pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result { let pipeline_id = Manager::generate_uuid(); - let state = Arc::new(Mutex::new(StreamState::try_new( - video_and_stream_information, - &pipeline_id, - )?)); + let state = Arc::new(RwLock::new( + StreamState::try_new(video_and_stream_information, &pipeline_id).await?, + )); - let terminated = Arc::new(Mutex::new(false)); + let terminated = Arc::new(RwLock::new(false)); let terminated_cloned = terminated.clone(); let video_and_stream_information_cloned = video_and_stream_information.clone(); @@ -85,8 +84,8 @@ impl Stream { fn watcher( video_and_stream_information: VideoAndStreamInformation, pipeline_id: uuid::Uuid, - state: Arc>, - terminated: Arc>, + state: Arc>, + terminated: Arc>, ) { // To reduce log size, each report we raise the report interval geometrically until a maximum value is reached: let report_interval_mult = 2; @@ -100,8 +99,8 @@ impl Stream { std::thread::sleep(std::time::Duration::from_millis(100)); if !state - .lock() - .unwrap() + .read() + .map_err(|e| anyhow::Error::msg(e.to_string()))? .pipeline .inner_state_as_ref() .pipeline_runner @@ -161,7 +160,7 @@ impl Stream { } // Try to recreate the stream - if let Ok(mut state) = state.lock() { + if let Ok(mut state) = state.write() { *state = match StreamState::try_new(&video_and_stream_information, &pipeline_id) { Ok(state) => state, @@ -174,7 +173,7 @@ impl Stream { } } - if *terminated.lock().unwrap() { + if *terminated.read().unwrap() { debug!("Ending stream {pipeline_id:?}."); break; } @@ -184,7 +183,7 @@ impl Stream { impl Drop for Stream { fn drop(&mut self) { - *self.terminated.lock().unwrap() = true; + *self.terminated.write().unwrap() = true; } } From cbc7df785513ad6b63a75d124082195feac0cdc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:38:37 -0300 Subject: [PATCH 21/27] src: mavlink: Move Stream and watcher to async --- src/stream/mod.rs | 56 +++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 7e38f752..e7686d79 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -20,7 +20,7 @@ use types::*; use webrtc::signalling_protocol::PeerId; use webrtc::signalling_server::StreamManagementInterface; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Result}; use tracing::*; @@ -34,7 +34,7 @@ use ::gst::prelude::*; pub struct Stream { state: Arc>, terminated: Arc>, - _watcher_thread_handle: std::thread::JoinHandle<()>, + watcher_handle: Option>, } #[derive(Debug)] @@ -47,7 +47,7 @@ pub struct StreamState { impl Stream { #[instrument(level = "debug")] - pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result { + pub async fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result { let pipeline_id = Manager::generate_uuid(); let state = Arc::new(RwLock::new( @@ -57,36 +57,39 @@ impl Stream { let terminated = Arc::new(RwLock::new(false)); let terminated_cloned = terminated.clone(); + debug!("Starting StreamWatcher task..."); + let video_and_stream_information_cloned = video_and_stream_information.clone(); let state_cloned = state.clone(); - let _watcher_thread_handle = std::thread::Builder::new() - .name(format!("Stream-{pipeline_id}")) - .spawn(move || { - Self::watcher( - video_and_stream_information_cloned, - pipeline_id, - state_cloned, - terminated_cloned, - ) - }) - .context(format!( - "Failed when spawing PipelineRunner thread for Pipeline {pipeline_id:#?}" - ))?; + let watcher_handle = Some(tokio::spawn(async move { + debug!("StreamWatcher task started!"); + match Self::watcher( + video_and_stream_information_cloned, + pipeline_id, + state_cloned, + terminated_cloned, + ) + .await + { + Ok(_) => debug!("StreamWatcher task eneded with no errors"), + Err(error) => warn!("StreamWatcher task ended with error: {error:#?}"), + }; + })); Ok(Self { state, terminated, - _watcher_thread_handle, + watcher_handle, }) } #[instrument(level = "debug")] - fn watcher( + async fn watcher( video_and_stream_information: VideoAndStreamInformation, pipeline_id: uuid::Uuid, state: Arc>, terminated: Arc>, - ) { + ) -> Result<()> { // To reduce log size, each report we raise the report interval geometrically until a maximum value is reached: let report_interval_mult = 2; let report_interval_max = 60; @@ -96,7 +99,7 @@ impl Stream { let mut video_and_stream_information = video_and_stream_information; loop { - std::thread::sleep(std::time::Duration::from_millis(100)); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; if !state .read() @@ -153,7 +156,7 @@ impl Stream { } } - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; continue; } } @@ -162,11 +165,12 @@ impl Stream { // Try to recreate the stream if let Ok(mut state) = state.write() { *state = match StreamState::try_new(&video_and_stream_information, &pipeline_id) + .await { Ok(state) => state, Err(error) => { error!("Failed to recreate the stream {pipeline_id:?}: {error:#?}. Trying again in one second..."); - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; continue; } }; @@ -178,6 +182,8 @@ impl Stream { break; } } + + Ok(()) } } @@ -189,7 +195,7 @@ impl Drop for Stream { impl StreamState { #[instrument(level = "debug")] - pub fn try_new( + pub async fn try_new( video_and_stream_information: &VideoAndStreamInformation, pipeline_id: &uuid::Uuid, ) -> Result { @@ -208,7 +214,9 @@ impl StreamState { thermal: _, disable_mavlink: true, }) => None, - _ => MavlinkCameraHandle::try_new(video_and_stream_information).ok(), + _ => MavlinkCamera::try_new(video_and_stream_information) + .await + .ok(), }; let mut stream = StreamState { From 7a5da8dc971861d27164cf1fe1b89d88b72307bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:43:35 -0300 Subject: [PATCH 22/27] src: mavlink: Reimplement Drop for Stream --- src/stream/mod.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index e7686d79..e8408c32 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -188,8 +188,25 @@ impl Stream { } impl Drop for Stream { + #[instrument(level = "debug", skip(self))] fn drop(&mut self) { + debug!("Dropping Stream..."); + *self.terminated.write().unwrap() = true; + + if let Some(handle) = self.watcher_handle.take() { + if !handle.is_finished() { + handle.abort(); + tokio::spawn(async move { + let _ = handle.await; + debug!("PipelineWatcher task aborted"); + }); + } else { + debug!("PipelineWatcher task nicely finished!"); + } + } + + debug!("Stream Dropped!"); } } From 70b327b0b1bedffee261fb82e2410bd2f8edaffa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:45:19 -0300 Subject: [PATCH 23/27] src: mavlink: Rewrite match into a block --- src/stream/mod.rs | 88 ++++++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index e8408c32..3f568134 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -243,63 +243,65 @@ impl StreamState { mavlink_camera, }; - match &video_and_stream_information.video_source { - VideoSourceType::Redirect(_) => return Ok(stream), // Do not add any Sink if it's a redirect Pipeline - VideoSourceType::Gst(_) | VideoSourceType::Local(_) => (), - } - - let endpoints = &video_and_stream_information.stream_information.endpoints; - - // Disable concurrent RTSP and UDP sinks creation, as it is failing. - if endpoints.iter().any(|endpoint| endpoint.scheme() == "udp") - && endpoints.iter().any(|endpoint| endpoint.scheme() == "rtsp") - { - return Err(anyhow!( - "UDP endpoints won't work together with RTSP endpoints. You need to choose one. This is a (temporary) software limitation, if this is a feature you need, please, contact us." - )); - } + // Do not add any Sink if it's a redirect Pipeline + if !matches!( + &video_and_stream_information.video_source, + VideoSourceType::Redirect(_) + ) { + let endpoints = &video_and_stream_information.stream_information.endpoints; - if endpoints.iter().any(|endpoint| endpoint.scheme() == "udp") { - if let Err(reason) = - create_udp_sink(Manager::generate_uuid(), video_and_stream_information) - .and_then(|sink| stream.pipeline.add_sink(sink)) + // Disable concurrent RTSP and UDP sinks creation, as it is failing. + if endpoints.iter().any(|endpoint| endpoint.scheme() == "udp") + && endpoints.iter().any(|endpoint| endpoint.scheme() == "rtsp") { return Err(anyhow!( - "Failed to add Sink of type UDP to the Pipeline. Reason: {reason}" + "UDP endpoints won't work together with RTSP endpoints. You need to choose one. This is a (temporary) software limitation, if this is a feature you need, please, contact us." )); } - } - if endpoints.iter().any(|endpoint| endpoint.scheme() == "rtsp") { + if endpoints.iter().any(|endpoint| endpoint.scheme() == "udp") { + if let Err(reason) = + create_udp_sink(Manager::generate_uuid(), video_and_stream_information) + .and_then(|sink| stream.pipeline.add_sink(sink)) + { + return Err(anyhow!( + "Failed to add Sink of type UDP to the Pipeline. Reason: {reason}" + )); + } + } + + if endpoints.iter().any(|endpoint| endpoint.scheme() == "rtsp") { + if let Err(reason) = + create_rtsp_sink(Manager::generate_uuid(), video_and_stream_information) + .and_then(|sink| stream.pipeline.add_sink(sink)) + { + return Err(anyhow!( + "Failed to add Sink of type RTSP to the Pipeline. Reason: {reason}" + )); + } + } + if let Err(reason) = - create_rtsp_sink(Manager::generate_uuid(), video_and_stream_information) + create_image_sink(Manager::generate_uuid(), video_and_stream_information) .and_then(|sink| stream.pipeline.add_sink(sink)) { return Err(anyhow!( - "Failed to add Sink of type RTSP to the Pipeline. Reason: {reason}" + "Failed to add Sink of type Image to the Pipeline. Reason: {reason}" )); } - } - - if let Err(reason) = - create_image_sink(Manager::generate_uuid(), video_and_stream_information) - .and_then(|sink| stream.pipeline.add_sink(sink)) - { - return Err(anyhow!( - "Failed to add Sink of type Image to the Pipeline. Reason: {reason}" - )); - } - // Start the pipeline. This will automatically start sinks with linked proxy-isolated pipelines - stream - .pipeline - .inner_state_as_ref() - .pipeline_runner - .start()?; + // Start the pipeline. This will automatically start sinks with linked proxy-isolated pipelines + stream + .pipeline + .inner_state_as_ref() + .pipeline_runner + .start()?; - // Start all the sinks - for sink in stream.pipeline.inner_state_mut().sinks.values() { - sink.start()? + // Start all the sinks + for sink in stream.pipeline.inner_state_mut().sinks.values() { + sink.start()? + } + } } Ok(stream) From 58366329c73ba120b70724bd606a840f1b21de72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:56:56 -0300 Subject: [PATCH 24/27] src: stream: Only acquire state Write lock when needed --- src/stream/mod.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 3f568134..c1a0d2ba 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -162,18 +162,23 @@ impl Stream { } } + let new_state = match StreamState::try_new( + &video_and_stream_information, + &pipeline_id, + ) + .await + { + Ok(state) => state, + Err(error) => { + error!("Failed to recreate the stream {pipeline_id:?}: {error:#?}. Trying again in one second..."); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + continue; + } + }; + // Try to recreate the stream if let Ok(mut state) = state.write() { - *state = match StreamState::try_new(&video_and_stream_information, &pipeline_id) - .await - { - Ok(state) => state, - Err(error) => { - error!("Failed to recreate the stream {pipeline_id:?}: {error:#?}. Trying again in one second..."); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - continue; - } - }; + *state = new_state } } From 87e80d0f3e1facda647acf7243a2ffa331c74b5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 20:58:34 -0300 Subject: [PATCH 25/27] src: stream: Finish MAVLink tasks before start the stream recreation process --- src/stream/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index c1a0d2ba..f1b9fc95 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -109,6 +109,13 @@ impl Stream { .pipeline_runner .is_running() { + // First, finish the MAVLink tasks + { + if let Some(mavlink) = state.write().unwrap().mavlink_camera.take() { + drop(mavlink); + } + } + // If it's a camera, try to update the device if let VideoSourceType::Local(_) = video_and_stream_information.video_source { let mut streams = vec![video_and_stream_information.clone()]; From 74d5f62eb1a7edb5ae35dced82d4b37f5dd6c216 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 21:00:41 -0300 Subject: [PATCH 26/27] src: stream: Only create the MavlinkCamera when MAVLink is not disabled --- src/stream/mod.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f1b9fc95..fa1db696 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -234,25 +234,11 @@ impl StreamState { let pipeline = Pipeline::try_new(video_and_stream_information, pipeline_id)?; - // Only create the Mavlink Handle when mavlink is not disabled - let mavlink_camera = match video_and_stream_information - .stream_information - .extended_configuration - { - Some(ExtendedConfiguration { - thermal: _, - disable_mavlink: true, - }) => None, - _ => MavlinkCamera::try_new(video_and_stream_information) - .await - .ok(), - }; - let mut stream = StreamState { pipeline_id: *pipeline_id, pipeline, video_and_stream_information: video_and_stream_information.clone(), - mavlink_camera, + mavlink_camera: None, }; // Do not add any Sink if it's a redirect Pipeline @@ -314,6 +300,20 @@ impl StreamState { sink.start()? } } + + // Only create the MavlinkCamera when MAVLink is not disabled + if matches!( + video_and_stream_information + .stream_information + .extended_configuration, + Some(ExtendedConfiguration { + thermal: _, + disable_mavlink: false, + }) + ) { + stream.mavlink_camera = MavlinkCamera::try_new(video_and_stream_information) + .await + .ok(); } Ok(stream) From eb5a91685a225ea4579b6917f0c4e8eb551516a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Fri, 5 Jan 2024 21:49:34 -0300 Subject: [PATCH 27/27] src: Change main runtime to explicitly Tokio --- src/main.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index d7ec69ff..bd3ddf98 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ mod stream; mod video; mod video_stream; -#[actix_web::main] +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> Result<(), std::io::Error> { // CLI should be started before logger to allow control over verbosity cli::manager::init(); @@ -42,11 +42,13 @@ async fn main() -> Result<(), std::io::Error> { helper::develop::start_check_tasks_on_webrtc_reconnects(); } - stream::webrtc::signalling_server::SignallingServer::default(); + let _signalling_server = stream::webrtc::signalling_server::SignallingServer::default(); if let Err(error) = stream::manager::start_default().await { error!("Failed to start default streams. Reason: {error:?}") } - server::manager::run(&cli::manager::server_address()).await + server::manager::run(&cli::manager::server_address()).await?; + + Ok(()) }