From d57ae7ba626a7e09221f83cbfbe2e354556c9892 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 23 Aug 2023 12:17:53 +0200 Subject: [PATCH 01/29] Revert "refactor(perf): use libp2p-request-response" This reverts commit 92af0d1281d9cb37d0fe0d98b170fe1c3c6f7ed0. --- Cargo.lock | 2 - protocols/perf/Cargo.toml | 7 +- protocols/perf/Dockerfile | 10 +- protocols/perf/src/bin/perf.rs | 15 +- protocols/perf/src/client.rs | 234 ++----------------- protocols/perf/src/client/behaviour.rs | 5 +- protocols/perf/src/client/handler.rs | 9 +- protocols/perf/src/lib.rs | 1 + protocols/perf/src/protocol.rs | 311 +++++++++++++------------ protocols/perf/src/server.rs | 149 +----------- protocols/perf/src/server/behaviour.rs | 5 +- protocols/perf/src/server/handler.rs | 6 +- protocols/perf/tests/lib.rs | 7 +- 13 files changed, 211 insertions(+), 550 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a6fd9cb4f1b..b830b31cf4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2980,7 +2980,6 @@ name = "libp2p-perf" version = "0.2.0" dependencies = [ "anyhow", - "async-trait", "clap", "env_logger 0.10.0", "futures", @@ -2989,7 +2988,6 @@ dependencies = [ "libp2p-dns", "libp2p-identity", "libp2p-quic", - "libp2p-request-response", "libp2p-swarm", "libp2p-swarm-test", "libp2p-tcp", diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index 41b91ea1209..cc5d41c1dfc 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -12,7 +12,6 @@ categories = ["network-programming", "asynchronous"] [dependencies] anyhow = "1" -async-trait = "0.1" clap = { version = "4.3.23", features = ["derive"] } env_logger = "0.10.0" futures = "0.3.28" @@ -22,16 +21,16 @@ libp2p-dns = { workspace = true, features = ["tokio"] } libp2p-identity = { workspace = true } libp2p-tls = { workspace = true } libp2p-quic = { workspace = true, features = ["tokio"] } -libp2p-request-response = { workspace = true } libp2p-swarm = { workspace = true, features = ["macros", "tokio"] } libp2p-tcp = { workspace = true, features = ["tokio"] } libp2p-yamux = { workspace = true } log = "0.4" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" thiserror = "1.0" +# TODO: Full needed? tokio = { version = "1.32.0", features = ["full"] } void = "1" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" [dev-dependencies] rand = "0.8" diff --git a/protocols/perf/Dockerfile b/protocols/perf/Dockerfile index 6523e3bede1..aef8eed1cad 100644 --- a/protocols/perf/Dockerfile +++ b/protocols/perf/Dockerfile @@ -9,10 +9,14 @@ RUN --mount=type=cache,target=./target \ cargo build --release --package libp2p-perf RUN --mount=type=cache,target=./target \ - mv ./target/release/perf /usr/local/bin/perf + mv ./target/release/perf-server /usr/local/bin/perf-server + +RUN --mount=type=cache,target=./target \ + mv ./target/release/perf-client /usr/local/bin/perf-client FROM debian:bullseye-slim -COPY --from=builder /usr/local/bin/perf /app/perf +COPY --from=builder /usr/local/bin/perf-server /usr/local/bin/perf-server +COPY --from=builder /usr/local/bin/perf-client /usr/local/bin/perf-client -ENTRYPOINT [ "/app/perf" ] +ENTRYPOINT [ "perf-server"] diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index b6b090608f5..c2ce8692e09 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -30,6 +30,7 @@ use libp2p_core::{ Transport as _, }; use libp2p_identity::PeerId; +use libp2p_perf::server::Event; use libp2p_perf::{Run, RunDuration, RunParams}; use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; use log::{error, info}; @@ -135,7 +136,7 @@ async fn server(server_address: SocketAddr) -> Result<()> { info!("Established connection to {:?} via {:?}", peer_id, endpoint); } SwarmEvent::ConnectionClosed { .. } => {} - SwarmEvent::Behaviour(()) => { + SwarmEvent::Behaviour(Event { .. }) => { info!("Finished run",) } e => panic!("{e:?}"), @@ -450,18 +451,18 @@ async fn perf( swarm: &mut Swarm, server_peer_id: PeerId, params: RunParams, -) -> Result { +) -> Result { swarm.behaviour_mut().perf(server_peer_id, params)?; - let duration = match swarm.next().await.unwrap() { + let run = match swarm.next().await.unwrap() { SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, - result: Ok(duration), - }) => duration, + result: Ok(run), + }) => run, e => panic!("{e:?}"), }; - info!("{}", Run { params, duration }); + info!("{run}"); - Ok(duration) + Ok(run) } diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 93c2086a49e..5220c4838af 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -18,232 +18,24 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use instant::Duration; +mod behaviour; +mod handler; -use std::{ - collections::HashSet, - task::{Context, Poll}, -}; +use instant::Instant; +use std::sync::atomic::{AtomicUsize, Ordering}; +use crate::RunParams; -use libp2p_core::Multiaddr; -use libp2p_identity::PeerId; -use libp2p_request_response as request_response; -use libp2p_swarm::{ - derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm, - NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent, ToSwarm, -}; +pub use behaviour::{Behaviour, Event}; -use crate::{protocol::Response, RunDuration, RunParams}; +static NEXT_RUN_ID: AtomicUsize = AtomicUsize::new(1); /// Connection identifier. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] -pub struct RunId(request_response::RequestId); +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct RunId(usize); -impl From for RunId { - fn from(value: request_response::RequestId) -> Self { - Self(value) - } -} - -#[derive(Debug)] -pub struct Event { - pub id: RunId, - pub result: Result, -} - -pub struct Behaviour { - connected: HashSet, - request_response: request_response::Behaviour, -} - -impl Default for Behaviour { - fn default() -> Self { - let mut req_resp_config = request_response::Config::default(); - req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5)); - req_resp_config.set_request_timeout(Duration::from_secs(60 * 5)); - Self { - connected: Default::default(), - request_response: request_response::Behaviour::new( - std::iter::once(( - crate::PROTOCOL_NAME, - request_response::ProtocolSupport::Outbound, - )), - req_resp_config, - ), - } - } -} - -impl Behaviour { - pub fn new() -> Self { - Self::default() - } - - pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result { - if !self.connected.contains(&server) { - return Err(PerfError::NotConnected); - } - - let id = self.request_response.send_request(&server, params).into(); - - Ok(id) - } -} - -#[derive(thiserror::Error, Debug)] -pub enum PerfError { - #[error("Not connected to peer")] - NotConnected, -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = - as NetworkBehaviour>::ConnectionHandler; - type ToSwarm = Event; - - fn handle_pending_outbound_connection( - &mut self, - connection_id: ConnectionId, - maybe_peer: Option, - addresses: &[Multiaddr], - effective_role: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response.handle_pending_outbound_connection( - connection_id, - maybe_peer, - addresses, - effective_role, - ) - } - - fn handle_established_outbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response - .handle_established_outbound_connection(connection_id, peer, addr, role_override) - } - - fn handle_pending_inbound_connection( - &mut self, - connection_id: ConnectionId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result<(), libp2p_swarm::ConnectionDenied> { - self.request_response.handle_pending_inbound_connection( - connection_id, - local_addr, - remote_addr, - ) - } - - fn handle_established_inbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { - self.connected.insert(peer_id); - } - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id: _, - endpoint: _, - handler: _, - remaining_established, - }) => { - if remaining_established == 0 { - assert!(self.connected.remove(&peer_id)); - } - } - FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrConfirmed(_) - | FromSwarm::ExternalAddrExpired(_) => {} - }; - - self.request_response.on_swarm_event(event); - } - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - connection_id: ConnectionId, - event: THandlerOutEvent, - ) { - self.request_response - .on_connection_handler_event(peer_id, connection_id, event); - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - params: &mut impl PollParameters, - ) -> Poll>> { - self.request_response.poll(cx, params).map(|to_swarm| { - to_swarm.map_out(|m| match m { - request_response::Event::Message { - peer: _, - message: - request_response::Message::Response { - request_id, - response: Response::Receiver(run_duration), - }, - } => Event { - id: request_id.into(), - result: Ok(run_duration), - }, - request_response::Event::Message { - peer: _, - message: - request_response::Message::Response { - response: Response::Sender(_), - .. - }, - } => unreachable!(), - request_response::Event::Message { - peer: _, - message: request_response::Message::Request { .. }, - } => { - unreachable!() - } - request_response::Event::OutboundFailure { - peer: _, - request_id, - error, - } => Event { - id: request_id.into(), - result: Err(error), - }, - request_response::Event::InboundFailure { - peer: _, - request_id: _, - error: _, - } => unreachable!(), - request_response::Event::ResponseSent { .. } => unreachable!(), - }) - }) +impl RunId { + /// Returns the next available [`RunId`]. + pub(crate) fn next() -> Self { + Self(NEXT_RUN_ID.fetch_add(1, Ordering::SeqCst)) } } diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index 912f6d5bb9e..1fbf4ccf695 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -35,13 +35,14 @@ use libp2p_swarm::{ use void::Void; use crate::client::handler::Handler; +use crate::{Run, RunDuration, RunParams}; -use super::{RunId, RunParams, RunStats}; +use super::RunId; #[derive(Debug)] pub struct Event { pub id: RunId, - pub result: Result>, + pub result: Result>, } #[derive(Default)] diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 8a6df43d198..9cc52a4cc97 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -36,7 +36,8 @@ use libp2p_swarm::{ }; use void::Void; -use super::{RunId, RunParams, RunStats}; +use super::RunId; +use crate::{Run, RunDuration, RunParams}; #[derive(Debug)] pub struct Command { @@ -47,7 +48,7 @@ pub struct Command { #[derive(Debug)] pub struct Event { pub(crate) id: RunId, - pub(crate) result: Result>, + pub(crate) result: Result>, } pub struct Handler { @@ -129,9 +130,9 @@ impl ConnectionHandler for Handler { .expect("opened a stream without a pending command"); self.outbound.push( crate::protocol::send_receive(params, protocol) - .map_ok(move |timers| Event { + .map_ok(move |duration| Event { id, - result: Ok(RunStats { params, timers }), + result: Ok(Run { params, duration }), }) .boxed(), ); diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index b2b12244341..ac7d0963237 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -52,6 +52,7 @@ pub struct RunDuration { pub download: Duration, } +#[derive(Debug, Clone, Copy)] pub struct Run { pub params: RunParams, pub duration: RunDuration, diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs index 4b454fb88b7..bbfc69e40f6 100644 --- a/protocols/perf/src/protocol.rs +++ b/protocols/perf/src/protocol.rs @@ -18,184 +18,187 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use async_trait::async_trait; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use instant::Instant; -use libp2p_request_response as request_response; -use libp2p_swarm::StreamProtocol; -use std::io; -use crate::{RunDuration, RunParams}; +use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -const BUF: [u8; 65536] = [0; 64 << 10]; +use crate::{client, server, RunParams, RunDuration, Run}; -#[derive(Debug)] -pub enum Response { - Sender(usize), - Receiver(RunDuration), -} +const BUF: [u8; 1024] = [0; 1024]; -#[derive(Default)] -pub struct Codec { - to_receive: Option, +pub(crate) async fn send_receive( + params: RunParams, + mut stream: S, +) -> Result { + let RunParams { + to_send, + to_receive, + } = params; - write_start: Option, - read_start: Option, - read_done: Option, -} + let mut receive_buf = vec![0; 1024]; + + stream.write_all(&(to_receive as u64).to_be_bytes()).await?; + + let write_start = Instant::now(); + + let mut sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; + + sent += stream.write(buf).await?; + } + + stream.close().await?; -impl Clone for Codec { - fn clone(&self) -> Self { - Default::default() + let write_done = Instant::now(); + + let mut received = 0; + while received < to_receive { + received += stream.read(&mut receive_buf).await?; } + + let read_done = Instant::now(); + + Ok(RunDuration { + upload: write_done.duration_since(write_start), + download: read_done.duration_since(write_done), + }) } -#[async_trait] -impl request_response::Codec for Codec { - /// The type of protocol(s) or protocol versions being negotiated. - type Protocol = StreamProtocol; - /// The type of inbound and outbound requests. - type Request = RunParams; - /// The type of inbound and outbound responses. - type Response = Response; - - /// Reads a request from the given I/O stream according to the - /// negotiated protocol. - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut receive_buf = vec![0; 64 << 10]; - - let to_send = { - let mut buf = [0; 8]; - io.read_exact(&mut buf).await?; - - u64::from_be_bytes(buf) as usize - }; - - let mut received = 0; - loop { - let n = io.read(&mut receive_buf).await?; - received += n; - if n == 0 { - break; - } +pub(crate) async fn receive_send( + mut stream: S, +) -> Result { + let to_send = { + let mut buf = [0; 8]; + stream.read_exact(&mut buf).await?; + + u64::from_be_bytes(buf) as usize + }; + + let read_start = Instant::now(); + + let mut receive_buf = vec![0; 1024]; + let mut received = 0; + loop { + let n = stream.read(&mut receive_buf).await?; + received += n; + if n == 0 { + break; } + } + + let read_done = Instant::now(); - Ok(RunParams { - to_receive: received, - to_send, - }) + let mut sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; + + sent += stream.write(buf).await?; } - /// Reads a response from the given I/O stream according to the - /// negotiated protocol. - async fn read_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - assert!(self.write_start.is_some()); - assert_eq!(self.read_start, None); - assert_eq!(self.read_done, None); - - self.read_start = Some(Instant::now()); - - let mut receive_buf = vec![0; 64 << 10]; - - let mut received = 0; - loop { - let n = io.read(&mut receive_buf).await?; - received += n; - // Make sure to wait for the remote to close the stream. Otherwise with `to_receive` of `0` - // one does not measure the full round-trip of the previous write. - if n == 0 { - break; - } - } + stream.close().await?; + let write_done = Instant::now(); + + Ok(Run { + params: RunParams { to_send: sent, to_receive: received }, + duration: RunDuration { + upload: write_done.duration_since(read_done), + download: read_done.duration_since(read_start), + }, + }) +} - self.read_done = Some(Instant::now()); +#[cfg(test)] +mod tests { + use super::*; + use futures::{executor::block_on, AsyncRead, AsyncWrite}; + use std::{ + pin::Pin, + sync::{Arc, Mutex}, + task::Poll, + }; + + #[derive(Clone)] + struct DummyStream { + inner: Arc>, + } - assert_eq!(received, self.to_receive.unwrap()); + struct DummyStreamInner { + read: Vec, + write: Vec, + } - Ok(Response::Receiver(RunDuration { - upload: self - .read_start - .unwrap() - .duration_since(self.write_start.unwrap()), - download: self - .read_done - .unwrap() - .duration_since(self.read_start.unwrap()), - })) + impl DummyStream { + fn new(read: Vec) -> Self { + Self { + inner: Arc::new(Mutex::new(DummyStreamInner { + read, + write: Vec::new(), + })), + } + } } - /// Writes a request to the given I/O stream according to the - /// negotiated protocol. - async fn write_request( - &mut self, - _: &Self::Protocol, - io: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - assert_eq!(self.to_receive, None); - assert_eq!(self.write_start, None); - assert_eq!(self.read_start, None); - assert_eq!(self.read_done, None); - - self.write_start = Some(Instant::now()); - - let RunParams { - to_send, - to_receive, - } = req; - - self.to_receive = Some(to_receive); - - io.write_all(&(to_receive as u64).to_be_bytes()).await?; - - let mut sent = 0; - while sent < to_send { - let n = std::cmp::min(to_send - sent, BUF.len()); - let buf = &BUF[..n]; - - sent += io.write(buf).await?; + impl Unpin for DummyStream {} + + impl AsyncWrite for DummyStream { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + Pin::new(&mut self.inner.lock().unwrap().write).poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner.lock().unwrap().write).poll_flush(cx) } - Ok(()) + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner.lock().unwrap().write).poll_close(cx) + } } - /// Writes a response to the given I/O stream according to the - /// negotiated protocol. - async fn write_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - response: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - let to_send = match response { - Response::Sender(to_send) => to_send, - Response::Receiver(_) => unreachable!(), - }; - - let mut sent = 0; - while sent < to_send { - let n = std::cmp::min(to_send - sent, BUF.len()); - let buf = &BUF[..n]; - - sent += io.write(buf).await?; + impl AsyncRead for DummyStream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let amt = std::cmp::min(buf.len(), self.inner.lock().unwrap().read.len()); + let new = self.inner.lock().unwrap().read.split_off(amt); + + buf[..amt].copy_from_slice(self.inner.lock().unwrap().read.as_slice()); + + self.inner.lock().unwrap().read = new; + Poll::Ready(Ok(amt)) } + } - Ok(()) + #[test] + fn test_client() { + let stream = DummyStream::new(vec![0]); + + block_on(send_receive( + RunParams { + to_send: 0, + to_receive: 0, + }, + stream.clone(), + )) + .unwrap(); + + assert_eq!( + stream.inner.lock().unwrap().write, + 0u64.to_be_bytes().to_vec() + ); } } diff --git a/protocols/perf/src/server.rs b/protocols/perf/src/server.rs index 79f77c74650..2a6b5d60774 100644 --- a/protocols/perf/src/server.rs +++ b/protocols/perf/src/server.rs @@ -18,150 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::task::{Context, Poll}; +mod behaviour; +mod handler; -use instant::Duration; -use libp2p_core::Multiaddr; -use libp2p_identity::PeerId; -use libp2p_request_response as request_response; -use libp2p_swarm::{ - ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent, - ToSwarm, -}; +use instant::Instant; -use crate::protocol::Response; - -pub struct Behaviour { - request_response: request_response::Behaviour, -} - -impl Default for Behaviour { - fn default() -> Self { - let mut req_resp_config = request_response::Config::default(); - req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5)); - req_resp_config.set_request_timeout(Duration::from_secs(60 * 5)); - - Self { - request_response: request_response::Behaviour::new( - std::iter::once(( - crate::PROTOCOL_NAME, - request_response::ProtocolSupport::Inbound, - )), - req_resp_config, - ), - } - } -} - -impl Behaviour { - pub fn new() -> Self { - Self::default() - } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = - as NetworkBehaviour>::ConnectionHandler; - type ToSwarm = (); - - fn handle_pending_outbound_connection( - &mut self, - connection_id: ConnectionId, - maybe_peer: Option, - addresses: &[Multiaddr], - effective_role: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response.handle_pending_outbound_connection( - connection_id, - maybe_peer, - addresses, - effective_role, - ) - } - - fn handle_established_outbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response - .handle_established_outbound_connection(connection_id, peer, addr, role_override) - } - - fn handle_pending_inbound_connection( - &mut self, - connection_id: ConnectionId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result<(), libp2p_swarm::ConnectionDenied> { - self.request_response.handle_pending_inbound_connection( - connection_id, - local_addr, - remote_addr, - ) - } - - fn handle_established_inbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - self.request_response.on_swarm_event(event); - } - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - connection_id: ConnectionId, - event: THandlerOutEvent, - ) { - self.request_response - .on_connection_handler_event(peer_id, connection_id, event); - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - params: &mut impl PollParameters, - ) -> Poll>> { - self.request_response.poll(cx, params).map(|to_swarm| { - to_swarm.map_out(|m| match m { - request_response::Event::Message { - peer: _, - message: request_response::Message::Response { .. }, - } => { - unreachable!() - } - request_response::Event::Message { - peer: _, - message: - request_response::Message::Request { - request_id: _, - request, - channel, - }, - } => { - let _ = self - .request_response - .send_response(channel, Response::Sender(request.to_send)); - } - request_response::Event::OutboundFailure { .. } => unreachable!(), - request_response::Event::InboundFailure { .. } => {} - request_response::Event::ResponseSent { .. } => {} - }) - }) - } -} +pub use behaviour::{Behaviour, Event}; diff --git a/protocols/perf/src/server/behaviour.rs b/protocols/perf/src/server/behaviour.rs index b15cb70110d..49942e07396 100644 --- a/protocols/perf/src/server/behaviour.rs +++ b/protocols/perf/src/server/behaviour.rs @@ -32,13 +32,12 @@ use libp2p_swarm::{ }; use crate::server::handler::Handler; - -use super::RunStats; +use crate::Run; #[derive(Debug)] pub struct Event { pub remote_peer_id: PeerId, - pub stats: RunStats, + pub stats: Run, } #[derive(Default)] diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index e8f7b72e605..53163720010 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -35,15 +35,15 @@ use libp2p_swarm::{ use log::error; use void::Void; -use super::RunStats; +use crate::Run; #[derive(Debug)] pub struct Event { - pub stats: RunStats, + pub stats: Run, } pub struct Handler { - inbound: FuturesUnordered>>, + inbound: FuturesUnordered>>, keep_alive: KeepAlive, } diff --git a/protocols/perf/tests/lib.rs b/protocols/perf/tests/lib.rs index af5bc2c35a2..a79e8dd36b3 100644 --- a/protocols/perf/tests/lib.rs +++ b/protocols/perf/tests/lib.rs @@ -18,7 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use libp2p_perf::{client, server, RunParams}; +use libp2p_perf::{ + client::{self}, + server, RunParams, +}; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; @@ -33,7 +36,7 @@ async fn perf() { server.listen().await; client.connect(&mut server).await; - tokio::spawn(server.loop_on_next()); + tokio::task::spawn(server.loop_on_next()); client .behaviour_mut() From f45070c4d0a88e27a8acb6ee27a7eab3e0e1aad8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 23 Aug 2023 14:20:18 +0200 Subject: [PATCH 02/29] Implement reporting via genawaiter --- Cargo.lock | 80 +++++++++++- protocols/perf/Cargo.toml | 8 +- protocols/perf/src/bin/perf.rs | 10 +- protocols/perf/src/client/behaviour.rs | 4 +- protocols/perf/src/client/handler.rs | 29 ++--- protocols/perf/src/lib.rs | 8 ++ protocols/perf/src/protocol.rs | 165 +++++++++++++++++-------- 7 files changed, 227 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b830b31cf4c..a9fe1f3ad44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1906,6 +1906,37 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "genawaiter" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" +dependencies = [ + "futures-core", + "genawaiter-macro", + "genawaiter-proc-macro", + "proc-macro-hack", +] + +[[package]] +name = "genawaiter-macro" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" + +[[package]] +name = "genawaiter-proc-macro" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" +dependencies = [ + "proc-macro-error 0.4.12", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2983,6 +3014,8 @@ dependencies = [ "clap", "env_logger 0.10.0", "futures", + "futures-timer", + "genawaiter", "instant", "libp2p-core", "libp2p-dns", @@ -4243,16 +4276,42 @@ dependencies = [ "elliptic-curve 0.13.5", ] +[[package]] +name = "proc-macro-error" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" +dependencies = [ + "proc-macro-error-attr 0.4.12", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + [[package]] name = "proc-macro-error" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ - "proc-macro-error-attr", + "proc-macro-error-attr 1.0.4", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" +dependencies = [ "proc-macro2", "quote", "syn 1.0.109", + "syn-mid", "version_check", ] @@ -4267,6 +4326,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.20+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" + [[package]] name = "proc-macro-warning" version = "0.4.1" @@ -5485,6 +5550,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn-mid" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "sync_wrapper" version = "0.1.2" @@ -5593,7 +5669,7 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cae91d1c7c61ec65817f1064954640ee350a50ae6548ff9a1bdd2489d6ffbb0" dependencies = [ - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 1.0.109", diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index cc5d41c1dfc..d47ab12f915 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -15,22 +15,24 @@ anyhow = "1" clap = { version = "4.3.23", features = ["derive"] } env_logger = "0.10.0" futures = "0.3.28" +futures-timer = "3.0" +genawaiter = { version = "0.99.1", features = ["futures03"] } instant = "0.1.12" libp2p-core = { workspace = true } libp2p-dns = { workspace = true, features = ["tokio"] } libp2p-identity = { workspace = true } -libp2p-tls = { workspace = true } libp2p-quic = { workspace = true, features = ["tokio"] } libp2p-swarm = { workspace = true, features = ["macros", "tokio"] } libp2p-tcp = { workspace = true, features = ["tokio"] } +libp2p-tls = { workspace = true } libp2p-yamux = { workspace = true } log = "0.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" thiserror = "1.0" # TODO: Full needed? tokio = { version = "1.32.0", features = ["full"] } void = "1" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" [dev-dependencies] rand = "0.8" diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index c2ce8692e09..f6bef8f0aa8 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -31,7 +31,7 @@ use libp2p_core::{ }; use libp2p_identity::PeerId; use libp2p_perf::server::Event; -use libp2p_perf::{Run, RunDuration, RunParams}; +use libp2p_perf::{Run, RunDuration, RunParams, RunUpdate}; use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; use log::{error, info}; use serde::{Deserialize, Serialize}; @@ -454,14 +454,16 @@ async fn perf( ) -> Result { swarm.behaviour_mut().perf(server_peer_id, params)?; - let run = match swarm.next().await.unwrap() { + let duration = match swarm.next().await.unwrap() { SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, - result: Ok(run), - }) => run, + result: Ok(RunUpdate::Finished { duration }), + }) => duration, e => panic!("{e:?}"), }; + let run = Run { params, duration }; + info!("{run}"); Ok(run) diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index 1fbf4ccf695..84d7c84d3e9 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -34,7 +34,7 @@ use libp2p_swarm::{ }; use void::Void; -use crate::client::handler::Handler; +use crate::{client::handler::Handler, RunUpdate}; use crate::{Run, RunDuration, RunParams}; use super::RunId; @@ -42,7 +42,7 @@ use super::RunId; #[derive(Debug)] pub struct Event { pub id: RunId, - pub result: Result>, + pub result: Result>, } #[derive(Default)] diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 9cc52a4cc97..66bc488469a 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -24,7 +24,11 @@ use std::{ time::{Duration, Instant}, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; +use futures::{ + future::BoxFuture, + stream::{BoxStream, FuturesUnordered, LocalBoxStream, SelectAll}, + FutureExt, StreamExt, TryFutureExt, +}; use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; use libp2p_swarm::{ handler::{ @@ -37,7 +41,7 @@ use libp2p_swarm::{ use void::Void; use super::RunId; -use crate::{Run, RunDuration, RunParams}; +use crate::{Run, RunDuration, RunParams, RunUpdate}; #[derive(Debug)] pub struct Command { @@ -48,7 +52,7 @@ pub struct Command { #[derive(Debug)] pub struct Event { pub(crate) id: RunId, - pub(crate) result: Result>, + pub(crate) result: Result>, } pub struct Handler { @@ -64,7 +68,7 @@ pub struct Handler { requested_streams: VecDeque, - outbound: FuturesUnordered>>, + outbound: SelectAll>>, keep_alive: KeepAlive, } @@ -128,14 +132,8 @@ impl ConnectionHandler for Handler { .requested_streams .pop_front() .expect("opened a stream without a pending command"); - self.outbound.push( - crate::protocol::send_receive(params, protocol) - .map_ok(move |duration| Event { - id, - result: Ok(Run { params, duration }), - }) - .boxed(), - ); + self.outbound + .push(crate::protocol::send_receive(params, protocol)); } ConnectionEvent::AddressChange(_) @@ -180,9 +178,12 @@ impl ConnectionHandler for Handler { while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) { match result { - Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)), + Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { + id: todo!(), + result: Ok(event), + })), Err(e) => { - panic!("{e:?}") + todo!("{e:?}") } } } diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index ac7d0963237..d578c736cd0 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -35,6 +35,14 @@ pub mod server; pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0"); +#[derive(Debug, Clone, Copy)] +pub enum RunUpdate { + Progressed, + Finished { + duration: RunDuration, + }, +} + /// Parameters for a single run, i.e. one stream, sending and receiving data. /// /// Property names are from the perspective of the actor. E.g. `to_send` is the amount of data to diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs index bbfc69e40f6..9346160363e 100644 --- a/protocols/perf/src/protocol.rs +++ b/protocols/perf/src/protocol.rs @@ -19,51 +19,106 @@ // DEALINGS IN THE SOFTWARE. use instant::Instant; +use std::time::Duration; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures::{ + future::LocalBoxFuture, + stream::{BoxStream, LocalBoxStream}, + AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt, +}; -use crate::{client, server, RunParams, RunDuration, Run}; +use crate::{client, server, Run, RunDuration, RunParams, RunUpdate}; + +use genawaiter::yield_; const BUF: [u8; 1024] = [0; 1024]; -pub(crate) async fn send_receive( +pub(crate) fn send_receive( params: RunParams, mut stream: S, -) -> Result { - let RunParams { - to_send, - to_receive, - } = params; - - let mut receive_buf = vec![0; 1024]; - - stream.write_all(&(to_receive as u64).to_be_bytes()).await?; - - let write_start = Instant::now(); - - let mut sent = 0; - while sent < to_send { - let n = std::cmp::min(to_send - sent, BUF.len()); - let buf = &BUF[..n]; +) -> BoxStream<'static, Result> { + let generator = genawaiter::sync::gen!({ + let mut delay = futures_timer::Delay::new(Duration::from_secs(1)); + + let RunParams { + to_send, + to_receive, + } = params; + + let mut receive_buf = vec![0; 1024]; + let to_receive_bytes = (to_receive as u64).to_be_bytes(); + + let mut write_to_receive = stream.write_all(&to_receive_bytes); + loop { + match futures::future::select(&mut delay, &mut write_to_receive).await { + futures::future::Either::Left((_, _)) => { + delay = futures_timer::Delay::new(Duration::from_secs(1)); + // yield_!() + } + futures::future::Either::Right((Ok(_), _)) => break, + futures::future::Either::Right((Err(_), _)) => todo!("yield"), + } + } - sent += stream.write(buf).await?; - } + let write_start = Instant::now(); + + let mut sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; + + let mut write = stream.write(buf); + sent += loop { + match futures::future::select(&mut delay, &mut write).await { + futures::future::Either::Left((_, _)) => { + delay = futures_timer::Delay::new(Duration::from_secs(1)); + // yield_!() + } + futures::future::Either::Right((Ok(n), _)) => break n, + futures::future::Either::Right((Err(_), _)) => todo!("yield"), + } + } + } - stream.close().await?; + loop { + match futures::future::select(&mut delay, stream.close()).await { + futures::future::Either::Left((_, _)) => { + delay = futures_timer::Delay::new(Duration::from_secs(1)); + // yield_!() + } + futures::future::Either::Right((Ok(_), _)) => break, + futures::future::Either::Right((Err(_), _)) => todo!("yield"), + } + } - let write_done = Instant::now(); + let write_done = Instant::now(); + + let mut received = 0; + while received < to_receive { + let mut read = stream.read(&mut receive_buf); + received += loop { + match futures::future::select(&mut delay, &mut read).await { + futures::future::Either::Left((_, _)) => { + delay = futures_timer::Delay::new(Duration::from_secs(1)); + // yield_!() + } + futures::future::Either::Right((Ok(n), _)) => break n, + futures::future::Either::Right((Err(_), _)) => todo!("yield"), + } + } + } - let mut received = 0; - while received < to_receive { - received += stream.read(&mut receive_buf).await?; - } + let read_done = Instant::now(); - let read_done = Instant::now(); + yield_!(Ok(RunUpdate::Finished { + duration: RunDuration { + upload: write_done.duration_since(write_start), + download: read_done.duration_since(write_done), + } + })); + }); - Ok(RunDuration { - upload: write_done.duration_since(write_start), - download: read_done.duration_since(write_done), - }) + StreamExt::boxed(generator) } pub(crate) async fn receive_send( @@ -102,7 +157,10 @@ pub(crate) async fn receive_send( let write_done = Instant::now(); Ok(Run { - params: RunParams { to_send: sent, to_receive: received }, + params: RunParams { + to_send: sent, + to_receive: received, + }, duration: RunDuration { upload: write_done.duration_since(read_done), download: read_done.duration_since(read_start), @@ -113,7 +171,10 @@ pub(crate) async fn receive_send( #[cfg(test)] mod tests { use super::*; - use futures::{executor::block_on, AsyncRead, AsyncWrite}; + use futures::{ + executor::{block_on, block_on_stream}, + AsyncRead, AsyncWrite, + }; use std::{ pin::Pin, sync::{Arc, Mutex}, @@ -183,22 +244,22 @@ mod tests { } } - #[test] - fn test_client() { - let stream = DummyStream::new(vec![0]); - - block_on(send_receive( - RunParams { - to_send: 0, - to_receive: 0, - }, - stream.clone(), - )) - .unwrap(); - - assert_eq!( - stream.inner.lock().unwrap().write, - 0u64.to_be_bytes().to_vec() - ); - } + // #[test] + // fn test_client() { + // let stream = DummyStream::new(vec![0]); + + // block_on_stream(send_receive( + // RunParams { + // to_send: 0, + // to_receive: 0, + // }, + // stream.clone(), + // )) + // .collect::>() + + // assert_eq!( + // stream.inner.lock().unwrap().write, + // 0u64.to_be_bytes().to_vec() + // ); + // } } From 3f49d8eabbe5088faab97c2c023218cc55d667a2 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 23 Aug 2023 19:50:36 +0200 Subject: [PATCH 03/29] Replace genawaiter with plain channel --- Cargo.lock | 79 +----- protocols/perf/Cargo.toml | 1 - protocols/perf/src/bin/perf.rs | 22 +- protocols/perf/src/client.rs | 2 - protocols/perf/src/client/behaviour.rs | 2 +- protocols/perf/src/client/handler.rs | 31 +-- protocols/perf/src/lib.rs | 109 +++++--- protocols/perf/src/protocol.rs | 372 ++++++++++++++----------- protocols/perf/src/server.rs | 1 - 9 files changed, 313 insertions(+), 306 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9fe1f3ad44..2e222b78406 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1906,37 +1906,6 @@ dependencies = [ "rustc_version", ] -[[package]] -name = "genawaiter" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" -dependencies = [ - "futures-core", - "genawaiter-macro", - "genawaiter-proc-macro", - "proc-macro-hack", -] - -[[package]] -name = "genawaiter-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" - -[[package]] -name = "genawaiter-proc-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" -dependencies = [ - "proc-macro-error 0.4.12", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -3015,7 +2984,6 @@ dependencies = [ "env_logger 0.10.0", "futures", "futures-timer", - "genawaiter", "instant", "libp2p-core", "libp2p-dns", @@ -4276,42 +4244,16 @@ dependencies = [ "elliptic-curve 0.13.5", ] -[[package]] -name = "proc-macro-error" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" -dependencies = [ - "proc-macro-error-attr 0.4.12", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - [[package]] name = "proc-macro-error" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ - "proc-macro-error-attr 1.0.4", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" -dependencies = [ + "proc-macro-error-attr", "proc-macro2", "quote", "syn 1.0.109", - "syn-mid", "version_check", ] @@ -4326,12 +4268,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro-warning" version = "0.4.1" @@ -5550,17 +5486,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn-mid" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "sync_wrapper" version = "0.1.2" @@ -5669,7 +5594,7 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cae91d1c7c61ec65817f1064954640ee350a50ae6548ff9a1bdd2489d6ffbb0" dependencies = [ - "proc-macro-error 1.0.4", + "proc-macro-error", "proc-macro2", "quote", "syn 1.0.109", diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index d47ab12f915..8c11c3345a2 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -16,7 +16,6 @@ clap = { version = "4.3.23", features = ["derive"] } env_logger = "0.10.0" futures = "0.3.28" futures-timer = "3.0" -genawaiter = { version = "0.99.1", features = ["futures03"] } instant = "0.1.12" libp2p-core = { workspace = true } libp2p-dns = { workspace = true, features = ["tokio"] } diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index f6bef8f0aa8..41b76102b73 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -31,7 +31,7 @@ use libp2p_core::{ }; use libp2p_identity::PeerId; use libp2p_perf::server::Event; -use libp2p_perf::{Run, RunDuration, RunParams, RunUpdate}; +use libp2p_perf::{Finished, Run, RunParams, RunUpdate}; use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; use log::{error, info}; use serde::{Deserialize, Serialize}; @@ -454,12 +454,20 @@ async fn perf( ) -> Result { swarm.behaviour_mut().perf(server_peer_id, params)?; - let duration = match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(libp2p_perf::client::Event { - id: _, - result: Ok(RunUpdate::Finished { duration }), - }) => duration, - e => panic!("{e:?}"), + let duration = loop { + match swarm.next().await.unwrap() { + SwarmEvent::Behaviour(libp2p_perf::client::Event { + id: _, + result: Ok(RunUpdate::Progressed(progressed)), + }) => { + info!("{progressed}"); + } + SwarmEvent::Behaviour(libp2p_perf::client::Event { + id: _, + result: Ok(RunUpdate::Finished(Finished { duration })), + }) => break duration, + e => panic!("{e:?}"), + }; }; let run = Run { params, duration }; diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 5220c4838af..21a60ccde1b 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -21,9 +21,7 @@ mod behaviour; mod handler; -use instant::Instant; use std::sync::atomic::{AtomicUsize, Ordering}; -use crate::RunParams; pub use behaviour::{Behaviour, Event}; diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index 84d7c84d3e9..2415fd91764 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -34,8 +34,8 @@ use libp2p_swarm::{ }; use void::Void; +use crate::RunParams; use crate::{client::handler::Handler, RunUpdate}; -use crate::{Run, RunDuration, RunParams}; use super::RunId; diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 66bc488469a..0d93fe52a8c 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -25,9 +25,8 @@ use std::{ }; use futures::{ - future::BoxFuture, - stream::{BoxStream, FuturesUnordered, LocalBoxStream, SelectAll}, - FutureExt, StreamExt, TryFutureExt, + stream::{BoxStream, SelectAll}, + StreamExt, }; use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; use libp2p_swarm::{ @@ -41,7 +40,7 @@ use libp2p_swarm::{ use void::Void; use super::RunId; -use crate::{Run, RunDuration, RunParams, RunUpdate}; +use crate::{RunParams, RunUpdate}; #[derive(Debug)] pub struct Command { @@ -68,7 +67,7 @@ pub struct Handler { requested_streams: VecDeque, - outbound: SelectAll>>, + outbound: SelectAll)>>, keep_alive: KeepAlive, } @@ -133,7 +132,12 @@ impl ConnectionHandler for Handler { .pop_front() .expect("opened a stream without a pending command"); self.outbound - .push(crate::protocol::send_receive(params, protocol)); + // TODO: can we get around the box? + .push( + crate::protocol::send_receive(params, protocol) + .map(move |result| (id, result)) + .boxed(), + ); } ConnectionEvent::AddressChange(_) @@ -176,16 +180,11 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } - while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) { - match result { - Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { - id: todo!(), - result: Ok(event), - })), - Err(e) => { - todo!("{e:?}") - } - } + while let Poll::Ready(Some((id, result))) = self.outbound.poll_next_unpin(cx) { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { + id, + result: result.map_err(|e| todo!("{e:?}")), + })); } if self.outbound.is_empty() { diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index d578c736cd0..18212819136 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -37,10 +37,41 @@ pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0"); #[derive(Debug, Clone, Copy)] pub enum RunUpdate { - Progressed, - Finished { - duration: RunDuration, - }, + Progressed(Progressed), + Finished(Finished), +} + +#[derive(Debug, Clone, Copy)] +pub struct Progressed { + pub duration: Duration, + pub sent: usize, + pub received: usize, +} + +impl Display for Progressed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Progressed { + // TODO: A single duration for both up and down is not ideal. + duration, + sent, + received, + } = self; + write!( + f, + "{:4} s uploaded {} downloaded {} ({})", + duration.as_secs_f64(), + format_bytes(*sent), + format_bytes(*received), + format_bandwidth(*duration, sent + received), + )?; + + Ok(()) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Finished { + pub duration: RunDuration, } /// Parameters for a single run, i.e. one stream, sending and receiving data. @@ -66,43 +97,43 @@ pub struct Run { pub duration: RunDuration, } +const KILO: f64 = 1024.0; +const MEGA: f64 = KILO * 1024.0; +const GIGA: f64 = MEGA * 1024.0; + +fn format_bytes(bytes: usize) -> String { + let bytes = bytes as f64; + if bytes >= GIGA { + format!("{:.2} GiB", bytes / GIGA) + } else if bytes >= MEGA { + format!("{:.2} MiB", bytes / MEGA) + } else if bytes >= KILO { + format!("{:.2} KiB", bytes / KILO) + } else { + format!("{} B", bytes) + } +} + +fn format_bandwidth(duration: Duration, bytes: usize) -> String { + const KILO: f64 = 1024.0; + const MEGA: f64 = KILO * 1024.0; + const GIGA: f64 = MEGA * 1024.0; + + let bandwidth = (bytes as f64 * 8.0) / duration.as_secs_f64(); + + if bandwidth >= GIGA { + format!("{:.2} Gbit/s", bandwidth / GIGA) + } else if bandwidth >= MEGA { + format!("{:.2} Mbit/s", bandwidth / MEGA) + } else if bandwidth >= KILO { + format!("{:.2} Kbit/s", bandwidth / KILO) + } else { + format!("{:.2} bit/s", bandwidth) + } +} + impl Display for Run { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - const KILO: f64 = 1024.0; - const MEGA: f64 = KILO * 1024.0; - const GIGA: f64 = MEGA * 1024.0; - - fn format_bytes(bytes: usize) -> String { - let bytes = bytes as f64; - if bytes >= GIGA { - format!("{:.2} GiB", bytes / GIGA) - } else if bytes >= MEGA { - format!("{:.2} MiB", bytes / MEGA) - } else if bytes >= KILO { - format!("{:.2} KiB", bytes / KILO) - } else { - format!("{} B", bytes) - } - } - - fn format_bandwidth(duration: Duration, bytes: usize) -> String { - const KILO: f64 = 1024.0; - const MEGA: f64 = KILO * 1024.0; - const GIGA: f64 = MEGA * 1024.0; - - let bandwidth = (bytes as f64 * 8.0) / duration.as_secs_f64(); - - if bandwidth >= GIGA { - format!("{:.2} Gbit/s", bandwidth / GIGA) - } else if bandwidth >= MEGA { - format!("{:.2} Mbit/s", bandwidth / MEGA) - } else if bandwidth >= KILO { - format!("{:.2} Kbit/s", bandwidth / KILO) - } else { - format!("{:.2} bit/s", bandwidth) - } - } - let Run { params: RunParams { to_send, diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs index 9346160363e..09d6787f132 100644 --- a/protocols/perf/src/protocol.rs +++ b/protocols/perf/src/protocol.rs @@ -22,103 +22,155 @@ use instant::Instant; use std::time::Duration; use futures::{ - future::LocalBoxFuture, - stream::{BoxStream, LocalBoxStream}, - AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt, + future::select, stream::BoxStream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, + FutureExt, SinkExt, StreamExt, }; -use crate::{client, server, Run, RunDuration, RunParams, RunUpdate}; - -use genawaiter::yield_; +use crate::{Finished, Progressed, Run, RunDuration, RunParams, RunUpdate}; const BUF: [u8; 1024] = [0; 1024]; pub(crate) fn send_receive( params: RunParams, - mut stream: S, + stream: S, + // TODO: Could return impl Stream ) -> BoxStream<'static, Result> { - let generator = genawaiter::sync::gen!({ - let mut delay = futures_timer::Delay::new(Duration::from_secs(1)); + let (sender, receiver) = futures::channel::mpsc::channel(0); - let RunParams { - to_send, - to_receive, - } = params; + let receiver = receiver.fuse(); - let mut receive_buf = vec![0; 1024]; - let to_receive_bytes = (to_receive as u64).to_be_bytes(); + // TODO: Do we need the box? + let inner = send_receive_inner(params, stream, sender).fuse().boxed(); - let mut write_to_receive = stream.write_all(&to_receive_bytes); - loop { - match futures::future::select(&mut delay, &mut write_to_receive).await { - futures::future::Either::Left((_, _)) => { - delay = futures_timer::Delay::new(Duration::from_secs(1)); - // yield_!() - } - futures::future::Either::Right((Ok(_), _)) => break, - futures::future::Either::Right((Err(_), _)) => todo!("yield"), - } - } + futures::stream::select( + receiver.map(|progressed| Ok(RunUpdate::Progressed(progressed))), + inner + .map(|finished| finished.map(RunUpdate::Finished)) + .into_stream(), + ) + .boxed() +} + +async fn send_receive_inner( + params: RunParams, + mut stream: S, + mut progress: futures::channel::mpsc::Sender, +) -> Result { + let mut delay = futures_timer::Delay::new(Duration::from_secs(1)); - let write_start = Instant::now(); + let RunParams { + to_send, + to_receive, + } = params; - let mut sent = 0; - while sent < to_send { - let n = std::cmp::min(to_send - sent, BUF.len()); - let buf = &BUF[..n]; + let mut receive_buf = vec![0; 1024]; + let to_receive_bytes = (to_receive as u64).to_be_bytes(); - let mut write = stream.write(buf); - sent += loop { - match futures::future::select(&mut delay, &mut write).await { - futures::future::Either::Left((_, _)) => { - delay = futures_timer::Delay::new(Duration::from_secs(1)); - // yield_!() - } - futures::future::Either::Right((Ok(n), _)) => break n, - futures::future::Either::Right((Err(_), _)) => todo!("yield"), - } + let mut write_to_receive = stream.write_all(&to_receive_bytes); + loop { + match select(&mut delay, &mut write_to_receive).await { + futures::future::Either::Left((_, _)) => { + delay = futures_timer::Delay::new(Duration::from_secs(1)); + progress + .send(Progressed { + duration: Duration::ZERO, + sent: 0, + received: 0, + }) + .await + .expect("receiver not to be dropped"); } + futures::future::Either::Right((result, _)) => break result?, } + } + + let write_start = Instant::now(); + let mut intermittant_start = Instant::now(); + + let mut sent = 0; + let mut intermittent_sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; - loop { - match futures::future::select(&mut delay, stream.close()).await { + let mut write = stream.write(buf); + sent += loop { + match select(&mut delay, &mut write).await { futures::future::Either::Left((_, _)) => { delay = futures_timer::Delay::new(Duration::from_secs(1)); - // yield_!() + progress + .send(Progressed { + duration: intermittant_start.elapsed(), + sent: sent - intermittent_sent, + received: 0, + }) + .await + .expect("receiver not to be dropped"); + intermittant_start = Instant::now(); + intermittent_sent = sent; } - futures::future::Either::Right((Ok(_), _)) => break, + futures::future::Either::Right((Ok(n), _)) => break n, futures::future::Either::Right((Err(_), _)) => todo!("yield"), } } + } - let write_done = Instant::now(); - - let mut received = 0; - while received < to_receive { - let mut read = stream.read(&mut receive_buf); - received += loop { - match futures::future::select(&mut delay, &mut read).await { - futures::future::Either::Left((_, _)) => { - delay = futures_timer::Delay::new(Duration::from_secs(1)); - // yield_!() - } - futures::future::Either::Right((Ok(n), _)) => break n, - futures::future::Either::Right((Err(_), _)) => todo!("yield"), - } + loop { + match select(&mut delay, stream.close()).await { + futures::future::Either::Left((_, _)) => { + delay = futures_timer::Delay::new(Duration::from_secs(1)); + progress + .send(Progressed { + duration: intermittant_start.elapsed(), + sent: sent - intermittent_sent, + received: 0, + }) + .await + .expect("receiver not to be dropped"); + intermittant_start = Instant::now(); + intermittent_sent = sent; } + futures::future::Either::Right((Ok(_), _)) => break, + futures::future::Either::Right((Err(_), _)) => todo!("yield"), } + } - let read_done = Instant::now(); + let write_done = Instant::now(); - yield_!(Ok(RunUpdate::Finished { - duration: RunDuration { - upload: write_done.duration_since(write_start), - download: read_done.duration_since(write_done), + let mut received = 0; + let mut intermittend_received = 0; + while received < to_receive { + let mut read = stream.read(&mut receive_buf); + received += loop { + match select(&mut delay, &mut read).await { + futures::future::Either::Left((_, _)) => { + delay = futures_timer::Delay::new(Duration::from_secs(1)); + progress + .send(Progressed { + duration: intermittant_start.elapsed(), + sent: sent - intermittent_sent, + received: received - intermittend_received, + }) + .await + .expect("receiver not to be dropped"); + intermittant_start = Instant::now(); + intermittent_sent = sent; + intermittend_received = received; + } + futures::future::Either::Right((Ok(n), _)) => break n, + futures::future::Either::Right((Err(_), _)) => todo!("yield"), } - })); - }); + } + } - StreamExt::boxed(generator) + let read_done = Instant::now(); + + Ok(Finished { + duration: RunDuration { + upload: write_done.duration_since(write_start), + download: read_done.duration_since(write_done), + }, + }) } pub(crate) async fn receive_send( @@ -168,98 +220,94 @@ pub(crate) async fn receive_send( }) } -#[cfg(test)] -mod tests { - use super::*; - use futures::{ - executor::{block_on, block_on_stream}, - AsyncRead, AsyncWrite, - }; - use std::{ - pin::Pin, - sync::{Arc, Mutex}, - task::Poll, - }; - - #[derive(Clone)] - struct DummyStream { - inner: Arc>, - } - - struct DummyStreamInner { - read: Vec, - write: Vec, - } - - impl DummyStream { - fn new(read: Vec) -> Self { - Self { - inner: Arc::new(Mutex::new(DummyStreamInner { - read, - write: Vec::new(), - })), - } - } - } - - impl Unpin for DummyStream {} - - impl AsyncWrite for DummyStream { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - Pin::new(&mut self.inner.lock().unwrap().write).poll_write(cx, buf) - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.inner.lock().unwrap().write).poll_flush(cx) - } - - fn poll_close( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.inner.lock().unwrap().write).poll_close(cx) - } - } - - impl AsyncRead for DummyStream { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - buf: &mut [u8], - ) -> std::task::Poll> { - let amt = std::cmp::min(buf.len(), self.inner.lock().unwrap().read.len()); - let new = self.inner.lock().unwrap().read.split_off(amt); - - buf[..amt].copy_from_slice(self.inner.lock().unwrap().read.as_slice()); - - self.inner.lock().unwrap().read = new; - Poll::Ready(Ok(amt)) - } - } - - // #[test] - // fn test_client() { - // let stream = DummyStream::new(vec![0]); - - // block_on_stream(send_receive( - // RunParams { - // to_send: 0, - // to_receive: 0, - // }, - // stream.clone(), - // )) - // .collect::>() - - // assert_eq!( - // stream.inner.lock().unwrap().write, - // 0u64.to_be_bytes().to_vec() - // ); - // } -} +// #[cfg(test)] +// mod tests { +// use futures::{AsyncRead, AsyncWrite}; +// use std::{ +// pin::Pin, +// sync::{Arc, Mutex}, +// task::Poll, +// }; +// +// #[derive(Clone)] +// struct DummyStream { +// inner: Arc>, +// } +// +// struct DummyStreamInner { +// read: Vec, +// write: Vec, +// } +// +// impl DummyStream { +// fn new(read: Vec) -> Self { +// Self { +// inner: Arc::new(Mutex::new(DummyStreamInner { +// read, +// write: Vec::new(), +// })), +// } +// } +// } +// +// impl Unpin for DummyStream {} +// +// impl AsyncWrite for DummyStream { +// fn poll_write( +// self: std::pin::Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// buf: &[u8], +// ) -> std::task::Poll> { +// Pin::new(&mut self.inner.lock().unwrap().write).poll_write(cx, buf) +// } +// +// fn poll_flush( +// self: std::pin::Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// ) -> std::task::Poll> { +// Pin::new(&mut self.inner.lock().unwrap().write).poll_flush(cx) +// } +// +// fn poll_close( +// self: std::pin::Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// ) -> std::task::Poll> { +// Pin::new(&mut self.inner.lock().unwrap().write).poll_close(cx) +// } +// } +// +// impl AsyncRead for DummyStream { +// fn poll_read( +// self: Pin<&mut Self>, +// _cx: &mut std::task::Context<'_>, +// buf: &mut [u8], +// ) -> std::task::Poll> { +// let amt = std::cmp::min(buf.len(), self.inner.lock().unwrap().read.len()); +// let new = self.inner.lock().unwrap().read.split_off(amt); +// +// buf[..amt].copy_from_slice(self.inner.lock().unwrap().read.as_slice()); +// +// self.inner.lock().unwrap().read = new; +// Poll::Ready(Ok(amt)) +// } +// } +// +// // #[test] +// // fn test_client() { +// // let stream = DummyStream::new(vec![0]); +// +// // block_on_stream(send_receive( +// // RunParams { +// // to_send: 0, +// // to_receive: 0, +// // }, +// // stream.clone(), +// // )) +// // .collect::>() +// +// // assert_eq!( +// // stream.inner.lock().unwrap().write, +// // 0u64.to_be_bytes().to_vec() +// // ); +// // } +// } diff --git a/protocols/perf/src/server.rs b/protocols/perf/src/server.rs index 2a6b5d60774..cad4853e6cf 100644 --- a/protocols/perf/src/server.rs +++ b/protocols/perf/src/server.rs @@ -21,6 +21,5 @@ mod behaviour; mod handler; -use instant::Instant; pub use behaviour::{Behaviour, Event}; From a5df071b1fd2627b3f4040ee8e9dfd552681902f Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 23 Aug 2023 20:03:20 +0200 Subject: [PATCH 04/29] Basic printing --- protocols/perf/src/bin/perf.rs | 39 +++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 41b76102b73..ab1f53fa3d6 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -31,7 +31,7 @@ use libp2p_core::{ }; use libp2p_identity::PeerId; use libp2p_perf::server::Event; -use libp2p_perf::{Finished, Run, RunParams, RunUpdate}; +use libp2p_perf::{Finished, Progressed, Run, RunParams, RunUpdate}; use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; use log::{error, info}; use serde::{Deserialize, Serialize}; @@ -195,6 +195,15 @@ async fn client( Ok(()) } +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct CustomResult { + r#type: String, + time_seconds: f64, + upload_bytes: usize, + download_bytes: usize, +} + async fn custom(server_address: Multiaddr, params: RunParams) -> Result<()> { info!("start benchmark: custom"); let mut swarm = swarm().await?; @@ -205,16 +214,13 @@ async fn custom(server_address: Multiaddr, params: RunParams) -> Result<()> { perf(&mut swarm, server_peer_id, params).await?; - #[derive(Serialize, Deserialize)] - #[serde(rename_all = "camelCase")] - struct CustomResult { - latency: f64, - } - println!( "{}", serde_json::to_string(&CustomResult { - latency: start.elapsed().as_secs_f64(), + upload_bytes: params.to_send, + download_bytes: params.to_receive, + r#type: "final".to_string(), + time_seconds: start.elapsed().as_secs_f64(), }) .unwrap() ); @@ -461,6 +467,23 @@ async fn perf( result: Ok(RunUpdate::Progressed(progressed)), }) => { info!("{progressed}"); + + let Progressed { + duration, + sent, + received, + } = progressed; + + println!( + "{}", + serde_json::to_string(&CustomResult { + r#type: "intermittent".to_string(), + time_seconds: duration.as_secs_f64(), + upload_bytes: sent, + download_bytes: received, + }) + .unwrap() + ); } SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, From 9788b53bad52a2b0831b13c1c38e3523659a66af Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 24 Aug 2023 13:57:18 +0200 Subject: [PATCH 05/29] intermittent -> intermediate --- protocols/perf/src/bin/perf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index ab1f53fa3d6..30aab780cb2 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -477,7 +477,7 @@ async fn perf( println!( "{}", serde_json::to_string(&CustomResult { - r#type: "intermittent".to_string(), + r#type: "intermediate".to_string(), time_seconds: duration.as_secs_f64(), upload_bytes: sent, download_bytes: received, From 066d44ce95dabf390251e55a30f538cb6c05c9d7 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 25 Aug 2023 13:02:24 +0200 Subject: [PATCH 06/29] Max out quic connection and stream data --- protocols/perf/src/bin/perf.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 30aab780cb2..57476b8f6f0 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -409,6 +409,11 @@ async fn swarm() -> Result> { let quic = { let mut config = libp2p_quic::Config::new(&local_key); config.support_draft_29 = true; + + // TODO: Revert. But could potentially inform a better default. + config.max_connection_data = u32::MAX; + config.max_stream_data = u32::MAX; + libp2p_quic::tokio::Transport::new(config) }; From 9a1865e458a3effe4abd63ab989d26733cb04823 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 25 Aug 2023 13:53:32 +0200 Subject: [PATCH 07/29] Set send_window --- transports/quic/src/config.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transports/quic/src/config.rs b/transports/quic/src/config.rs index 201594e247c..a21d0d3512f 100644 --- a/transports/quic/src/config.rs +++ b/transports/quic/src/config.rs @@ -116,6 +116,8 @@ impl From for QuinnConfig { transport.allow_spin(false); transport.stream_receive_window(max_stream_data.into()); transport.receive_window(max_connection_data.into()); + // TODO + transport.send_window(max_connection_data.into()); let transport = Arc::new(transport); let mut server_config = quinn::ServerConfig::with_crypto(server_tls_config); From 14db268a0e5b662ef721fdc7326bc4fff598c092 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 25 Aug 2023 13:55:16 +0200 Subject: [PATCH 08/29] Log config --- protocols/perf/src/bin/perf.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 57476b8f6f0..9d6424b6e45 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -414,6 +414,8 @@ async fn swarm() -> Result> { config.max_connection_data = u32::MAX; config.max_stream_data = u32::MAX; + log::info!("{config:?}"); + libp2p_quic::tokio::Transport::new(config) }; From d85382c2bd8bea733e194c8872ae0506fe918e9b Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 25 Aug 2023 14:02:59 +0200 Subject: [PATCH 09/29] Revert logging --- protocols/perf/src/bin/perf.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 9d6424b6e45..57476b8f6f0 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -414,8 +414,6 @@ async fn swarm() -> Result> { config.max_connection_data = u32::MAX; config.max_stream_data = u32::MAX; - log::info!("{config:?}"); - libp2p_quic::tokio::Transport::new(config) }; From 0f99dd444f2b9b8e903c5be2db5b420086114c16 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 25 Aug 2023 16:43:07 +0200 Subject: [PATCH 10/29] Be reasonable --- protocols/perf/src/bin/perf.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 57476b8f6f0..57acf37afcb 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -411,8 +411,10 @@ async fn swarm() -> Result> { config.support_draft_29 = true; // TODO: Revert. But could potentially inform a better default. - config.max_connection_data = u32::MAX; - config.max_stream_data = u32::MAX; + // + // see test-plans user-data.sh for rational + config.max_connection_data = 200000000; + config.max_stream_data = 200000000; libp2p_quic::tokio::Transport::new(config) }; From 257c1012fbf49083aafb55c2cccbc14295115afb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Oct 2023 12:41:09 +0200 Subject: [PATCH 11/29] Simplify binary --- protocols/perf/src/bin/perf.rs | 239 ++++----------------------------- 1 file changed, 24 insertions(+), 215 deletions(-) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 6bfac221054..e7e0e697fbf 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -22,7 +22,6 @@ use std::{net::SocketAddr, str::FromStr}; use anyhow::{bail, Result}; use clap::Parser; -use futures::FutureExt; use futures::{future::Either, StreamExt}; use instant::{Duration, Instant}; use libp2p_core::{ @@ -164,29 +163,31 @@ async fn client( .with(Protocol::Udp(server_address.port())) .with(Protocol::QuicV1), }; - - let benchmarks = if upload_bytes.is_some() { - vec![custom( - server_address, - RunParams { - to_send: upload_bytes.unwrap(), - to_receive: download_bytes.unwrap(), - }, - ) - .boxed()] - } else { - vec![ - latency(server_address.clone()).boxed(), - throughput(server_address.clone()).boxed(), - requests_per_second(server_address.clone()).boxed(), - sequential_connections_per_second(server_address.clone()).boxed(), - ] + let params = RunParams { + to_send: upload_bytes.unwrap(), + to_receive: download_bytes.unwrap(), }; + let mut swarm = swarm().await?; tokio::spawn(async move { - for benchmark in benchmarks { - benchmark.await?; - } + info!("start benchmark: custom"); + + let start = Instant::now(); + + let server_peer_id = connect(&mut swarm, server_address.clone()).await?; + + perf(&mut swarm, server_peer_id, params).await?; + + println!( + "{}", + serde_json::to_string(&BenchmarkResult { + upload_bytes: params.to_send, + download_bytes: params.to_receive, + r#type: "final".to_string(), + time_seconds: start.elapsed().as_secs_f64(), + }) + .unwrap() + ); anyhow::Ok(()) }) @@ -197,205 +198,13 @@ async fn client( #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -struct CustomResult { +struct BenchmarkResult { r#type: String, time_seconds: f64, upload_bytes: usize, download_bytes: usize, } -async fn custom(server_address: Multiaddr, params: RunParams) -> Result<()> { - info!("start benchmark: custom"); - let mut swarm = swarm().await?; - - let start = Instant::now(); - - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - - perf(&mut swarm, server_peer_id, params).await?; - - println!( - "{}", - serde_json::to_string(&CustomResult { - upload_bytes: params.to_send, - download_bytes: params.to_receive, - r#type: "final".to_string(), - time_seconds: start.elapsed().as_secs_f64(), - }) - .unwrap() - ); - - Ok(()) -} - -async fn latency(server_address: Multiaddr) -> Result<()> { - info!("start benchmark: round-trip-time latency"); - let mut swarm = swarm().await?; - - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - - let mut rounds = 0; - let start = Instant::now(); - let mut latencies = Vec::new(); - - loop { - if start.elapsed() > Duration::from_secs(30) { - break; - } - - let start = Instant::now(); - - perf( - &mut swarm, - server_peer_id, - RunParams { - to_send: 1, - to_receive: 1, - }, - ) - .await?; - - latencies.push(start.elapsed().as_secs_f64()); - rounds += 1; - } - - latencies.sort_by(|a, b| a.partial_cmp(b).unwrap()); - - info!( - "Finished: {rounds} pings in {:.4}s", - start.elapsed().as_secs_f64() - ); - info!("- {:.4} s median", percentile(&latencies, 0.50),); - info!("- {:.4} s 95th percentile\n", percentile(&latencies, 0.95),); - Ok(()) -} - -fn percentile(values: &[V], percentile: f64) -> V { - let n: usize = (values.len() as f64 * percentile).ceil() as usize - 1; - values[n] -} - -async fn throughput(server_address: Multiaddr) -> Result<()> { - info!("start benchmark: single connection single channel throughput"); - let mut swarm = swarm().await?; - - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - - let params = RunParams { - to_send: 10 * 1024 * 1024, - to_receive: 10 * 1024 * 1024, - }; - - perf(&mut swarm, server_peer_id, params).await?; - - Ok(()) -} - -async fn requests_per_second(server_address: Multiaddr) -> Result<()> { - info!("start benchmark: single connection parallel requests per second"); - let mut swarm = swarm().await?; - - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - - let num = 1_000; - let to_send = 1; - let to_receive = 1; - - for _ in 0..num { - swarm.behaviour_mut().perf( - server_peer_id, - RunParams { - to_send, - to_receive, - }, - )?; - } - - let mut finished = 0; - let start = Instant::now(); - - loop { - match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(libp2p_perf::client::Event { - id: _, - result: Ok(_), - }) => { - finished += 1; - - if finished == num { - break; - } - } - e => panic!("{e:?}"), - } - } - - let duration = start.elapsed().as_secs_f64(); - let requests_per_second = num as f64 / duration; - - info!( - "Finished: sent {num} {to_send} bytes requests with {to_receive} bytes response each within {duration:.2} s", - ); - info!("- {requests_per_second:.2} req/s\n"); - - Ok(()) -} - -async fn sequential_connections_per_second(server_address: Multiaddr) -> Result<()> { - info!("start benchmark: sequential connections with single request per second"); - let mut rounds = 0; - let to_send = 1; - let to_receive = 1; - let start = Instant::now(); - - let mut latency_connection_establishment = Vec::new(); - let mut latency_connection_establishment_plus_request = Vec::new(); - - loop { - if start.elapsed() > Duration::from_secs(30) { - break; - } - - let mut swarm = swarm().await?; - - let start = Instant::now(); - - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - - latency_connection_establishment.push(start.elapsed().as_secs_f64()); - - perf( - &mut swarm, - server_peer_id, - RunParams { - to_send, - to_receive, - }, - ) - .await?; - - latency_connection_establishment_plus_request.push(start.elapsed().as_secs_f64()); - rounds += 1; - } - - let duration = start.elapsed().as_secs_f64(); - - latency_connection_establishment.sort_by(|a, b| a.partial_cmp(b).unwrap()); - latency_connection_establishment_plus_request.sort_by(|a, b| a.partial_cmp(b).unwrap()); - - let connection_establishment_95th = percentile(&latency_connection_establishment, 0.95); - let connection_establishment_plus_request_95th = - percentile(&latency_connection_establishment_plus_request, 0.95); - - info!( - "Finished: established {rounds} connections with one {to_send} bytes request and one {to_receive} bytes response within {duration:.2} s", - ); - info!("- {connection_establishment_95th:.4} s 95th percentile connection establishment"); - info!("- {connection_establishment_plus_request_95th:.4} s 95th percentile connection establishment + one request"); - - Ok(()) -} - async fn swarm() -> Result> { let local_key = libp2p_identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); @@ -486,7 +295,7 @@ async fn perf( println!( "{}", - serde_json::to_string(&CustomResult { + serde_json::to_string(&BenchmarkResult { r#type: "intermediate".to_string(), time_seconds: duration.as_secs_f64(), upload_bytes: sent, From f89729764f57dcc7df33d2dba5ad9595de1ce349 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Oct 2023 12:44:05 +0200 Subject: [PATCH 12/29] Reduce tokio features --- protocols/perf/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index e590d263131..de9b7ec3982 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -29,8 +29,7 @@ log = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -# TODO: Full needed? -tokio = { version = "1.33.0", features = ["full"] } +tokio = { version = "1.33", default-features = false, features = ["macros", "rt", "rt-multi-thread"] } void = "1" [dev-dependencies] From 1ac5543d4d4d041ad2faee9df817d31aaad5bc64 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Oct 2023 12:49:28 +0200 Subject: [PATCH 13/29] Revert changes to dockerfile --- protocols/perf/Dockerfile | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/protocols/perf/Dockerfile b/protocols/perf/Dockerfile index aef8eed1cad..6523e3bede1 100644 --- a/protocols/perf/Dockerfile +++ b/protocols/perf/Dockerfile @@ -9,14 +9,10 @@ RUN --mount=type=cache,target=./target \ cargo build --release --package libp2p-perf RUN --mount=type=cache,target=./target \ - mv ./target/release/perf-server /usr/local/bin/perf-server - -RUN --mount=type=cache,target=./target \ - mv ./target/release/perf-client /usr/local/bin/perf-client + mv ./target/release/perf /usr/local/bin/perf FROM debian:bullseye-slim -COPY --from=builder /usr/local/bin/perf-server /usr/local/bin/perf-server -COPY --from=builder /usr/local/bin/perf-client /usr/local/bin/perf-client +COPY --from=builder /usr/local/bin/perf /app/perf -ENTRYPOINT [ "perf-server"] +ENTRYPOINT [ "/app/perf" ] From ce4384de43c7b58a50c198c792c748115250981e Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Oct 2023 12:55:42 +0200 Subject: [PATCH 14/29] Remove quic window bumps --- protocols/perf/src/bin/perf.rs | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index e7e0e697fbf..e4269dc47c3 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -214,20 +214,7 @@ async fn swarm() -> Result> { .upgrade(upgrade::Version::V1Lazy) .authenticate(libp2p_tls::Config::new(&local_key)?) .multiplex(libp2p_yamux::Config::default()); - - let quic = { - let mut config = libp2p_quic::Config::new(&local_key); - config.support_draft_29 = true; - - // TODO: Revert. But could potentially inform a better default. - // - // see test-plans user-data.sh for rational - config.max_connection_data = 200000000; - config.max_stream_data = 200000000; - - libp2p_quic::tokio::Transport::new(config) - }; - + let quic = libp2p_quic::tokio::Transport::new(libp2p_quic::Config::new(&local_key)); let dns = libp2p_dns::tokio::Transport::system(OrTransport::new(quic, tcp))?; dns.map(|either_output, _| match either_output { From b82b6846ec06b0b3c09fe9738ae0f1e1e4d53a5b Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Oct 2023 12:56:50 +0200 Subject: [PATCH 15/29] Revert quic transport send_window hack --- transports/quic/src/config.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/transports/quic/src/config.rs b/transports/quic/src/config.rs index 3429391df63..5351a537c76 100644 --- a/transports/quic/src/config.rs +++ b/transports/quic/src/config.rs @@ -120,8 +120,6 @@ impl From for QuinnConfig { transport.allow_spin(false); transport.stream_receive_window(max_stream_data.into()); transport.receive_window(max_connection_data.into()); - // TODO - transport.send_window(max_connection_data.into()); let transport = Arc::new(transport); let mut server_config = quinn::ServerConfig::with_crypto(server_tls_config); From 2126bf63d448d45d0150357398b61e0b3996c843 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Oct 2023 13:19:00 +0200 Subject: [PATCH 16/29] Handle client error --- protocols/perf/src/client.rs | 10 ++++++++++ protocols/perf/src/client/behaviour.rs | 7 +++---- protocols/perf/src/client/handler.rs | 13 ++++++------- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 21a60ccde1b..c4614e979db 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -24,6 +24,8 @@ mod handler; use std::sync::atomic::{AtomicUsize, Ordering}; pub use behaviour::{Behaviour, Event}; +use libp2p_swarm::StreamUpgradeError; +use void::Void; static NEXT_RUN_ID: AtomicUsize = AtomicUsize::new(1); @@ -37,3 +39,11 @@ impl RunId { Self(NEXT_RUN_ID.fetch_add(1, Ordering::SeqCst)) } } + +#[derive(thiserror::Error, Debug)] +pub enum RunError { + #[error(transparent)] + Upgrade(#[from] StreamUpgradeError), + #[error("Failed to execute perf run: {0}")] + Io(#[from] std::io::Error), +} diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index 2415fd91764..64dc2680167 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -29,20 +29,19 @@ use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::{ derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm, - NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent, + NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, ToSwarm, }; -use void::Void; use crate::RunParams; use crate::{client::handler::Handler, RunUpdate}; -use super::RunId; +use super::{RunId, RunError}; #[derive(Debug)] pub struct Event { pub id: RunId, - pub result: Result>, + pub result: Result, } #[derive(Default)] diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index c502eb214a1..005c6af862f 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -33,12 +33,11 @@ use libp2p_swarm::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }, - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, - SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, SubstreamProtocol, }; use void::Void; -use super::RunId; +use super::{RunError, RunId}; use crate::{RunParams, RunUpdate}; #[derive(Debug)] @@ -50,7 +49,7 @@ pub struct Command { #[derive(Debug)] pub struct Event { pub(crate) id: RunId, - pub(crate) result: Result>, + pub(crate) result: Result, } pub struct Handler { @@ -150,7 +149,7 @@ impl ConnectionHandler for Handler { self.queued_events .push_back(ConnectionHandlerEvent::NotifyBehaviour(Event { id, - result: Err(error), + result: Err(error.into()), })); } ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => { @@ -179,10 +178,10 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } - while let Poll::Ready(Some((id, result))) = self.outbound.poll_next_unpin(cx) { + if let Poll::Ready(Some((id, result))) = self.outbound.poll_next_unpin(cx) { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { id, - result: result.map_err(|e| todo!("{e:?}")), + result: result.map_err(Into::into), })); } From b3ab27049c048ac1f8a1748edee44474cbc95ade Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Oct 2023 13:36:02 +0200 Subject: [PATCH 17/29] Simplify client protocol --- protocols/perf/src/client/handler.rs | 12 +-- protocols/perf/src/protocol.rs | 156 ++++----------------------- 2 files changed, 27 insertions(+), 141 deletions(-) diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 005c6af862f..b8626e4a58d 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -129,13 +129,11 @@ impl ConnectionHandler for Handler { .requested_streams .pop_front() .expect("opened a stream without a pending command"); - self.outbound - // TODO: can we get around the box? - .push( - crate::protocol::send_receive(params, protocol) - .map(move |result| (id, result)) - .boxed(), - ); + self.outbound.push( + crate::protocol::send_receive(params, protocol) + .map(move |result| (id, result)) + .boxed(), + ); } ConnectionEvent::AddressChange(_) diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs index 09d6787f132..82fb3f4a01e 100644 --- a/protocols/perf/src/protocol.rs +++ b/protocols/perf/src/protocol.rs @@ -18,29 +18,29 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use futures_timer::Delay; use instant::Instant; use std::time::Duration; use futures::{ - future::select, stream::BoxStream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, - FutureExt, SinkExt, StreamExt, + future::{select, Either}, + AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, Stream, StreamExt, }; use crate::{Finished, Progressed, Run, RunDuration, RunParams, RunUpdate}; const BUF: [u8; 1024] = [0; 1024]; +const REPORT_INTERVAL: Duration = Duration::from_secs(1); pub(crate) fn send_receive( params: RunParams, stream: S, - // TODO: Could return impl Stream -) -> BoxStream<'static, Result> { +) -> impl Stream> { + // Use a channel to simulate a generator. `send_receive_inner` can `yield` events through the + // channel. let (sender, receiver) = futures::channel::mpsc::channel(0); - let receiver = receiver.fuse(); - - // TODO: Do we need the box? - let inner = send_receive_inner(params, stream, sender).fuse().boxed(); + let inner = send_receive_inner(params, stream, sender).fuse(); futures::stream::select( receiver.map(|progressed| Ok(RunUpdate::Progressed(progressed))), @@ -48,7 +48,6 @@ pub(crate) fn send_receive( .map(|finished| finished.map(RunUpdate::Finished)) .into_stream(), ) - .boxed() } async fn send_receive_inner( @@ -56,7 +55,7 @@ async fn send_receive_inner( mut stream: S, mut progress: futures::channel::mpsc::Sender, ) -> Result { - let mut delay = futures_timer::Delay::new(Duration::from_secs(1)); + let mut delay = Delay::new(REPORT_INTERVAL); let RunParams { to_send, @@ -65,30 +64,13 @@ async fn send_receive_inner( let mut receive_buf = vec![0; 1024]; let to_receive_bytes = (to_receive as u64).to_be_bytes(); - - let mut write_to_receive = stream.write_all(&to_receive_bytes); - loop { - match select(&mut delay, &mut write_to_receive).await { - futures::future::Either::Left((_, _)) => { - delay = futures_timer::Delay::new(Duration::from_secs(1)); - progress - .send(Progressed { - duration: Duration::ZERO, - sent: 0, - received: 0, - }) - .await - .expect("receiver not to be dropped"); - } - futures::future::Either::Right((result, _)) => break result?, - } - } + stream.write_all(&to_receive_bytes).await?; let write_start = Instant::now(); let mut intermittant_start = Instant::now(); - let mut sent = 0; let mut intermittent_sent = 0; + while sent < to_send { let n = std::cmp::min(to_send - sent, BUF.len()); let buf = &BUF[..n]; @@ -96,8 +78,8 @@ async fn send_receive_inner( let mut write = stream.write(buf); sent += loop { match select(&mut delay, &mut write).await { - futures::future::Either::Left((_, _)) => { - delay = futures_timer::Delay::new(Duration::from_secs(1)); + Either::Left((_, _)) => { + delay.reset(REPORT_INTERVAL); progress .send(Progressed { duration: intermittant_start.elapsed(), @@ -109,16 +91,15 @@ async fn send_receive_inner( intermittant_start = Instant::now(); intermittent_sent = sent; } - futures::future::Either::Right((Ok(n), _)) => break n, - futures::future::Either::Right((Err(_), _)) => todo!("yield"), + Either::Right((n, _)) => break n?, } } } loop { match select(&mut delay, stream.close()).await { - futures::future::Either::Left((_, _)) => { - delay = futures_timer::Delay::new(Duration::from_secs(1)); + Either::Left((_, _)) => { + delay.reset(REPORT_INTERVAL); progress .send(Progressed { duration: intermittant_start.elapsed(), @@ -130,21 +111,21 @@ async fn send_receive_inner( intermittant_start = Instant::now(); intermittent_sent = sent; } - futures::future::Either::Right((Ok(_), _)) => break, - futures::future::Either::Right((Err(_), _)) => todo!("yield"), + Either::Right((Ok(_), _)) => break, + Either::Right((Err(e), _)) => return Err(e), } } let write_done = Instant::now(); - let mut received = 0; let mut intermittend_received = 0; + while received < to_receive { let mut read = stream.read(&mut receive_buf); received += loop { match select(&mut delay, &mut read).await { - futures::future::Either::Left((_, _)) => { - delay = futures_timer::Delay::new(Duration::from_secs(1)); + Either::Left((_, _)) => { + delay.reset(REPORT_INTERVAL); progress .send(Progressed { duration: intermittant_start.elapsed(), @@ -157,8 +138,7 @@ async fn send_receive_inner( intermittent_sent = sent; intermittend_received = received; } - futures::future::Either::Right((Ok(n), _)) => break n, - futures::future::Either::Right((Err(_), _)) => todo!("yield"), + Either::Right((n, _)) => break n?, } } } @@ -219,95 +199,3 @@ pub(crate) async fn receive_send( }, }) } - -// #[cfg(test)] -// mod tests { -// use futures::{AsyncRead, AsyncWrite}; -// use std::{ -// pin::Pin, -// sync::{Arc, Mutex}, -// task::Poll, -// }; -// -// #[derive(Clone)] -// struct DummyStream { -// inner: Arc>, -// } -// -// struct DummyStreamInner { -// read: Vec, -// write: Vec, -// } -// -// impl DummyStream { -// fn new(read: Vec) -> Self { -// Self { -// inner: Arc::new(Mutex::new(DummyStreamInner { -// read, -// write: Vec::new(), -// })), -// } -// } -// } -// -// impl Unpin for DummyStream {} -// -// impl AsyncWrite for DummyStream { -// fn poll_write( -// self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// buf: &[u8], -// ) -> std::task::Poll> { -// Pin::new(&mut self.inner.lock().unwrap().write).poll_write(cx, buf) -// } -// -// fn poll_flush( -// self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// ) -> std::task::Poll> { -// Pin::new(&mut self.inner.lock().unwrap().write).poll_flush(cx) -// } -// -// fn poll_close( -// self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// ) -> std::task::Poll> { -// Pin::new(&mut self.inner.lock().unwrap().write).poll_close(cx) -// } -// } -// -// impl AsyncRead for DummyStream { -// fn poll_read( -// self: Pin<&mut Self>, -// _cx: &mut std::task::Context<'_>, -// buf: &mut [u8], -// ) -> std::task::Poll> { -// let amt = std::cmp::min(buf.len(), self.inner.lock().unwrap().read.len()); -// let new = self.inner.lock().unwrap().read.split_off(amt); -// -// buf[..amt].copy_from_slice(self.inner.lock().unwrap().read.as_slice()); -// -// self.inner.lock().unwrap().read = new; -// Poll::Ready(Ok(amt)) -// } -// } -// -// // #[test] -// // fn test_client() { -// // let stream = DummyStream::new(vec![0]); -// -// // block_on_stream(send_receive( -// // RunParams { -// // to_send: 0, -// // to_receive: 0, -// // }, -// // stream.clone(), -// // )) -// // .collect::>() -// -// // assert_eq!( -// // stream.inner.lock().unwrap().write, -// // 0u64.to_be_bytes().to_vec() -// // ); -// // } -// } From 61c83be6b3ba4e1ea5dee71bf06dfb74cfb942dc Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Oct 2023 13:38:41 +0200 Subject: [PATCH 18/29] Remove outdated todo --- protocols/perf/src/client/handler.rs | 1 - protocols/perf/src/lib.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index b8626e4a58d..2bdb5426f6c 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -171,7 +171,6 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - // Return queued events. if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index 18212819136..9cb086754fc 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -51,7 +51,6 @@ pub struct Progressed { impl Display for Progressed { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let Progressed { - // TODO: A single duration for both up and down is not ideal. duration, sent, received, From fd9443ffb2309620b91fd347a1ae8180b4d1e215 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sun, 22 Oct 2023 18:33:29 +0200 Subject: [PATCH 19/29] Address minor review comments --- protocols/perf/src/bin/perf.rs | 20 ++++++++++---------- protocols/perf/src/client/handler.rs | 17 ++++++----------- protocols/perf/src/lib.rs | 12 ++++++------ protocols/perf/src/protocol.rs | 18 +++++++++--------- protocols/perf/src/server/handler.rs | 14 +++++--------- 5 files changed, 36 insertions(+), 45 deletions(-) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index e4269dc47c3..7904acc370b 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -29,8 +29,8 @@ use libp2p_core::{ Transport as _, }; use libp2p_identity::PeerId; -use libp2p_perf::server::Event; -use libp2p_perf::{Finished, Progressed, Run, RunParams, RunUpdate}; +use libp2p_perf::{client, server}; +use libp2p_perf::{Final, Intermediate, Run, RunParams, RunUpdate}; use libp2p_swarm::{Config, NetworkBehaviour, Swarm, SwarmEvent}; use log::{error, info}; use serde::{Deserialize, Serialize}; @@ -135,7 +135,7 @@ async fn server(server_address: SocketAddr) -> Result<()> { info!("Established connection to {:?} via {:?}", peer_id, endpoint); } SwarmEvent::ConnectionClosed { .. } => {} - SwarmEvent::Behaviour(Event { .. }) => { + SwarmEvent::Behaviour(server::Event { .. }) => { info!("Finished run",) } e => panic!("{e:?}"), @@ -237,7 +237,7 @@ async fn swarm() -> Result> { } async fn connect( - swarm: &mut Swarm, + swarm: &mut Swarm, server_address: Multiaddr, ) -> Result { let start = Instant::now(); @@ -260,7 +260,7 @@ async fn connect( } async fn perf( - swarm: &mut Swarm, + swarm: &mut Swarm, server_peer_id: PeerId, params: RunParams, ) -> Result { @@ -268,13 +268,13 @@ async fn perf( let duration = loop { match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(libp2p_perf::client::Event { + SwarmEvent::Behaviour(client::Event { id: _, - result: Ok(RunUpdate::Progressed(progressed)), + result: Ok(RunUpdate::Intermediate(progressed)), }) => { info!("{progressed}"); - let Progressed { + let Intermediate { duration, sent, received, @@ -291,9 +291,9 @@ async fn perf( .unwrap() ); } - SwarmEvent::Behaviour(libp2p_perf::client::Event { + SwarmEvent::Behaviour(client::Event { id: _, - result: Ok(RunUpdate::Finished(Finished { duration })), + result: Ok(RunUpdate::Final(Final { duration })), }) => break duration, e => panic!("{e:?}"), }; diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 2bdb5426f6c..e4aed182bad 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -37,7 +37,7 @@ use libp2p_swarm::{ }; use void::Void; -use super::{RunError, RunId}; +use crate::client::{RunError, RunId}; use crate::{RunParams, RunUpdate}; #[derive(Debug)] @@ -66,8 +66,6 @@ pub struct Handler { requested_streams: VecDeque, outbound: SelectAll)>>, - - keep_alive: KeepAlive, } impl Handler { @@ -76,7 +74,6 @@ impl Handler { queued_events: Default::default(), requested_streams: Default::default(), outbound: Default::default(), - keep_alive: KeepAlive::Yes, } } } @@ -157,7 +154,11 @@ impl ConnectionHandler for Handler { } fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + if self.outbound.is_empty() { + KeepAlive::No + } else { + KeepAlive::Yes + } } fn poll( @@ -182,12 +183,6 @@ impl ConnectionHandler for Handler { })); } - if self.outbound.is_empty() { - self.keep_alive = KeepAlive::No - } else { - self.keep_alive = KeepAlive::Yes - } - Poll::Pending } } diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index 9cb086754fc..d26c95bc4c7 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -37,20 +37,20 @@ pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0"); #[derive(Debug, Clone, Copy)] pub enum RunUpdate { - Progressed(Progressed), - Finished(Finished), + Intermediate(Intermediate), + Final(Final), } #[derive(Debug, Clone, Copy)] -pub struct Progressed { +pub struct Intermediate { pub duration: Duration, pub sent: usize, pub received: usize, } -impl Display for Progressed { +impl Display for Intermediate { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Progressed { + let Intermediate { duration, sent, received, @@ -69,7 +69,7 @@ impl Display for Progressed { } #[derive(Debug, Clone, Copy)] -pub struct Finished { +pub struct Final { pub duration: RunDuration, } diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs index 82fb3f4a01e..9bb7ab5b0b4 100644 --- a/protocols/perf/src/protocol.rs +++ b/protocols/perf/src/protocol.rs @@ -27,7 +27,7 @@ use futures::{ AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, Stream, StreamExt, }; -use crate::{Finished, Progressed, Run, RunDuration, RunParams, RunUpdate}; +use crate::{Final, Intermediate, Run, RunDuration, RunParams, RunUpdate}; const BUF: [u8; 1024] = [0; 1024]; const REPORT_INTERVAL: Duration = Duration::from_secs(1); @@ -43,9 +43,9 @@ pub(crate) fn send_receive( let inner = send_receive_inner(params, stream, sender).fuse(); futures::stream::select( - receiver.map(|progressed| Ok(RunUpdate::Progressed(progressed))), + receiver.map(|progressed| Ok(RunUpdate::Intermediate(progressed))), inner - .map(|finished| finished.map(RunUpdate::Finished)) + .map(|finished| finished.map(RunUpdate::Final)) .into_stream(), ) } @@ -53,8 +53,8 @@ pub(crate) fn send_receive( async fn send_receive_inner( params: RunParams, mut stream: S, - mut progress: futures::channel::mpsc::Sender, -) -> Result { + mut progress: futures::channel::mpsc::Sender, +) -> Result { let mut delay = Delay::new(REPORT_INTERVAL); let RunParams { @@ -81,7 +81,7 @@ async fn send_receive_inner( Either::Left((_, _)) => { delay.reset(REPORT_INTERVAL); progress - .send(Progressed { + .send(Intermediate { duration: intermittant_start.elapsed(), sent: sent - intermittent_sent, received: 0, @@ -101,7 +101,7 @@ async fn send_receive_inner( Either::Left((_, _)) => { delay.reset(REPORT_INTERVAL); progress - .send(Progressed { + .send(Intermediate { duration: intermittant_start.elapsed(), sent: sent - intermittent_sent, received: 0, @@ -127,7 +127,7 @@ async fn send_receive_inner( Either::Left((_, _)) => { delay.reset(REPORT_INTERVAL); progress - .send(Progressed { + .send(Intermediate { duration: intermittant_start.elapsed(), sent: sent - intermittent_sent, received: received - intermittend_received, @@ -145,7 +145,7 @@ async fn send_receive_inner( let read_done = Instant::now(); - Ok(Finished { + Ok(Final { duration: RunDuration { upload: write_done.duration_since(write_start), download: read_done.duration_since(write_done), diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index a1ce8db4a4a..d4a7ce8546c 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -41,14 +41,12 @@ pub struct Event { pub struct Handler { inbound: FuturesUnordered>>, - keep_alive: KeepAlive, } impl Handler { pub fn new() -> Self { Self { inbound: Default::default(), - keep_alive: KeepAlive::Yes, } } } @@ -110,7 +108,11 @@ impl ConnectionHandler for Handler { } fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + if self.inbound.is_empty() { + KeepAlive::No + } else { + KeepAlive::Yes + } } fn poll( @@ -135,12 +137,6 @@ impl ConnectionHandler for Handler { } } - if self.inbound.is_empty() { - self.keep_alive = KeepAlive::No - } else { - self.keep_alive = KeepAlive::Yes - } - Poll::Pending } } From 4facef9a0c0600730ba5f7e457ad6046a8c03ec6 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 23 Oct 2023 21:18:17 +0200 Subject: [PATCH 20/29] Use SwarmBuilder --- Cargo.lock | 1 + protocols/perf/Cargo.toml | 1 + protocols/perf/src/bin/perf.rs | 51 +++++++++++++--------------------- 3 files changed, 21 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index faa4ffcff99..eecf9c75af2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2825,6 +2825,7 @@ dependencies = [ "futures", "futures-timer", "instant", + "libp2p", "libp2p-core", "libp2p-dns", "libp2p-identity", diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index de9b7ec3982..854745d8e19 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -17,6 +17,7 @@ env_logger = "0.10.0" futures = "0.3.28" futures-timer = "3.0" instant = "0.1.12" +libp2p = { workspace = true, features = ["tokio", "tcp", "quic", "tls", "yamux", "dns"] } libp2p-core = { workspace = true } libp2p-dns = { workspace = true, features = ["tokio"] } libp2p-identity = { workspace = true, features = ["rand"] } diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 7904acc370b..61371317ed2 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -22,16 +22,14 @@ use std::{net::SocketAddr, str::FromStr}; use anyhow::{bail, Result}; use clap::Parser; -use futures::{future::Either, StreamExt}; +use futures::StreamExt; use instant::{Duration, Instant}; -use libp2p_core::{ - multiaddr::Protocol, muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr, - Transport as _, -}; -use libp2p_identity::PeerId; +use libp2p::core::{multiaddr::Protocol, upgrade, Multiaddr}; +use libp2p::identity::PeerId; +use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::SwarmBuilder; use libp2p_perf::{client, server}; use libp2p_perf::{Final, Intermediate, Run, RunParams, RunUpdate}; -use libp2p_swarm::{Config, NetworkBehaviour, Swarm, SwarmEvent}; use log::{error, info}; use serde::{Deserialize, Serialize}; @@ -206,32 +204,21 @@ struct BenchmarkResult { } async fn swarm() -> Result> { - let local_key = libp2p_identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - - let transport = { - let tcp = libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default().nodelay(true)) - .upgrade(upgrade::Version::V1Lazy) - .authenticate(libp2p_tls::Config::new(&local_key)?) - .multiplex(libp2p_yamux::Config::default()); - let quic = libp2p_quic::tokio::Transport::new(libp2p_quic::Config::new(&local_key)); - let dns = libp2p_dns::tokio::Transport::system(OrTransport::new(quic, tcp))?; - - dns.map(|either_output, _| match either_output { - Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + let swarm = SwarmBuilder::with_new_identity() + .with_tokio() + .with_tcp( + libp2p_tcp::Config::default().nodelay(true), + libp2p_tls::Config::new, + libp2p_yamux::Config::default, + )? + .with_quic() + .with_dns()? + .with_behaviour(|_| B::default())? + .with_swarm_config(|cfg| { + cfg.with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy) + .with_idle_connection_timeout(Duration::from_secs(60 * 5)) }) - .boxed() - }; - - let swarm = Swarm::new( - transport, - Default::default(), - local_peer_id, - Config::with_tokio_executor() - .with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy) - .with_idle_connection_timeout(Duration::from_secs(60 * 5)), - ); + .build(); Ok(swarm) } From 9048af284fd346fda0765713693e7b6f58d80dc4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 23 Oct 2023 21:22:50 +0200 Subject: [PATCH 21/29] Use loop --- protocols/perf/src/server/handler.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index d4a7ce8546c..5a01d0942c5 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -126,17 +126,19 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { - match result { - Ok(stats) => { + loop { + match self.inbound.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stats))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats })) } - Err(e) => { - error!("{e:?}") + Poll::Ready(Some(Err(e))) => { + error!("{e:?}"); + continue; } + Poll::Ready(None) | Poll::Pending => {} } - } - Poll::Pending + return Poll::Pending; + } } } From a33842bd4eaadfd1c361e9e2c18d474b11c9bce3 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 23 Oct 2023 21:51:34 +0200 Subject: [PATCH 22/29] Use futures-bounded in server handler --- Cargo.lock | 1 + protocols/perf/Cargo.toml | 1 + protocols/perf/src/lib.rs | 2 ++ protocols/perf/src/server/handler.rs | 27 ++++++++++++++++++--------- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eecf9c75af2..dc2074a0356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2823,6 +2823,7 @@ dependencies = [ "clap", "env_logger 0.10.0", "futures", + "futures-bounded", "futures-timer", "instant", "libp2p", diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index 854745d8e19..cd9619a6b62 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -15,6 +15,7 @@ anyhow = "1" clap = { version = "4.3.23", features = ["derive"] } env_logger = "0.10.0" futures = "0.3.28" +futures-bounded = { workspace = true } futures-timer = "3.0" instant = "0.1.12" libp2p = { workspace = true, features = ["tokio", "tcp", "quic", "tls", "yamux", "dns"] } diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index d26c95bc4c7..f9db96aa9d9 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -34,6 +34,8 @@ mod protocol; pub mod server; pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0"); +const RUN_TIMEOUT: Duration = Duration::from_secs(5 * 60); +const MAX_PARALLEL_RUNS_PER_CONNECTION: usize = 1_000; #[derive(Debug, Clone, Copy)] pub enum RunUpdate { diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index 5a01d0942c5..2de2c396819 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -20,7 +20,7 @@ use std::task::{Context, Poll}; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::FutureExt; use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; use libp2p_swarm::{ handler::{ @@ -40,13 +40,13 @@ pub struct Event { } pub struct Handler { - inbound: FuturesUnordered>>, + inbound: futures_bounded::FuturesSet>, } impl Handler { pub fn new() -> Self { Self { - inbound: Default::default(), + inbound: futures_bounded::FuturesSet::new(crate::RUN_TIMEOUT, crate::MAX_PARALLEL_RUNS_PER_CONNECTION), } } } @@ -88,8 +88,13 @@ impl ConnectionHandler for Handler { protocol, info: _, }) => { - self.inbound - .push(crate::protocol::receive_send(protocol).boxed()); + if self + .inbound + .try_push(crate::protocol::receive_send(protocol).boxed()) + .is_err() + { + log::warn!("Dropping inbound stream because we are at capacity"); + } } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { info, .. }) => { void::unreachable(info) @@ -127,15 +132,19 @@ impl ConnectionHandler for Handler { >, > { loop { - match self.inbound.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(stats))) => { + match self.inbound.poll_unpin(cx) { + Poll::Ready(Ok(Ok(stats))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats })) } - Poll::Ready(Some(Err(e))) => { + Poll::Ready(Ok(Err(e))) => { error!("{e:?}"); continue; } - Poll::Ready(None) | Poll::Pending => {} + Poll::Ready(Err(e @ futures_bounded::Timeout { .. })) => { + error!("inbound perf request timed out: {e}"); + continue; + } + Poll::Pending => {} } return Poll::Pending; From 5d4a7c4bbd2f0e4e73562dde955f25e0b020e8b1 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 23 Oct 2023 21:58:49 +0200 Subject: [PATCH 23/29] Make RunError::NotConnected an empty struct --- protocols/perf/src/client/behaviour.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index 64dc2680167..5a710daf5aa 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -29,14 +29,13 @@ use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::{ derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm, - NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent, - THandlerOutEvent, ToSwarm, + NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use crate::RunParams; use crate::{client::handler::Handler, RunUpdate}; -use super::{RunId, RunError}; +use super::{RunError, RunId}; #[derive(Debug)] pub struct Event { @@ -57,9 +56,9 @@ impl Behaviour { Self::default() } - pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result { + pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result { if !self.connected.contains(&server) { - return Err(PerfError::NotConnected); + return Err(NotConnected {}); } let id = RunId::next(); @@ -75,9 +74,12 @@ impl Behaviour { } #[derive(thiserror::Error, Debug)] -pub enum PerfError { - #[error("Not connected to peer")] - NotConnected, +pub struct NotConnected(); + +impl std::fmt::Display for NotConnected { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "not connected to peer") + } } impl NetworkBehaviour for Behaviour { From b8a7066ed83c5b674024c29a436ff34baa1de041 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 24 Oct 2023 15:07:23 +0200 Subject: [PATCH 24/29] Refactor import --- protocols/perf/src/protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs index 9bb7ab5b0b4..d2d65b42303 100644 --- a/protocols/perf/src/protocol.rs +++ b/protocols/perf/src/protocol.rs @@ -53,7 +53,7 @@ pub(crate) fn send_receive( async fn send_receive_inner( params: RunParams, mut stream: S, - mut progress: futures::channel::mpsc::Sender, + mut progress: futures::channel::mpsc::Sender, ) -> Result { let mut delay = Delay::new(REPORT_INTERVAL); From 911175280c1aac5e5a4e7ed8d23cd69051255289 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 25 Oct 2023 09:30:59 +0200 Subject: [PATCH 25/29] fmt --- protocols/perf/src/server/handler.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index 2de2c396819..983d26d62f3 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -46,7 +46,10 @@ pub struct Handler { impl Handler { pub fn new() -> Self { Self { - inbound: futures_bounded::FuturesSet::new(crate::RUN_TIMEOUT, crate::MAX_PARALLEL_RUNS_PER_CONNECTION), + inbound: futures_bounded::FuturesSet::new( + crate::RUN_TIMEOUT, + crate::MAX_PARALLEL_RUNS_PER_CONNECTION, + ), } } } From d124a4654fecb6dd064e55fad33b38ab3037ea15 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 25 Oct 2023 09:34:03 +0200 Subject: [PATCH 26/29] fix(.github): allow perf to depend on meta crate --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bf786abaa2f..1a75170cc88 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,7 +52,7 @@ jobs: run: cargo build --package "$CRATE" --no-default-features - name: Enforce no dependency on meta crate - if: env.CRATE != 'libp2p-server' + if: env.CRATE != 'libp2p-server' && env.CRATE != 'libp2p-perf' run: | cargo metadata --format-version=1 --no-deps | \ jq -e -r '.packages[] | select(.name == "'"$CRATE"'") | .dependencies | all(.name != "libp2p")' From affdfce73c17f9cd496ccc52b2f722b0653b73ce Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 25 Oct 2023 09:57:42 +0200 Subject: [PATCH 27/29] Bump version and add changelog entry --- Cargo.lock | 2 +- protocols/perf/CHANGELOG.md | 8 +++++++- protocols/perf/Cargo.toml | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d88d77a0736..378ea5af2f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2814,7 +2814,7 @@ dependencies = [ [[package]] name = "libp2p-perf" -version = "0.2.0" +version = "0.3.0" dependencies = [ "anyhow", "clap", diff --git a/protocols/perf/CHANGELOG.md b/protocols/perf/CHANGELOG.md index e46a94e981a..6976a89887b 100644 --- a/protocols/perf/CHANGELOG.md +++ b/protocols/perf/CHANGELOG.md @@ -1,4 +1,10 @@ -## 0.2.0 +## 0.3.0 - unreleased + +- Continuously measure on single connection (iperf-style). + See https://github.com/libp2p/test-plans/issues/261 for high level overview. + See [PR 4382](https://github.com/libp2p/rust-libp2p/pull/4382). + +## 0.2.0 - Raise MSRV to 1.65. See [PR 3715]. diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index 914567256dd..03aef55d6f2 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-perf" edition = "2021" rust-version = { workspace = true } description = "libp2p perf protocol implementation" -version = "0.2.0" +version = "0.3.0" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 0218fcff8e0936844c2a467511763d2b9c57a1e5 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 25 Oct 2023 10:11:43 +0200 Subject: [PATCH 28/29] Bump workspace dependency version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 2c6f214c5d4..8f487a4e06a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ libp2p-metrics = { version = "0.14.0", path = "misc/metrics" } libp2p-mplex = { version = "0.41.0", path = "muxers/mplex" } libp2p-muxer-test-harness = { path = "muxers/test-harness" } libp2p-noise = { version = "0.44.0", path = "transports/noise" } -libp2p-perf = { version = "0.2.0", path = "protocols/perf" } +libp2p-perf = { version = "0.3.0", path = "protocols/perf" } libp2p-ping = { version = "0.44.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.41.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.24.0", path = "transports/pnet" } From f8176675ccff2f0bd21927d340dd997222edf5af Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 25 Oct 2023 10:40:46 +0200 Subject: [PATCH 29/29] Remove keep alive handling --- protocols/perf/src/client/handler.rs | 10 +--------- protocols/perf/src/server/handler.rs | 10 +--------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index e4aed182bad..a9bb0c7d483 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -33,7 +33,7 @@ use libp2p_swarm::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }, - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, }; use void::Void; @@ -153,14 +153,6 @@ impl ConnectionHandler for Handler { } } - fn connection_keep_alive(&self) -> KeepAlive { - if self.outbound.is_empty() { - KeepAlive::No - } else { - KeepAlive::Yes - } - } - fn poll( &mut self, cx: &mut Context<'_>, diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index 983d26d62f3..4e739995b67 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -27,7 +27,7 @@ use libp2p_swarm::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }, - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, }; use log::error; use void::Void; @@ -115,14 +115,6 @@ impl ConnectionHandler for Handler { } } - fn connection_keep_alive(&self) -> KeepAlive { - if self.inbound.is_empty() { - KeepAlive::No - } else { - KeepAlive::Yes - } - } - fn poll( &mut self, cx: &mut Context<'_>,