Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gossipsub): gracefully disable handler on stream errors #3625

Merged
merged 42 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1264345
Gossipsub: remove `ConnectionHandlerEvent::Close`
Mar 16, 2023
3c2fbce
Move error handling closer to the source.
Mar 18, 2023
b8fed53
Address PR comments.
Mar 21, 2023
f4cfbc3
Address PR comments.
Mar 21, 2023
e7e96ed
Address PR comments.
Mar 21, 2023
e37ba58
Use `void` instead of panic
thomaseizinger Mar 21, 2023
b6be9ce
Check created streams counter also for failed upgrades
thomaseizinger Mar 21, 2023
1e06367
Merge branch 'libp2p:master' into deprecate/gossipsub-close-event
vnermolaev Mar 22, 2023
415f648
Update changelog.
Mar 22, 2023
f87949d
Fix typo in changelog
thomaseizinger Mar 22, 2023
9e12f9d
Remove outdated comments
thomaseizinger Mar 22, 2023
ee6cb02
Extract utility functions for classifying `ConnectionEvent`
thomaseizinger Mar 22, 2023
3443a69
Set `outbound_substream_establishing` in a single place
thomaseizinger Mar 22, 2023
fef9751
Flatten match
thomaseizinger Mar 22, 2023
7dec223
Avoid being stuck in "Poisoned" state for outbound streams
thomaseizinger Mar 23, 2023
3163213
Don't handle error that is never constructed
thomaseizinger Mar 23, 2023
e28af53
Deprecate `HandlerError` entirely
thomaseizinger Mar 23, 2023
0507493
Track # of outbound streams requested, not successfully established
mxinden Mar 29, 2023
b572895
Re-enqueue message when outbound stream fails
mxinden Mar 29, 2023
12e9b53
Use early return instead of if-else
thomaseizinger Mar 29, 2023
fd4958d
Only send messages in `poll`
thomaseizinger Mar 29, 2023
44dce05
Fix use of `DialUpgradeError` in `is_inbound` check
thomaseizinger Mar 30, 2023
6a5f1d0
Merge branch 'master' into deprecate/gossipsub-close-event
thomaseizinger Apr 3, 2023
3432ac0
Move changelog entry
thomaseizinger Apr 3, 2023
db59d23
Bump version
thomaseizinger Apr 3, 2023
b94ec28
Change log level to warn for bad events
thomaseizinger Apr 3, 2023
c5e3c41
Don't end log messages with periods
thomaseizinger Apr 3, 2023
c02a3a3
Use exhaustive match
thomaseizinger Apr 4, 2023
e94c2c7
Make error message consistently `debug` and use same wording
thomaseizinger Apr 4, 2023
a7ed378
Merge branch 'master' into deprecate/gossipsub-close-event
thomaseizinger Apr 4, 2023
798ef5c
Update protocols/gossipsub/src/handler.rs
mxinden Apr 5, 2023
bbdf8f5
chore: bump libp2p-swarm to v0.42.2
mxinden Apr 5, 2023
af21589
Update Cargo.lock
mxinden Apr 5, 2023
f999f3e
Refactor keep alive mechanism
mxinden Apr 6, 2023
397afa2
Do minor clean up
mxinden Apr 6, 2023
9f44adc
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into d…
mxinden Apr 6, 2023
b01e86f
Add debug for dropped message
mxinden Apr 6, 2023
552cb08
Track MAX_SUBSTREAM_ATTEMPTS in on_connection_event
mxinden Apr 6, 2023
b42e71e
Update swarm/CHANGELOG.md
mxinden Apr 6, 2023
a958b60
Merge branch 'master' into deprecate/gossipsub-close-event
mxinden Apr 11, 2023
d673ed2
Merge branch 'master' into deprecate/gossipsub-close-event
mxinden Apr 12, 2023
7cb4e41
Merge branch 'master' into deprecate/gossipsub-close-event
mxinden Apr 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions protocols/gossipsub/src/error_priv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,18 @@ impl From<SigningError> for PublishError {
/// Errors that can occur in the protocols handler.
#[derive(Debug, Error)]
pub enum HandlerError {
#[deprecated(note = "This error will not be emitted")]
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
#[error("The maximum number of inbound substreams created has been exceeded.")]
MaxInboundSubstreams,
#[deprecated(note = "This error will not be emitted")]
#[error("The maximum number of outbound substreams created has been exceeded.")]
MaxOutboundSubstreams,
#[error("The message exceeds the maximum transmission size.")]
MaxTransmissionSize,
#[deprecated(note = "This error will not be emitted")]
#[error("Protocol negotiation timeout.")]
NegotiationTimeout,
#[deprecated(note = "This error will not be emitted")]
#[error("Protocol negotiation failed.")]
NegotiationProtocolError(ProtocolError),
#[error("Failed to encode or decode")]
Expand Down
132 changes: 61 additions & 71 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ use libp2p_swarm::NegotiatedSubstream;
use log::{error, trace, warn};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
Expand Down Expand Up @@ -124,9 +122,6 @@ pub struct Handler {
/// The amount of time we allow idle connections before disconnecting.
idle_timeout: Duration,

/// Collection of errors from attempting an upgrade.
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<HandlerError>>,

/// Flag determining whether to maintain the connection to the peer.
keep_alive: KeepAlive,

Expand Down Expand Up @@ -174,7 +169,6 @@ impl Handler {
peer_kind_sent: false,
protocol_unsupported: false,
idle_timeout,
upgrade_errors: VecDeque::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)),
in_mesh: false,
}
Expand Down Expand Up @@ -289,44 +283,15 @@ impl ConnectionHandler for Handler {
Self::Error,
>,
> {
// Handle any upgrade errors
if let Some(error) = self.upgrade_errors.pop_front() {
let reported_error = match error {
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => {
Some(HandlerError::NegotiationTimeout)
}
// There was an error post negotiation, close the connection.
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => {
match negotiation_error {
NegotiationError::Failed => {
// The protocol is not supported
self.protocol_unsupported = true;
if !self.peer_kind_sent {
self.peer_kind_sent = true;
// clear all substreams so the keep alive returns false
self.inbound_substream = None;
self.outbound_substream = None;
self.keep_alive = KeepAlive::No;
return Poll::Ready(ConnectionHandlerEvent::Custom(
HandlerEvent::PeerKind(PeerKind::NotSupported),
));
} else {
None
}
}
NegotiationError::ProtocolError(e) => {
Some(HandlerError::NegotiationProtocolError(e))
}
}
}
};

// If there was a fatal error, close the connection.
if let Some(error) = reported_error {
return Poll::Ready(ConnectionHandlerEvent::Close(error));
}
if self.protocol_unsupported && !self.peer_kind_sent {
self.peer_kind_sent = true;
// clear all substreams so the keep alive returns false
self.inbound_substream = None;
self.outbound_substream = None;
self.keep_alive = KeepAlive::No;
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind(
PeerKind::NotSupported,
)));
}

if !self.peer_kind_sent {
Expand All @@ -338,23 +303,14 @@ impl ConnectionHandler for Handler {
}
}

if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION {
// Too many inbound substreams have been created, end the connection.
return Poll::Ready(ConnectionHandlerEvent::Close(
HandlerError::MaxInboundSubstreams,
));
}
// Invariant: `self.inbound_substreams_created < MAX_SUBSTREAM_CREATION`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding an assert! or debug_assert! (preference for the former) here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking a closer look, I don't think this invariant actually holds.

In on_connection_event we drop new inbound requests on ==:

        if event.is_inbound() && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION {
            // Too many inbound substreams have been created, disable the handler.
            self.keep_alive = KeepAlive::No;
            log::warn!("The maximum number of inbound substreams created has been exceeded");
            return;
        }

Thus self.inbound_substreams may never be > but it could be == and thus the invariant of < does not hold.

@thomaseizinger am I missing something?


// determine if we need to create the stream
if !self.send_queue.is_empty()
&& self.outbound_substream.is_none()
&& !self.outbound_substream_establishing
{
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION {
return Poll::Ready(ConnectionHandlerEvent::Close(
HandlerError::MaxOutboundSubstreams,
));
}
// Invariant: `self.outbound_substreams_created < MAX_SUBSTREAM_CREATION`.
let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit();
self.outbound_substream_establishing = true;
Expand Down Expand Up @@ -475,14 +431,17 @@ impl ConnectionHandler for Handler {
Some(OutboundSubstreamState::WaitingOutput(substream));
}
Err(e) => {
error!("Error sending message: {}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(e));
log::debug!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logs for errors on outbound substreams have all been downgraded to debug while the inbound ones are kept as warn. I would expect the pub part of gossipsub to be as important (if not more) as the sub part to keep these all as warn at least

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, a warn always means "Should I wake an ops person at 3am because of this"? Connections can die at any time because somebody e.g. closes their laptop. That is not a reason to wake an ops person IMO, hence I am gonna downgrade the inbound streams to debug instead upgrading the outbound ones to warn.

"Outbound substream error while sending output: {e}"
);
self.outbound_substream = None;
break;
}
}
}
Poll::Ready(Err(e)) => {
error!("Outbound substream error while sending output: {:?}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(e));
error!("Outbound substream error while sending output: {e:?}");
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
break;
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
Poll::Pending => {
self.keep_alive = KeepAlive::Yes;
Expand All @@ -504,7 +463,8 @@ impl ConnectionHandler for Handler {
Some(OutboundSubstreamState::WaitingOutput(substream))
}
Poll::Ready(Err(e)) => {
return Poll::Ready(ConnectionHandlerEvent::Close(e))
log::debug!("Outbound substream error while flushing output: {e}");
break;
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
Poll::Pending => {
self.keep_alive = KeepAlive::Yes;
Expand All @@ -525,14 +485,8 @@ impl ConnectionHandler for Handler {
break;
}
Poll::Ready(Err(e)) => {
warn!("Outbound substream error while closing: {:?}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(
io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to close outbound substream",
)
.into(),
));
warn!("Outbound substream error while closing: {e:?}");
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
break;
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
Poll::Pending => {
self.keep_alive = KeepAlive::No;
Expand Down Expand Up @@ -566,15 +520,51 @@ impl ConnectionHandler for Handler {
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
if self.inbound_substreams_created == MAX_SUBSTREAM_CREATION {
// Too many inbound substreams have been created, end the connection.
self.keep_alive = KeepAlive::No;
log::info!(
"The maximum number of inbound substreams created has been exceeded."
);
return;
}

self.on_fully_negotiated_inbound(fully_negotiated_inbound)
}
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
if self.outbound_substreams_created == MAX_SUBSTREAM_CREATION {
self.keep_alive = KeepAlive::No;
log::info!(
"The maximum number of outbound substreams created has been exceeded"
);
return;
}

self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => {
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
self.outbound_substream_establishing = false;
warn!("Dial upgrade error {:?}", e);
self.upgrade_errors.push_back(e);

match error {
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => {
log::info!("Dial upgrade error: Protocol negotiation timeout.");
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
// There was an error post negotiation, close the connection.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
log::info!("Dial upgrade error: {e}");
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)) => {
// The protocol is not supported
self.protocol_unsupported = true;
log::info!("Dial upgrade error: {}", NegotiationError::Failed);
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => log::info!("Protocol negotiation failed. {e}"),
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
}
Expand Down