diff --git a/aquadoggo/src/network/behaviour.rs b/aquadoggo/src/network/behaviour.rs index c1e25ba66..fb9a9f20c 100644 --- a/aquadoggo/src/network/behaviour.rs +++ b/aquadoggo/src/network/behaviour.rs @@ -7,13 +7,9 @@ use libp2p::swarm::NetworkBehaviour; use libp2p::{autonat, connection_limits, identify, mdns, ping, relay, rendezvous, PeerId}; use log::debug; -use crate::bus::ServiceSender; -use crate::db::SqlStore; use crate::network::config::NODE_NAMESPACE; use crate::network::replication; use crate::network::NetworkConfiguration; -use crate::replication::SyncIngest; -use crate::schema::SchemaProvider; /// Network behaviour for the aquadoggo node. #[derive(NetworkBehaviour)] @@ -60,9 +56,6 @@ impl Behaviour { pub fn new( network_config: &NetworkConfiguration, peer_id: PeerId, - store: &SqlStore, - schema_provider: &SchemaProvider, - tx: ServiceSender, key_pair: Keypair, relay_client: Option, ) -> Result { @@ -143,9 +136,7 @@ impl Behaviour { None }; - let ingest = SyncIngest::new(schema_provider.clone(), tx); - let replication = - replication::Behaviour::new(store, ingest, schema_provider.clone(), &peer_id); + let replication = replication::Behaviour::new(); Ok(Self { autonat: autonat.into(), diff --git a/aquadoggo/src/network/replication/behaviour.rs b/aquadoggo/src/network/replication/behaviour.rs index 3eccf4437..6bd614e17 100644 --- a/aquadoggo/src/network/replication/behaviour.rs +++ b/aquadoggo/src/network/replication/behaviour.rs @@ -11,10 +11,8 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId}; -use crate::db::SqlStore; use crate::network::replication::handler::{Handler, HandlerInEvent, HandlerOutEvent}; -use crate::replication::{Message, SyncIngest, SyncManager, SyncMessage, TargetSet}; -use crate::schema::SchemaProvider; +use crate::replication::SyncMessage; #[derive(Debug)] pub enum BehaviourOutEvent { @@ -25,21 +23,12 @@ pub enum BehaviourOutEvent { #[derive(Debug)] pub struct Behaviour { events: VecDeque>, - manager: SyncManager, - schema_provider: SchemaProvider, } impl Behaviour { - pub fn new( - store: &SqlStore, - ingest: SyncIngest, - schema_provider: SchemaProvider, - peer_id: &PeerId, - ) -> Self { + pub fn new() -> Self { Self { events: VecDeque::new(), - manager: SyncManager::new(store.clone(), ingest, peer_id.clone()), - schema_provider, } } } @@ -62,11 +51,7 @@ impl Behaviour { } fn handle_established_connection(&mut self, remote_peer_id: &PeerId) { - // @TODO: Have an async backend - self.send_message( - *remote_peer_id, - SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))), - ) + // @TODO } } @@ -144,207 +129,144 @@ impl NetworkBehaviour for Behaviour { mod tests { use futures::FutureExt; use libp2p::swarm::{keep_alive, Swarm}; - use libp2p::PeerId; use libp2p_swarm_test::SwarmExt; - use tokio::sync::broadcast; - use crate::replication::{Message, SyncIngest, SyncMessage, TargetSet}; - use crate::test_utils::{test_runner, test_runner_with_manager, TestNode, TestNodeManager}; + use crate::replication::{Message, SyncMessage, TargetSet}; use super::{Behaviour as ReplicationBehaviour, BehaviourOutEvent}; #[tokio::test] async fn peers_connect() { - let (tx, _rx) = broadcast::channel(8); - - test_runner_with_manager(|manager: TestNodeManager| async move { - let node_a = manager.create().await; - let node_b = manager.create().await; - - let peer_id_a = PeerId::random(); - let peer_id_b = PeerId::random(); - - // Create two swarms - let mut swarm1 = Swarm::new_ephemeral(|_| { - ReplicationBehaviour::new( - &node_a.context.store, - SyncIngest::new(node_a.context.schema_provider.clone(), tx.clone()), - node_a.context.schema_provider.clone(), - &peer_id_a, - ) - }); - let mut swarm2 = Swarm::new_ephemeral(|_| { - ReplicationBehaviour::new( - &node_a.context.store, - SyncIngest::new(node_b.context.schema_provider.clone(), tx.clone()), - node_b.context.schema_provider.clone(), - &peer_id_b, - ) - }); - - // Listen on swarm1 and connect from swarm2, this should establish a bi-directional - // connection. - swarm1.listen().await; - swarm2.connect(&mut swarm1).await; - - let swarm1_peer_id = *swarm1.local_peer_id(); - let swarm2_peer_id = *swarm2.local_peer_id(); - - let info1 = swarm1.network_info(); - let info2 = swarm2.network_info(); - - // Peers should be connected. - assert!(swarm2.is_connected(&swarm1_peer_id)); - assert!(swarm1.is_connected(&swarm2_peer_id)); - - // Each swarm should have exactly one connected peer. - assert_eq!(info1.num_peers(), 1); - assert_eq!(info2.num_peers(), 1); - - // Each swarm should have one established connection. - assert_eq!(info1.connection_counters().num_established(), 1); - assert_eq!(info2.connection_counters().num_established(), 1); - }); + // Create two swarms + let mut swarm1 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); + let mut swarm2 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); + + // Listen on swarm1 and connect from swarm2, this should establish a bi-directional + // connection. + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + let swarm1_peer_id = *swarm1.local_peer_id(); + let swarm2_peer_id = *swarm2.local_peer_id(); + + let info1 = swarm1.network_info(); + let info2 = swarm2.network_info(); + + // Peers should be connected. + assert!(swarm2.is_connected(&swarm1_peer_id)); + assert!(swarm1.is_connected(&swarm2_peer_id)); + + // Each swarm should have exactly one connected peer. + assert_eq!(info1.num_peers(), 1); + assert_eq!(info2.num_peers(), 1); + + // Each swarm should have one established connection. + assert_eq!(info1.connection_counters().num_established(), 1); + assert_eq!(info2.connection_counters().num_established(), 1); } #[tokio::test] async fn incompatible_network_behaviour() { - test_runner(|node: TestNode| async move { - let (tx, _rx) = broadcast::channel(8); - let peer_id = PeerId::random(); - - // Create two swarms - let mut swarm1 = Swarm::new_ephemeral(|_| { - ReplicationBehaviour::new( - &node.context.store, - SyncIngest::new(node.context.schema_provider.clone(), tx.clone()), - node.context.schema_provider.clone(), - &peer_id, - ) - }); - - let mut swarm2 = Swarm::new_ephemeral(|_| keep_alive::Behaviour); - - // Listen on swarm1 and connect from swarm2, this should establish a bi-directional connection. - swarm1.listen().await; - swarm2.connect(&mut swarm1).await; - - let swarm1_peer_id = *swarm1.local_peer_id(); - let swarm2_peer_id = *swarm2.local_peer_id(); - - let info1 = swarm1.network_info(); - let info2 = swarm2.network_info(); - - // Even though the network behaviours of our two peers are incompatible they still - // establish a connection. - - // Peers should be connected. - assert!(swarm2.is_connected(&swarm1_peer_id)); - assert!(swarm1.is_connected(&swarm2_peer_id)); - - // Each swarm should have exactly one connected peer. - assert_eq!(info1.num_peers(), 1); - assert_eq!(info2.num_peers(), 1); - - // Each swarm should have one established connection. - assert_eq!(info1.connection_counters().num_established(), 1); - assert_eq!(info2.connection_counters().num_established(), 1); - - // Send a message from to swarm1 local peer from swarm2 local peer. - swarm1.behaviour_mut().send_message( - swarm2_peer_id, - SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))), - ); - - // Await a swarm event on swarm2. - // - // We expect a timeout panic as no event will occur. - let result = std::panic::AssertUnwindSafe(swarm2.next_swarm_event()) - .catch_unwind() - .await; - - assert!(result.is_err()) - }); + // Create two swarms + let mut swarm1 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); + + let mut swarm2 = Swarm::new_ephemeral(|_| keep_alive::Behaviour); + + // Listen on swarm1 and connect from swarm2, this should establish a bi-directional connection. + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + let swarm1_peer_id = *swarm1.local_peer_id(); + let swarm2_peer_id = *swarm2.local_peer_id(); + + let info1 = swarm1.network_info(); + let info2 = swarm2.network_info(); + + // Even though the network behaviours of our two peers are incompatible they still + // establish a connection. + + // Peers should be connected. + assert!(swarm2.is_connected(&swarm1_peer_id)); + assert!(swarm1.is_connected(&swarm2_peer_id)); + + // Each swarm should have exactly one connected peer. + assert_eq!(info1.num_peers(), 1); + assert_eq!(info2.num_peers(), 1); + + // Each swarm should have one established connection. + assert_eq!(info1.connection_counters().num_established(), 1); + assert_eq!(info2.connection_counters().num_established(), 1); + + // Send a message from to swarm1 local peer from swarm2 local peer. + swarm1.behaviour_mut().send_message( + swarm2_peer_id, + SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))), + ); + + // Await a swarm event on swarm2. + // + // We expect a timeout panic as no event will occur. + let result = std::panic::AssertUnwindSafe(swarm2.next_swarm_event()) + .catch_unwind() + .await; + + assert!(result.is_err()) } #[tokio::test] async fn swarm_behaviour_events() { - let (tx, _rx) = broadcast::channel(8); - - test_runner_with_manager(|manager: TestNodeManager| async move { - let node_a = manager.create().await; - let node_b = manager.create().await; - - let peer_id_a = PeerId::random(); - let peer_id_b = PeerId::random(); - - // Create two swarms - let mut swarm1 = Swarm::new_ephemeral(|_| { - ReplicationBehaviour::new( - &node_a.context.store, - SyncIngest::new(node_a.context.schema_provider.clone(), tx.clone()), - node_a.context.schema_provider.clone(), - &peer_id_a, - ) - }); - let mut swarm2 = Swarm::new_ephemeral(|_| { - ReplicationBehaviour::new( - &node_a.context.store, - SyncIngest::new(node_b.context.schema_provider.clone(), tx.clone()), - node_b.context.schema_provider.clone(), - &peer_id_b, - ) - }); - - // Listen on swarm1 and connect from swarm2, this should establish a bi-directional connection. - swarm1.listen().await; - swarm2.connect(&mut swarm1).await; - - let mut res1 = Vec::new(); - let mut res2 = Vec::new(); - - let swarm1_peer_id = *swarm1.local_peer_id(); - let swarm2_peer_id = *swarm2.local_peer_id(); - - // Send a message from to swarm1 local peer from swarm2 local peer. - swarm1.behaviour_mut().send_message( - swarm2_peer_id, - SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))), - ); - - // Send a message from to swarm2 local peer from swarm1 local peer. - swarm2.behaviour_mut().send_message( - swarm1_peer_id, - SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))), - ); - - // Collect the next 2 behaviour events which occur in either swarms. - for _ in 0..2 { - tokio::select! { - BehaviourOutEvent::MessageReceived(peer_id, message) = swarm1.next_behaviour_event() => res1.push((peer_id, message)), - BehaviourOutEvent::MessageReceived(peer_id, message) = swarm2.next_behaviour_event() => res2.push((peer_id, message)), - } + // Create two swarms + let mut swarm1 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); + let mut swarm2 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); + + // Listen on swarm1 and connect from swarm2, this should establish a bi-directional connection. + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + let mut res1 = Vec::new(); + let mut res2 = Vec::new(); + + let swarm1_peer_id = *swarm1.local_peer_id(); + let swarm2_peer_id = *swarm2.local_peer_id(); + + // Send a message from to swarm1 local peer from swarm2 local peer. + swarm1.behaviour_mut().send_message( + swarm2_peer_id, + SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))), + ); + + // Send a message from to swarm2 local peer from swarm1 local peer. + swarm2.behaviour_mut().send_message( + swarm1_peer_id, + SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))), + ); + + // Collect the next 2 behaviour events which occur in either swarms. + for _ in 0..2 { + tokio::select! { + BehaviourOutEvent::MessageReceived(peer_id, message) = swarm1.next_behaviour_event() => res1.push((peer_id, message)), + BehaviourOutEvent::MessageReceived(peer_id, message) = swarm2.next_behaviour_event() => res2.push((peer_id, message)), } + } - // Each swarm should have emitted exactly one event. - assert_eq!(res1.len(), 1); - assert_eq!(res2.len(), 1); - - // swarm1 should have received the message from swarm2 peer. - let (peer_id, message) = &res1[0]; - assert_eq!(peer_id, &swarm2_peer_id); - assert_eq!( - message, - &SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))) - ); - - // swarm2 should have received the message from swarm1 peer. - let (peer_id, message) = &res2[0]; - assert_eq!(peer_id, &swarm1_peer_id); - assert_eq!( - message, - &SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))) - ); - }); + // Each swarm should have emitted exactly one event. + assert_eq!(res1.len(), 1); + assert_eq!(res2.len(), 1); + + // swarm1 should have received the message from swarm2 peer. + let (peer_id, message) = &res1[0]; + assert_eq!(peer_id, &swarm2_peer_id); + assert_eq!( + message, + &SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))) + ); + + // swarm2 should have received the message from swarm1 peer. + let (peer_id, message) = &res2[0]; + assert_eq!(peer_id, &swarm1_peer_id); + assert_eq!( + message, + &SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))) + ); } } diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 2ad80055a..d7d8a22c0 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -37,14 +37,7 @@ pub async fn network_service( let network_config = context.config.network.clone(); // Build the network swarm and retrieve the local peer ID - let (mut swarm, local_peer_id) = swarm::build_swarm( - &network_config, - &context.store, - &context.schema_provider, - tx.clone(), - key_pair, - ) - .await?; + let (mut swarm, local_peer_id) = swarm::build_swarm(&network_config, key_pair).await?; // Define the QUIC multiaddress on which the swarm will listen for connections let quic_multiaddr = diff --git a/aquadoggo/src/network/swarm.rs b/aquadoggo/src/network/swarm.rs index 92f3b9f0e..2891871ce 100644 --- a/aquadoggo/src/network/swarm.rs +++ b/aquadoggo/src/network/swarm.rs @@ -9,18 +9,12 @@ use libp2p::PeerId; use libp2p::Swarm; use log::info; -use crate::bus::ServiceSender; -use crate::db::SqlStore; use crate::network::behaviour::Behaviour; use crate::network::transport; use crate::network::NetworkConfiguration; -use crate::schema::SchemaProvider; pub async fn build_swarm( network_config: &NetworkConfiguration, - store: &SqlStore, - schema_provider: &SchemaProvider, - tx: ServiceSender, key_pair: Keypair, ) -> Result<(Swarm, PeerId)> { // Read the peer ID (public key) from the key pair @@ -34,15 +28,7 @@ pub async fn build_swarm( // Instantiate the custom network behaviour with default configuration // and the libp2p peer ID - let behaviour = Behaviour::new( - network_config, - peer_id, - store, - schema_provider, - tx, - key_pair, - relay_client, - )?; + let behaviour = Behaviour::new(network_config, peer_id, key_pair, relay_client)?; // Initialise a swarm with QUIC transports, our composed network behaviour // and the default configuration parameters diff --git a/aquadoggo/src/node.rs b/aquadoggo/src/node.rs index 8573b5142..0da6886fc 100644 --- a/aquadoggo/src/node.rs +++ b/aquadoggo/src/node.rs @@ -11,6 +11,7 @@ use crate::http::http_service; use crate::manager::ServiceManager; use crate::materializer::materializer_service; use crate::network::network_service; +use crate::replication::replication_service; use crate::schema::SchemaProvider; /// Capacity of the internal broadcast channel used to communicate between services. @@ -81,6 +82,15 @@ impl Node { panic!("Failed starting network service"); } + // Start replication service syncing data with other nodes + if manager + .add("replication", replication_service) + .await + .is_err() + { + panic!("Failed starting replication service"); + } + Self { pool, manager } } diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index bb943f877..e3584fedc 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -5,6 +5,7 @@ mod ingest; mod manager; mod message; mod mode; +mod service; mod session; mod strategies; mod target_set; @@ -14,6 +15,7 @@ pub use ingest::SyncIngest; pub use manager::SyncManager; pub use message::{LiveMode, LogHeight, Message, SyncMessage}; pub use mode::Mode; +pub use service::replication_service; pub use session::{Session, SessionId, SessionState}; pub use strategies::{NaiveStrategy, SetReconciliationStrategy, StrategyResult}; pub use target_set::TargetSet; diff --git a/aquadoggo/src/replication/service.rs b/aquadoggo/src/replication/service.rs new file mode 100644 index 000000000..786b579ab --- /dev/null +++ b/aquadoggo/src/replication/service.rs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use anyhow::Result; + +use crate::bus::ServiceSender; +use crate::context::Context; +use crate::manager::{ServiceReadySender, Shutdown}; + +pub async fn replication_service( + context: Context, + signal: Shutdown, + tx: ServiceSender, + tx_ready: ServiceReadySender, +) -> Result<()> { + Ok(()) +}