Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(relay): propagate errors to Transport::{listen_on,dial} #4745

Merged
merged 32 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ed10848
Forward errors to `dial` and `listen_on`
thomaseizinger Oct 27, 2023
b457a93
Remove unused `PeerId`
thomaseizinger Oct 27, 2023
516e37c
Add docs
thomaseizinger Oct 27, 2023
d85d918
Remove old TODO
thomaseizinger Oct 27, 2023
ed21877
Further propagate rename
thomaseizinger Oct 27, 2023
6efa8af
Use shorthand
thomaseizinger Oct 27, 2023
ef5a3d5
Add debug-asserts for internal state
thomaseizinger Oct 27, 2023
7c74072
Remove unused event
thomaseizinger Oct 27, 2023
8687543
Don't report errors from inbound circuits
thomaseizinger Oct 27, 2023
1de8be3
Add test for `dial`
thomaseizinger Oct 27, 2023
c78b44d
Add changelog entry
thomaseizinger Oct 27, 2023
0594ea0
Update changelog
thomaseizinger Oct 27, 2023
891f96d
Add sentence regarding connection closing
thomaseizinger Oct 27, 2023
5c7407c
Update misc/futures-bounded/CHANGELOG.md
thomaseizinger Oct 27, 2023
c535fc4
Use if let
thomaseizinger Oct 27, 2023
ef84668
Update hole-punch tests to use new errors
thomaseizinger Oct 27, 2023
c15b49e
Merge remote-tracking branch 'origin/refactor/relay-client-error' int…
thomaseizinger Oct 27, 2023
e898ba0
Fix rustdoc link
thomaseizinger Oct 27, 2023
1c260d9
Fix clippy warning
thomaseizinger Oct 27, 2023
b1566b3
Merge branch 'master' into refactor/relay-client-error
thomaseizinger Oct 27, 2023
badda20
Merge branch 'master' into refactor/relay-client-error
thomaseizinger Oct 27, 2023
69a068a
Update protocols/relay/CHANGELOG.md
thomaseizinger Oct 30, 2023
cc888b6
Rename field
thomaseizinger Oct 31, 2023
2750ea5
Remove quasi-unbounded channel in favor of `try_send` + `log` + large
thomaseizinger Oct 31, 2023
f381b33
Replace many loops with one
thomaseizinger Oct 31, 2023
667d4c4
Ensure we always continue polling
thomaseizinger Oct 31, 2023
e394c38
Merge branch 'master' into refactor/relay-client-error
thomaseizinger Oct 31, 2023
02ae398
Rename field
thomaseizinger Oct 31, 2023
ce2a6a6
Remove unused `pending_error` field
thomaseizinger Oct 31, 2023
77f49c5
Log error if listener for new circuit is gone
thomaseizinger Oct 31, 2023
6233e9d
Merge branch 'master' into refactor/relay-client-error
mergify[bot] Oct 31, 2023
10d3f02
Merge branch 'master' into refactor/relay-client-error
thomaseizinger Oct 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ resolver = "2"
rust-version = "1.73.0"

[workspace.dependencies]
futures-bounded = { version = "0.2.0", path = "misc/futures-bounded" }
futures-bounded = { version = "0.2.1", path = "misc/futures-bounded" }
libp2p = { version = "0.53.0", path = "libp2p" }
libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" }
Expand Down
1 change: 1 addition & 0 deletions hole-punching-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ redis = { version = "0.23.0", default-features = false, features = ["tokio-comp"
tokio = { version = "1.29.1", features = ["full"] }
serde = { version = "1.0.190", features = ["derive"] }
serde_json = "1.0.107"
either = "1.9.0"
58 changes: 48 additions & 10 deletions hole-punching-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
// DEALINGS IN THE SOFTWARE.

use anyhow::{Context, Result};
use either::Either;
use futures::stream::StreamExt;
use libp2p::core::transport::ListenerId;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::ConnectionId;
use libp2p::{
core::multiaddr::{Multiaddr, Protocol},
dcutr, identify, noise, ping, relay,
Expand Down Expand Up @@ -83,17 +87,22 @@ async fn main() -> Result<()> {
.build();

client_listen_on_transport(&mut swarm, transport).await?;
client_setup(&mut swarm, &mut redis, relay_addr.clone(), mode).await?;
let id = client_setup(&mut swarm, &mut redis, relay_addr.clone(), mode).await?;

let mut hole_punched_peer_connection = None;

loop {
match (swarm.next().await.unwrap(), hole_punched_peer_connection) {
match (
swarm.next().await.unwrap(),
hole_punched_peer_connection,
id,
) {
(
SwarmEvent::Behaviour(BehaviourEvent::RelayClient(
relay::client::Event::ReservationReqAccepted { .. },
)),
_,
_,
) => {
log::info!("Relay accepted our reservation request.");

Expand All @@ -109,6 +118,7 @@ async fn main() -> Result<()> {
},
)),
_,
_,
) => {
log::info!("Successfully hole-punched to {remote_peer_id}");

Expand All @@ -121,6 +131,7 @@ async fn main() -> Result<()> {
..
})),
Some(hole_punched_connection),
_,
) if mode == Mode::Dial && connection == hole_punched_connection => {
println!("{}", serde_json::to_string(&Report::new(rtt))?);

Expand All @@ -135,12 +146,32 @@ async fn main() -> Result<()> {
},
)),
_,
_,
) => {
log::info!("Failed to hole-punched to {remote_peer_id}");
return Err(anyhow::Error::new(error));
}
(SwarmEvent::OutgoingConnectionError { error, .. }, _) => {
anyhow::bail!(error)
(
SwarmEvent::ListenerClosed {
listener_id,
reason: Err(e),
..
},
_,
Either::Left(reservation),
) if listener_id == reservation => {
anyhow::bail!("Reservation on relay failed: {e}");
}
(
SwarmEvent::OutgoingConnectionError {
connection_id,
error,
..
},
_,
Either::Right(circuit),
) if connection_id == circuit => {
anyhow::bail!("Circuit request relay failed: {error}");
}
_ => {}
}
Expand Down Expand Up @@ -209,23 +240,30 @@ async fn client_setup(
redis: &mut RedisClient,
relay_addr: Multiaddr,
mode: Mode,
) -> Result<()> {
match mode {
) -> Result<Either<ListenerId, ConnectionId>> {
let either = match mode {
Mode::Listen => {
swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?;
let id = swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?;

Either::Left(id)
}
Mode::Dial => {
let remote_peer_id = redis.pop(LISTEN_CLIENT_PEER_ID).await?;

swarm.dial(
let opts = DialOpts::from(
relay_addr
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(remote_peer_id)),
)?;
);
let id = opts.connection_id();

swarm.dial(opts)?;

Either::Right(id)
}
};

Ok(())
Ok(either)
}

fn tcp_addr(addr: IpAddr) -> Multiaddr {
Expand Down
5 changes: 5 additions & 0 deletions misc/futures-bounded/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.2.1 - unreleased

- Add `.len()` getter to `FuturesMap`, `FuturesSet`, `StreamMap` and `StreamSet`.
See [PR 4745](https://github.com/libp2p/rust-lib2pp/pulls/4745).

## 0.2.0

- Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`.
Expand Down
2 changes: 1 addition & 1 deletion misc/futures-bounded/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-bounded"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
rust-version.workspace = true
license = "MIT"
Expand Down
4 changes: 4 additions & 0 deletions misc/futures-bounded/src/futures_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ where
}
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
Expand Down
4 changes: 4 additions & 0 deletions misc/futures-bounded/src/futures_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl<O> FuturesSet<O> {
}
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
Expand Down
6 changes: 5 additions & 1 deletion misc/futures-bounded/src/stream_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ where
Some(inner)
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
Expand Down Expand Up @@ -256,7 +260,7 @@ mod tests {

assert!(poll.is_pending());
assert_eq!(
streams.inner.len(),
streams.len(),
0,
"resources of cancelled streams are cleaned up properly"
);
Expand Down
4 changes: 4 additions & 0 deletions misc/futures-bounded/src/stream_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ where
}
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
Expand Down
11 changes: 11 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

- Fix a rare race condition when making a reservation on a relay that could lead to a failed reservation.
See [PR 4747](https://github.com/libp2p/rust-lib2pp/pulls/4747).
- Propagate errors of relay client to the listener / dialer.
A failed reservation will now appear as `SwarmEvent::ListenerClosed` with the `ListenerId` of the corresponding `Swarm::listen_on` call.
A failed circuit request will now appear as `SwarmEvent::OutgoingConnectionError` with the `ConnectionId` of the corresponding `Swarm::dial` call.
Lastly, a failed reservation or circuit request will **no longer** close the underlying relay connection.
As a result, we remove the following enum variants:
- `relay::client::Event::ReservationReqFailed`
- `relay::client::Event::OutboundCircuitReqFailed`
- `relay::client::Event::InboundCircuitReqDenied`
- `relay::client::Event::InboundCircuitReqDenyFailed`

See [PR 4745](https://github.com/libp2p/rust-lib2pp/pulls/4745).

## 0.16.2

Expand Down
1 change: 1 addition & 0 deletions protocols/relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ libp2p-plaintext = { workspace = true }
libp2p-swarm = { workspace = true, features = ["macros", "async-std"] }
libp2p-yamux = { workspace = true }
quickcheck = { workspace = true }
libp2p-swarm-test = { workspace = true }

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
Expand Down
5 changes: 1 addition & 4 deletions protocols/relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,12 @@ pub mod inbound {
pub mod hop {
pub use crate::protocol::inbound_hop::FatalUpgradeError;
}
pub mod stop {
pub use crate::protocol::inbound_stop::FatalUpgradeError;
}
}

/// Types related to the relay protocol outbound.
pub mod outbound {
pub mod hop {
pub use crate::protocol::outbound_hop::FatalUpgradeError;
pub use crate::protocol::outbound_hop::{ConnectError, ProtocolViolation, ReserveError};
}
pub mod stop {
pub use crate::protocol::outbound_stop::FatalUpgradeError;
Expand Down
43 changes: 4 additions & 39 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) mod transport;

use crate::multiaddr_ext::MultiaddrExt;
use crate::priv_client::handler::Handler;
use crate::protocol::{self, inbound_stop, outbound_hop};
use crate::protocol::{self, inbound_stop};
use bytes::Bytes;
use either::Either;
use futures::channel::mpsc::Receiver;
Expand All @@ -39,8 +39,7 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm
use libp2p_swarm::dial_opts::DialOpts;
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
NotifyHandler, Stream, StreamUpgradeError, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, ErrorKind, IoSlice};
Expand All @@ -59,32 +58,15 @@ pub enum Event {
renewal: bool,
limit: Option<protocol::Limit>,
},
ReservationReqFailed {
relay_peer_id: PeerId,
/// Indicates whether the request replaces an existing reservation.
renewal: bool,
error: StreamUpgradeError<outbound_hop::ReservationFailedReason>,
},
OutboundCircuitEstablished {
relay_peer_id: PeerId,
limit: Option<protocol::Limit>,
},
OutboundCircuitReqFailed {
relay_peer_id: PeerId,
error: StreamUpgradeError<outbound_hop::CircuitFailedReason>,
},
/// An inbound circuit has been established.
InboundCircuitEstablished {
src_peer_id: PeerId,
limit: Option<protocol::Limit>,
},
/// An inbound circuit request has been denied.
InboundCircuitReqDenied { src_peer_id: PeerId },
/// Denying an inbound circuit request failed.
InboundCircuitReqDenyFailed {
src_peer_id: PeerId,
error: inbound_stop::UpgradeError,
},
}

/// [`NetworkBehaviour`] implementation of the relay client
Expand Down Expand Up @@ -252,32 +234,15 @@ impl NetworkBehaviour for Behaviour {
limit,
}
}
handler::Event::ReservationReqFailed { renewal, error } => {
Event::ReservationReqFailed {
relay_peer_id: event_source,
renewal,
error,
}
}
handler::Event::OutboundCircuitEstablished { limit } => {
Event::OutboundCircuitEstablished {
relay_peer_id: event_source,
limit,
}
}
handler::Event::OutboundCircuitReqFailed { error } => Event::OutboundCircuitReqFailed {
relay_peer_id: event_source,
error,
},
handler::Event::InboundCircuitEstablished { src_peer_id, limit } => {
Event::InboundCircuitEstablished { src_peer_id, limit }
}
handler::Event::InboundCircuitReqDenied { src_peer_id } => {
Event::InboundCircuitReqDenied { src_peer_id }
}
handler::Event::InboundCircuitReqDenyFailed { src_peer_id, error } => {
Event::InboundCircuitReqDenyFailed { src_peer_id, error }
}
};

self.queued_actions.push_back(ToSwarm::GenerateEvent(event))
Expand Down Expand Up @@ -336,7 +301,7 @@ impl NetworkBehaviour for Behaviour {
peer_id: relay_peer_id,
handler: NotifyHandler::One(*connection_id),
event: Either::Left(handler::In::EstablishCircuit {
send_back,
to_dial: send_back,
dst_peer_id,
}),
},
Expand All @@ -350,7 +315,7 @@ impl NetworkBehaviour for Behaviour {
self.pending_handler_commands.insert(
connection_id,
handler::In::EstablishCircuit {
send_back,
to_dial: send_back,
dst_peer_id,
},
);
Expand Down
Loading