Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): don't close connection in OneShotHandler #4715

Merged
merged 9 commits into from
Oct 24, 2023
19 changes: 14 additions & 5 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
use libp2p_swarm::{
dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
dial_opts::DialOpts, CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour,
NotifyHandler, OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
};
use log::warn;
use smallvec::SmallVec;
Expand Down Expand Up @@ -354,13 +355,21 @@ impl NetworkBehaviour for Floodsub {
fn on_connection_handler_event(
&mut self,
propagation_source: PeerId,
_connection_id: ConnectionId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
// We ignore successful sends or timeouts.
let event = match event {
InnerMessage::Rx(event) => event,
InnerMessage::Sent => return,
Ok(InnerMessage::Rx(event)) => event,
Ok(InnerMessage::Sent) => return,
Err(e) => {
log::debug!("Failed to send floodsub message: {e}");
self.events.push_back(ToSwarm::CloseConnection {
peer_id: propagation_source,
connection: CloseConnection::One(connection_id),
});
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
return;
}
};

// Update connected peers topics
Expand Down
3 changes: 3 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
See [PR 4225](https://github.com/libp2p/rust-libp2p/pull/4225).
- Remove deprecated `keep_alive_timeout` in `OneShotHandlerConfig`.
See [PR 4677](https://github.com/libp2p/rust-libp2p/pull/4677).
- Don't close entire connection upon `DialFailure`s within `OneShotHandler`.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
Instead, the error is reported as `Err(e)` via `ConnectionHandler::ToBehaviour`.
See [PR 4715](https://github.com/libp2p/rust-libp2p/pull/4715).

## 0.43.6

Expand Down
26 changes: 8 additions & 18 deletions swarm/src/handler/one_shot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

use crate::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol,
};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
use crate::StreamUpgradeError;
use smallvec::SmallVec;
use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration};

Expand All @@ -35,10 +35,8 @@ where
{
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<TInbound, ()>,
/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: Option<StreamUpgradeError<<TOutbound as OutboundUpgradeSend>::Error>>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[TEvent; 4]>,
events_out: SmallVec<[Result<TEvent, StreamUpgradeError<TOutbound::Error>>; 4]>,
/// Queue of outbound substreams to open.
dial_queue: SmallVec<[TOutbound; 4]>,
/// Current number of concurrent outbound substreams being opened.
Expand All @@ -60,7 +58,6 @@ where
) -> Self {
OneShotHandler {
listen_protocol,
pending_error: None,
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
dial_negotiated: 0,
Expand Down Expand Up @@ -121,8 +118,8 @@ where
TEvent: Debug + Send + 'static,
{
type FromBehaviour = TOutbound;
type ToBehaviour = TEvent;
type Error = StreamUpgradeError<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>;
type ToBehaviour = Result<TEvent, StreamUpgradeError<TOutbound::Error>>;
type Error = void::Void;
type InboundProtocol = TInbound;
type OutboundProtocol = TOutbound;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -151,10 +148,6 @@ where
Self::Error,
>,
> {
if let Some(err) = self.pending_error.take() {
return Poll::Ready(ConnectionHandlerEvent::Close(err));
}

if !self.events_out.is_empty() {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
self.events_out.remove(0),
Expand Down Expand Up @@ -197,20 +190,17 @@ where
protocol: out,
..
}) => {
self.events_out.push(out.into());
self.events_out.push(Ok(out.into()));
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol: out,
..
}) => {
self.dial_negotiated -= 1;
self.events_out.push(out.into());
self.events_out.push(Ok(out.into()));
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
if self.pending_error.is_none() {
log::debug!("DialUpgradeError: {error}");
self.keep_alive = KeepAlive::No;
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
self.events_out.push(Err(error));
}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
Expand Down