From 4b5f4e2b4df524b34f049c97a7288799fdf690a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 17 Nov 2022 09:28:40 +0000 Subject: [PATCH] swarm/behaviour: Replace `inject_*` with `on_event` (#3011) --- CHANGELOG.md | 5 ++ src/behaviour.rs | 215 ++++++++++++++++++++++++++--------------------- 2 files changed, 126 insertions(+), 94 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cbcc4d87c6..ac7af065ba8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ - Update to `libp2p-request-response` `v0.23.0`. +- Replace `Behaviour`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. + See [PR 3011]. + +[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 + # 0.8.0 - Update to `libp2p-core` `v0.37.0`. diff --git a/src/behaviour.rs b/src/behaviour.rs index b39a7b141b4..cc0adc51c02 100644 --- a/src/behaviour.rs +++ b/src/behaviour.rs @@ -29,15 +29,19 @@ pub use as_server::{InboundProbeError, InboundProbeEvent}; use futures_timer::Delay; use instant::Instant; use libp2p_core::{ - connection::ConnectionId, multiaddr::Protocol, transport::ListenerId, ConnectedPoint, Endpoint, - Multiaddr, PeerId, + connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr, PeerId, }; use libp2p_request_response::{ - handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse, - RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel, + ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, ResponseChannel, }; use libp2p_swarm::{ - DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + behaviour::{ + AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr, + ExpiredListenAddr, FromSwarm, + }, + ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, }; use std::{ collections::{HashMap, VecDeque}, @@ -298,28 +302,17 @@ impl Behaviour { ongoing_inbound: &mut self.ongoing_inbound, } } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; - type OutEvent = Event; - fn inject_connection_established( + fn on_connection_established( &mut self, - peer: &PeerId, - conn: &ConnectionId, - endpoint: &ConnectedPoint, - failed_addresses: Option<&Vec>, - other_established: usize, - ) { - self.inner.inject_connection_established( - peer, - conn, + ConnectionEstablished { + peer_id: peer, + connection_id: conn, endpoint, - failed_addresses, - other_established, - ); - let connections = self.connected.entry(*peer).or_default(); + .. + }: ConnectionEstablished, + ) { + let connections = self.connected.entry(peer).or_default(); let addr = endpoint.get_remote_address(); let observed_addr = if !endpoint.is_relayed() && (!self.config.only_global_ips || addr.is_global_ip()) { @@ -327,14 +320,14 @@ impl NetworkBehaviour for Behaviour { } else { None }; - connections.insert(*conn, observed_addr); + connections.insert(conn, observed_addr); match endpoint { ConnectedPoint::Dialer { address, role_override: Endpoint::Dialer, } => { - if let Some(event) = self.as_server().on_outbound_connection(peer, address) { + if let Some(event) = self.as_server().on_outbound_connection(&peer, address) { self.pending_out_events .push_back(Event::InboundProbe(event)); } @@ -351,50 +344,69 @@ impl NetworkBehaviour for Behaviour { } } - fn inject_connection_closed( + fn on_connection_closed( &mut self, - peer: &PeerId, - conn: &ConnectionId, - endpoint: &ConnectedPoint, - handler: ::Handler, - remaining_established: usize, + ConnectionClosed { + peer_id, + connection_id, + endpoint, + handler, + remaining_established, + }: ConnectionClosed<::ConnectionHandler>, ) { self.inner - .inject_connection_closed(peer, conn, endpoint, handler, remaining_established); + .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + endpoint, + handler, + remaining_established, + })); + if remaining_established == 0 { - self.connected.remove(peer); + self.connected.remove(&peer_id); } else { - let connections = self.connected.get_mut(peer).expect("Peer is connected."); - connections.remove(conn); + let connections = self + .connected + .get_mut(&peer_id) + .expect("Peer is connected."); + connections.remove(&connection_id); } } - fn inject_dial_failure( + fn on_dial_failure( &mut self, - peer: Option, - handler: Self::ConnectionHandler, - error: &DialError, + DialFailure { + peer_id, + handler, + error, + }: DialFailure<::ConnectionHandler>, ) { - self.inner.inject_dial_failure(peer, handler, error); - if let Some(event) = self.as_server().on_outbound_dial_error(peer, error) { + self.inner + .on_swarm_event(FromSwarm::DialFailure(DialFailure { + peer_id, + handler, + error, + })); + if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) { self.pending_out_events .push_back(Event::InboundProbe(event)); } } - fn inject_address_change( + fn on_address_change( &mut self, - peer: &PeerId, - conn: &ConnectionId, - old: &ConnectedPoint, - new: &ConnectedPoint, + AddressChange { + peer_id: peer, + connection_id: conn, + old, + new, + }: AddressChange, ) { - self.inner.inject_address_change(peer, conn, old, new); - if old.is_relayed() && new.is_relayed() { return; } - let connections = self.connected.get_mut(peer).expect("Peer is connected."); + let connections = self.connected.get_mut(&peer).expect("Peer is connected."); let addr = new.get_remote_address(); let observed_addr = if !new.is_relayed() && (!self.config.only_global_ips || addr.is_global_ip()) { @@ -402,28 +414,13 @@ impl NetworkBehaviour for Behaviour { } else { None }; - connections.insert(*conn, observed_addr); - } - - fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { - self.inner.inject_new_listen_addr(id, addr); - self.as_client().on_new_address(); - } - - fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { - self.inner.inject_expired_listen_addr(id, addr); - self.as_client().on_expired_address(addr); - } - - fn inject_new_external_addr(&mut self, addr: &Multiaddr) { - self.inner.inject_new_external_addr(addr); - self.as_client().on_new_address(); + connections.insert(conn, observed_addr); } +} - fn inject_expired_external_addr(&mut self, addr: &Multiaddr) { - self.inner.inject_expired_external_addr(addr); - self.as_client().on_expired_address(addr); - } +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; + type OutEvent = Event; fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll { loop { @@ -478,35 +475,65 @@ impl NetworkBehaviour for Behaviour { self.inner.addresses_of_peer(peer) } - fn inject_event( - &mut self, - peer_id: PeerId, - conn: ConnectionId, - event: RequestResponseHandlerEvent, - ) { - self.inner.inject_event(peer_id, conn, event) + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(connection_established) => { + self.inner + .on_swarm_event(FromSwarm::ConnectionEstablished(connection_established)); + self.on_connection_established(connection_established) + } + FromSwarm::ConnectionClosed(connection_closed) => { + self.on_connection_closed(connection_closed) + } + FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure), + FromSwarm::AddressChange(address_change) => { + self.inner + .on_swarm_event(FromSwarm::AddressChange(address_change)); + self.on_address_change(address_change) + } + listen_addr @ FromSwarm::NewListenAddr(_) => { + self.inner.on_swarm_event(listen_addr); + self.as_client().on_new_address(); + } + FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, addr }) => { + self.inner + .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr { + listener_id, + addr, + })); + self.as_client().on_expired_address(addr); + } + FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }) => { + self.inner + .on_swarm_event(FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr })); + self.as_client().on_expired_address(addr); + } + external_addr @ FromSwarm::NewExternalAddr(_) => { + self.inner.on_swarm_event(external_addr); + self.as_client().on_new_address(); + } + listen_failure @ FromSwarm::ListenFailure(_) => { + self.inner.on_swarm_event(listen_failure) + } + new_listener @ FromSwarm::NewListener(_) => self.inner.on_swarm_event(new_listener), + listener_error @ FromSwarm::ListenerError(_) => { + self.inner.on_swarm_event(listener_error) + } + listener_closed @ FromSwarm::ListenerClosed(_) => { + self.inner.on_swarm_event(listener_closed) + } + } } - fn inject_listen_failure( + fn on_connection_handler_event( &mut self, - local_addr: &Multiaddr, - send_back_addr: &Multiaddr, - handler: Self::ConnectionHandler, + peer_id: PeerId, + connection_id: ConnectionId, + event: <::Handler as + ConnectionHandler>::OutEvent, ) { self.inner - .inject_listen_failure(local_addr, send_back_addr, handler) - } - - fn inject_new_listener(&mut self, id: ListenerId) { - self.inner.inject_new_listener(id) - } - - fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { - self.inner.inject_listener_error(id, err) - } - - fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) { - self.inner.inject_listener_closed(id, reason) + .on_connection_handler_event(peer_id, connection_id, event) } }