Skip to content

Commit

Permalink
swarm/behaviour: Replace inject_* with on_event (libp2p#3011)
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs authored Nov 17, 2022
1 parent ff59e46 commit 4b5f4e2
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 94 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

- Update to `libp2p-request-response` `v0.23.0`.

- Replace `Behaviour`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods.
See [PR 3011].

[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011

# 0.8.0

- Update to `libp2p-core` `v0.37.0`.
Expand Down
215 changes: 121 additions & 94 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@ pub use as_server::{InboundProbeError, InboundProbeEvent};
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::{
connection::ConnectionId, multiaddr::Protocol, transport::ListenerId, ConnectedPoint, Endpoint,
Multiaddr, PeerId,
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr, PeerId,
};
use libp2p_request_response::{
handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use libp2p_swarm::{
DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
behaviour::{
AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr,
ExpiredListenAddr, FromSwarm,
},
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
};
use std::{
collections::{HashMap, VecDeque},
Expand Down Expand Up @@ -298,43 +302,32 @@ impl Behaviour {
ongoing_inbound: &mut self.ongoing_inbound,
}
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = <RequestResponse<AutoNatCodec> as NetworkBehaviour>::ConnectionHandler;
type OutEvent = Event;

fn inject_connection_established(
fn on_connection_established(
&mut self,
peer: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.inner.inject_connection_established(
peer,
conn,
ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint,
failed_addresses,
other_established,
);
let connections = self.connected.entry(*peer).or_default();
..
}: ConnectionEstablished,
) {
let connections = self.connected.entry(peer).or_default();
let addr = endpoint.get_remote_address();
let observed_addr =
if !endpoint.is_relayed() && (!self.config.only_global_ips || addr.is_global_ip()) {
Some(addr.clone())
} else {
None
};
connections.insert(*conn, observed_addr);
connections.insert(conn, observed_addr);

match endpoint {
ConnectedPoint::Dialer {
address,
role_override: Endpoint::Dialer,
} => {
if let Some(event) = self.as_server().on_outbound_connection(peer, address) {
if let Some(event) = self.as_server().on_outbound_connection(&peer, address) {
self.pending_out_events
.push_back(Event::InboundProbe(event));
}
Expand All @@ -351,79 +344,83 @@ impl NetworkBehaviour for Behaviour {
}
}

fn inject_connection_closed(
fn on_connection_closed(
&mut self,
peer: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler,
remaining_established,
}: ConnectionClosed<<Self as NetworkBehaviour>::ConnectionHandler>,
) {
self.inner
.inject_connection_closed(peer, conn, endpoint, handler, remaining_established);
.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler,
remaining_established,
}));

if remaining_established == 0 {
self.connected.remove(peer);
self.connected.remove(&peer_id);
} else {
let connections = self.connected.get_mut(peer).expect("Peer is connected.");
connections.remove(conn);
let connections = self
.connected
.get_mut(&peer_id)
.expect("Peer is connected.");
connections.remove(&connection_id);
}
}

fn inject_dial_failure(
fn on_dial_failure(
&mut self,
peer: Option<PeerId>,
handler: Self::ConnectionHandler,
error: &DialError,
DialFailure {
peer_id,
handler,
error,
}: DialFailure<<Self as NetworkBehaviour>::ConnectionHandler>,
) {
self.inner.inject_dial_failure(peer, handler, error);
if let Some(event) = self.as_server().on_outbound_dial_error(peer, error) {
self.inner
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error,
}));
if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) {
self.pending_out_events
.push_back(Event::InboundProbe(event));
}
}

fn inject_address_change(
fn on_address_change(
&mut self,
peer: &PeerId,
conn: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint,
AddressChange {
peer_id: peer,
connection_id: conn,
old,
new,
}: AddressChange,
) {
self.inner.inject_address_change(peer, conn, old, new);

if old.is_relayed() && new.is_relayed() {
return;
}
let connections = self.connected.get_mut(peer).expect("Peer is connected.");
let connections = self.connected.get_mut(&peer).expect("Peer is connected.");
let addr = new.get_remote_address();
let observed_addr =
if !new.is_relayed() && (!self.config.only_global_ips || addr.is_global_ip()) {
Some(addr.clone())
} else {
None
};
connections.insert(*conn, observed_addr);
}

fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_new_listen_addr(id, addr);
self.as_client().on_new_address();
}

fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_expired_listen_addr(id, addr);
self.as_client().on_expired_address(addr);
}

fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_new_external_addr(addr);
self.as_client().on_new_address();
connections.insert(conn, observed_addr);
}
}

fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_expired_external_addr(addr);
self.as_client().on_expired_address(addr);
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = <RequestResponse<AutoNatCodec> as NetworkBehaviour>::ConnectionHandler;
type OutEvent = Event;

fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll<Action> {
loop {
Expand Down Expand Up @@ -478,35 +475,65 @@ impl NetworkBehaviour for Behaviour {
self.inner.addresses_of_peer(peer)
}

fn inject_event(
&mut self,
peer_id: PeerId,
conn: ConnectionId,
event: RequestResponseHandlerEvent<AutoNatCodec>,
) {
self.inner.inject_event(peer_id, conn, event)
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(connection_established) => {
self.inner
.on_swarm_event(FromSwarm::ConnectionEstablished(connection_established));
self.on_connection_established(connection_established)
}
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::AddressChange(address_change) => {
self.inner
.on_swarm_event(FromSwarm::AddressChange(address_change));
self.on_address_change(address_change)
}
listen_addr @ FromSwarm::NewListenAddr(_) => {
self.inner.on_swarm_event(listen_addr);
self.as_client().on_new_address();
}
FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, addr }) => {
self.inner
.on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
listener_id,
addr,
}));
self.as_client().on_expired_address(addr);
}
FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }) => {
self.inner
.on_swarm_event(FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }));
self.as_client().on_expired_address(addr);
}
external_addr @ FromSwarm::NewExternalAddr(_) => {
self.inner.on_swarm_event(external_addr);
self.as_client().on_new_address();
}
listen_failure @ FromSwarm::ListenFailure(_) => {
self.inner.on_swarm_event(listen_failure)
}
new_listener @ FromSwarm::NewListener(_) => self.inner.on_swarm_event(new_listener),
listener_error @ FromSwarm::ListenerError(_) => {
self.inner.on_swarm_event(listener_error)
}
listener_closed @ FromSwarm::ListenerClosed(_) => {
self.inner.on_swarm_event(listener_closed)
}
}
}

fn inject_listen_failure(
fn on_connection_handler_event(
&mut self,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
handler: Self::ConnectionHandler,
peer_id: PeerId,
connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as
ConnectionHandler>::OutEvent,
) {
self.inner
.inject_listen_failure(local_addr, send_back_addr, handler)
}

fn inject_new_listener(&mut self, id: ListenerId) {
self.inner.inject_new_listener(id)
}

fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
self.inner.inject_listener_error(id, err)
}

fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
self.inner.inject_listener_closed(id, reason)
.on_connection_handler_event(peer_id, connection_id, event)
}
}

Expand Down

0 comments on commit 4b5f4e2

Please sign in to comment.