diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index 42d3c0ef613..9f2524733e6 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -284,7 +284,7 @@ mod tests { async fn cannot_dial_blocked_peer() { let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); - listener.listen().await; + listener.listen().with_memory_addr_external().await; dialer.behaviour_mut().block_peer(*listener.local_peer_id()); @@ -298,7 +298,7 @@ mod tests { async fn can_dial_unblocked_peer() { let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); - listener.listen().await; + listener.listen().with_memory_addr_external().await; dialer.behaviour_mut().block_peer(*listener.local_peer_id()); dialer @@ -312,7 +312,7 @@ mod tests { async fn blocked_peer_cannot_dial_us() { let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); - listener.listen().await; + listener.listen().with_memory_addr_external().await; listener.behaviour_mut().block_peer(*dialer.local_peer_id()); dial(&mut dialer, &listener).unwrap(); @@ -334,7 +334,7 @@ mod tests { async fn connections_get_closed_upon_blocked() { let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); - listener.listen().await; + listener.listen().with_memory_addr_external().await; dialer.connect(&mut listener).await; dialer.behaviour_mut().block_peer(*listener.local_peer_id()); @@ -360,7 +360,7 @@ mod tests { async fn cannot_dial_peer_unless_allowed() { let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); - listener.listen().await; + listener.listen().with_memory_addr_external().await; let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { panic!("unexpected dial error") @@ -375,7 +375,7 @@ mod tests { async fn cannot_dial_disallowed_peer() { let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); - listener.listen().await; + listener.listen().with_memory_addr_external().await; dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); dialer @@ -392,7 +392,7 @@ mod tests { async fn not_allowed_peer_cannot_dial_us() { let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); - listener.listen().await; + listener.listen().with_memory_addr_external().await; dialer .dial( @@ -429,7 +429,7 @@ mod tests { async fn connections_get_closed_upon_disallow() { let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); - listener.listen().await; + listener.listen().with_memory_addr_external().await; dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); listener.behaviour_mut().allow_peer(*dialer.local_peer_id()); diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs index b873da76be7..d0ea3436177 100644 --- a/misc/connection-limits/src/lib.rs +++ b/misc/connection-limits/src/lib.rs @@ -448,7 +448,7 @@ mod tests { }); async_std::task::block_on(async { - let (listen_addr, _) = swarm1.listen().await; + let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await; for _ in 0..limit { swarm2.connect(&mut swarm1).await; diff --git a/protocols/autonat/tests/test_client.rs b/protocols/autonat/tests/test_client.rs index 743f4cc1b51..7509d3ef425 100644 --- a/protocols/autonat/tests/test_client.rs +++ b/protocols/autonat/tests/test_client.rs @@ -155,7 +155,7 @@ async fn test_confidence() { // Randomly test either for public or for private status the confidence. let test_public = rand::random::(); if test_public { - client.listen().await; + client.listen().with_memory_addr_external().await; } else { let unreachable_addr = "/ip4/127.0.0.1/tcp/42".parse().unwrap(); client.behaviour_mut().probe_address(unreachable_addr); diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 6078b101fa2..93661f1cba5 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -40,15 +40,9 @@ async fn connect() { let mut src = build_client(); // Have all swarms listen on a local TCP address. - let (memory_addr, relay_addr) = relay.listen().await; - relay.remove_external_address(&memory_addr); - relay.add_external_address(relay_addr.clone()); - - let (dst_mem_addr, dst_tcp_addr) = dst.listen().await; - let (src_mem_addr, _) = src.listen().await; - - dst.remove_external_address(&dst_mem_addr); - src.remove_external_address(&src_mem_addr); + let (_, relay_tcp_addr) = relay.listen().with_tcp_addr_external().await; + let (_, dst_tcp_addr) = dst.listen().await; + src.listen().await; assert!(src.external_addresses().next().is_none()); assert!(dst.external_addresses().next().is_none()); @@ -58,7 +52,7 @@ async fn connect() { async_std::task::spawn(relay.loop_on_next()); - let dst_relayed_addr = relay_addr + let dst_relayed_addr = relay_tcp_addr .with(Protocol::P2p(relay_peer_id)) .with(Protocol::P2pCircuit) .with(Protocol::P2p(dst_peer_id)); diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index e4e4c90d768..e8577bc78cf 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -122,7 +122,7 @@ async fn build_node() -> Swarm { .unwrap(); gossipsub::Behaviour::new(MessageAuthenticity::Author(peer_id), config).unwrap() }); - swarm.listen().await; + swarm.listen().with_memory_addr_external().await; swarm } diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index c1926b4125f..8d11ef96d50 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -24,7 +24,8 @@ async fn periodic_identify() { }); let swarm2_peer_id = *swarm2.local_peer_id(); - let (swarm1_memory_listen, swarm1_tcp_listen_addr) = swarm1.listen().await; + let (swarm1_memory_listen, swarm1_tcp_listen_addr) = + swarm1.listen().with_memory_addr_external().await; let (swarm2_memory_listen, swarm2_tcp_listen_addr) = swarm2.listen().await; swarm2.connect(&mut swarm1).await; @@ -92,7 +93,7 @@ async fn identify_push() { ) }); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; // First, let the periodic identify do its thing. @@ -142,7 +143,7 @@ async fn discover_peer_after_disconnect() { ) }); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; let swarm1_peer_id = *swarm1.local_peer_id(); diff --git a/protocols/kad/tests/client_mode.rs b/protocols/kad/tests/client_mode.rs index 13bf08bd288..5324e679ab9 100644 --- a/protocols/kad/tests/client_mode.rs +++ b/protocols/kad/tests/client_mode.rs @@ -14,7 +14,7 @@ async fn server_gets_added_to_routing_table_by_client() { let mut client = Swarm::new_ephemeral(MyBehaviour::new); let mut server = Swarm::new_ephemeral(MyBehaviour::new); - server.listen().await; + server.listen().with_memory_addr_external().await; client.connect(&mut server).await; let server_peer_id = *server.local_peer_id(); @@ -37,7 +37,7 @@ async fn two_servers_add_each_other_to_routing_table() { let mut server1 = Swarm::new_ephemeral(MyBehaviour::new); let mut server2 = Swarm::new_ephemeral(MyBehaviour::new); - server2.listen().await; + server2.listen().with_memory_addr_external().await; server1.connect(&mut server2).await; let server1_peer_id = *server1.local_peer_id(); @@ -54,7 +54,7 @@ async fn two_servers_add_each_other_to_routing_table() { other => panic!("Unexpected events: {other:?}"), } - server1.listen().await; + server1.listen().with_memory_addr_external().await; server2.connect(&mut server1).await; async_std::task::spawn(server1.loop_on_next()); @@ -79,14 +79,11 @@ async fn adding_an_external_addresses_activates_server_mode_on_existing_connecti let (memory_addr, _) = server.listen().await; - // Remove memory address to simulate a server that doesn't know its external address. - server.remove_external_address(&memory_addr); client.dial(memory_addr.clone()).unwrap(); - // Do the usual identify send/receive dance. This triggers a mode change to Mode::Client. + + // Do the usual identify send/receive dance. match libp2p_swarm_test::drive(&mut client, &mut server).await { - ([Identify(_), Identify(_)], [Kad(ModeChanged { new_mode }), Identify(_), Identify(_)]) => { - assert_eq!(new_mode, Mode::Client); - } + ([Identify(_), Identify(_)], [Identify(_), Identify(_)]) => {} other => panic!("Unexpected events: {other:?}"), } @@ -115,7 +112,7 @@ async fn set_client_to_server_mode() { let mut server = Swarm::new_ephemeral(MyBehaviour::new); - server.listen().await; + server.listen().with_memory_addr_external().await; client.connect(&mut server).await; let server_peer_id = *server.local_peer_id(); diff --git a/protocols/perf/tests/lib.rs b/protocols/perf/tests/lib.rs index a79e8dd36b3..447d8a06110 100644 --- a/protocols/perf/tests/lib.rs +++ b/protocols/perf/tests/lib.rs @@ -33,7 +33,7 @@ async fn perf() { let server_peer_id = *server.local_peer_id(); let mut client = Swarm::new_ephemeral(|_| client::Behaviour::new()); - server.listen().await; + server.listen().with_memory_addr_external().await; client.connect(&mut server).await; tokio::task::spawn(server.loop_on_next()); diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 946a2daadb6..3ca469f16a8 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -36,7 +36,7 @@ fn ping_pong() { let mut swarm2 = Swarm::new_ephemeral(|_| ping::Behaviour::new(cfg.clone())); async_std::task::block_on(async { - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; for _ in 0..count.get() { @@ -67,7 +67,7 @@ fn unsupported_doesnt_fail() { let mut swarm2 = Swarm::new_ephemeral(|_| ping::Behaviour::new(ping::Config::new())); let result = async_std::task::block_on(async { - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; let swarm1_peer_id = *swarm1.local_peer_id(); async_std::task::spawn(swarm1.loop_on_next()); diff --git a/protocols/rendezvous/tests/rendezvous.rs b/protocols/rendezvous/tests/rendezvous.rs index 67b4bc6ad57..fec56365768 100644 --- a/protocols/rendezvous/tests/rendezvous.rs +++ b/protocols/rendezvous/tests/rendezvous.rs @@ -429,7 +429,7 @@ async fn new_server_with_connected_clients( async fn new_client() -> Swarm { let mut client = Swarm::new_ephemeral(rendezvous::client::Behaviour::new); - client.listen().await; // we need to listen otherwise we don't have addresses to register + client.listen().with_memory_addr_external().await; // we need to listen otherwise we don't have addresses to register client } @@ -437,7 +437,7 @@ async fn new_client() -> Swarm { async fn new_server(config: rendezvous::server::Config) -> Swarm { let mut server = Swarm::new_ephemeral(|_| rendezvous::server::Behaviour::new(config)); - server.listen().await; + server.listen().with_memory_addr_external().await; server } @@ -447,7 +447,7 @@ async fn new_combined_node() -> Swarm { client: rendezvous::client::Behaviour::new(identity), server: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), }); - node.listen().await; + node.listen().with_memory_addr_external().await; node } @@ -458,7 +458,7 @@ async fn new_impersonating_client() -> Swarm { // As such, the best we can do is hand eve a completely different keypair from what she is using to authenticate her connection. let someone_else = identity::Keypair::generate_ed25519(); let mut eve = Swarm::new_ephemeral(move |_| rendezvous::client::Behaviour::new(someone_else)); - eve.listen().await; + eve.listen().with_memory_addr_external().await; eve } diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index cf651d395f5..2256403c0e4 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -21,7 +21,7 @@ async fn report_outbound_failure_on_read_response() { let (peer1_id, mut swarm1) = new_swarm(); let (peer2_id, mut swarm2) = new_swarm(); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; let server_task = async move { @@ -75,7 +75,7 @@ async fn report_outbound_failure_on_write_request() { let (peer1_id, mut swarm1) = new_swarm(); let (_peer2_id, mut swarm2) = new_swarm(); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; // Expects no events because `Event::Request` is produced after `read_request`. @@ -117,7 +117,7 @@ async fn report_outbound_timeout_on_read_response() { let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(200)); let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(100)); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; let server_task = async move { @@ -161,7 +161,7 @@ async fn report_inbound_failure_on_read_request() { let (peer1_id, mut swarm1) = new_swarm(); let (_peer2_id, mut swarm2) = new_swarm(); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; // Expects no events because `Event::Request` is produced after `read_request`. @@ -196,7 +196,7 @@ async fn report_inbound_failure_on_write_response() { let (peer1_id, mut swarm1) = new_swarm(); let (peer2_id, mut swarm2) = new_swarm(); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; // Expects OutboundFailure::Io failure with `FailOnWriteResponse` error @@ -261,7 +261,7 @@ async fn report_inbound_timeout_on_write_response() { let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(100)); let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(200)); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; // Expects InboundFailure::Timeout diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 37f21264d49..c751dc2b3dd 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -97,7 +97,7 @@ async fn ping_protocol() { }); let peer2_id = *swarm2.local_peer_id(); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; let expected_ping = ping.clone(); @@ -190,7 +190,7 @@ async fn emits_inbound_connection_closed_failure() { }); let peer2_id = *swarm2.local_peer_id(); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); @@ -255,7 +255,7 @@ async fn emits_inbound_connection_closed_if_channel_is_dropped() { }); let peer2_id = *swarm2.local_peer_id(); - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); diff --git a/swarm-test/src/lib.rs b/swarm-test/src/lib.rs index 41a606b300c..85bd9c22e9a 100644 --- a/swarm-test/src/lib.rs +++ b/swarm-test/src/lib.rs @@ -19,8 +19,8 @@ // DEALINGS IN THE SOFTWARE. use async_trait::async_trait; -use futures::future::Either; -use futures::StreamExt; +use futures::future::{BoxFuture, Either}; +use futures::{FutureExt, StreamExt}; use libp2p_core::{ multiaddr::Protocol, transport::MemoryTransport, upgrade::Version, Multiaddr, Transport, }; @@ -32,6 +32,7 @@ use libp2p_swarm::{ }; use libp2p_yamux as yamux; use std::fmt::Debug; +use std::future::IntoFuture; use std::time::Duration; /// An extension trait for [`Swarm`] that makes it easier to set up a network of [`Swarm`]s for tests. @@ -49,6 +50,10 @@ pub trait SwarmExt { Self: Sized; /// Establishes a connection to the given [`Swarm`], polling both of them until the connection is established. + /// + /// This will take addresses from the `other` [`Swarm`] via [`Swarm::external_addresses`]. + /// By default, this iterator will not yield any addresses. + /// To add listen addresses as external addresses, use [`ListenFuture::with_memory_addr_external`] or [`ListenFuture::with_tcp_addr_external`]. async fn connect(&mut self, other: &mut Swarm) where T: NetworkBehaviour + Send, @@ -73,7 +78,7 @@ pub trait SwarmExt { /// Listens for incoming connections, polling the [`Swarm`] until the transport is ready to accept connections. /// /// The first address is for the memory transport, the second one for the TCP transport. - async fn listen(&mut self) -> (Multiaddr, Multiaddr); + fn listen(&mut self) -> ListenFuture<&mut Self>; /// Returns the next [`SwarmEvent`] or times out after 10 seconds. /// @@ -292,53 +297,12 @@ where } } - async fn listen(&mut self) -> (Multiaddr, Multiaddr) { - let memory_addr_listener_id = self.listen_on(Protocol::Memory(0).into()).unwrap(); - - // block until we are actually listening - let memory_multiaddr = self - .wait(|e| match e { - SwarmEvent::NewListenAddr { - address, - listener_id, - } => (listener_id == memory_addr_listener_id).then_some(address), - other => { - log::debug!( - "Ignoring {:?} while waiting for listening to succeed", - other - ); - None - } - }) - .await; - - // Memory addresses are externally reachable because they all share the same memory-space. - self.add_external_address(memory_multiaddr.clone()); - - let tcp_addr_listener_id = self - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap(); - - let tcp_multiaddr = self - .wait(|e| match e { - SwarmEvent::NewListenAddr { - address, - listener_id, - } => (listener_id == tcp_addr_listener_id).then_some(address), - other => { - log::debug!( - "Ignoring {:?} while waiting for listening to succeed", - other - ); - None - } - }) - .await; - - // We purposely don't add the TCP addr as an external one because we want to only use the memory transport for making connections in here. - // The TCP transport is only supported for protocols that manage their own connections. - - (memory_multiaddr, tcp_multiaddr) + fn listen(&mut self) -> ListenFuture<&mut Self> { + ListenFuture { + add_memory_external: false, + add_tcp_external: false, + swarm: self, + } } async fn next_swarm_event( @@ -373,3 +337,87 @@ where } } } + +pub struct ListenFuture { + add_memory_external: bool, + add_tcp_external: bool, + swarm: S, +} + +impl ListenFuture { + /// Adds the memory address we are starting to listen on as an external address using [`Swarm::add_external_address`]. + /// + /// This is typically "safe" for tests because within a process, memory addresses are "globally" reachable. + /// However, some tests depend on which addresses are external and need this to be configurable so it is not a good default. + pub fn with_memory_addr_external(mut self) -> Self { + self.add_memory_external = true; + + self + } + + /// Adds the TCP address we are starting to listen on as an external address using [`Swarm::add_external_address`]. + /// + /// This is typically "safe" for tests because on the same machine, 127.0.0.1 is reachable for other [`Swarm`]s. + /// However, some tests depend on which addresses are external and need this to be configurable so it is not a good default. + pub fn with_tcp_addr_external(mut self) -> Self { + self.add_tcp_external = true; + + self + } +} + +impl<'s, B> IntoFuture for ListenFuture<&'s mut Swarm> +where + B: NetworkBehaviour + Send, + ::ToSwarm: Debug, +{ + type Output = (Multiaddr, Multiaddr); + type IntoFuture = BoxFuture<'s, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + async move { + let swarm = self.swarm; + + let memory_addr_listener_id = swarm.listen_on(Protocol::Memory(0).into()).unwrap(); + + // block until we are actually listening + let memory_multiaddr = swarm + .wait(|e| match e { + SwarmEvent::NewListenAddr { + address, + listener_id, + } => (listener_id == memory_addr_listener_id).then_some(address), + other => { + panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}") + } + }) + .await; + + let tcp_addr_listener_id = swarm + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + .unwrap(); + + let tcp_multiaddr = swarm + .wait(|e| match e { + SwarmEvent::NewListenAddr { + address, + listener_id, + } => (listener_id == tcp_addr_listener_id).then_some(address), + other => { + panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}") + } + }) + .await; + + if self.add_memory_external { + swarm.add_external_address(memory_multiaddr.clone()); + } + if self.add_tcp_external { + swarm.add_external_address(tcp_multiaddr.clone()); + } + + (memory_multiaddr, tcp_multiaddr) + } + .boxed() + } +} diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs index a44518fa4ad..305e33c1804 100644 --- a/swarm/tests/connection_close.rs +++ b/swarm/tests/connection_close.rs @@ -16,7 +16,7 @@ async fn sends_remaining_events_to_behaviour_on_connection_close() { let mut swarm1 = Swarm::new_ephemeral(|_| Behaviour::new(3)); let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(3)); - swarm2.listen().await; + swarm2.listen().with_memory_addr_external().await; swarm1.connect(&mut swarm2).await; swarm1.disconnect_peer_id(*swarm2.local_peer_id()).unwrap();