-
Notifications
You must be signed in to change notification settings - Fork 999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(gossipsub): gracefully disable handler on stream errors #3625
Changes from 7 commits
1264345
3c2fbce
b8fed53
f4cfbc3
e7e96ed
e37ba58
b6be9ce
1e06367
415f648
f87949d
9e12f9d
ee6cb02
3443a69
fef9751
7dec223
3163213
e28af53
0507493
b572895
12e9b53
fd4958d
44dce05
6a5f1d0
3432ac0
db59d23
b94ec28
c5e3c41
c02a3a3
e94c2c7
a7ed378
798ef5c
bbdf8f5
af21589
f999f3e
397afa2
9f44adc
b01e86f
552cb08
b42e71e
a958b60
d673ed2
7cb4e41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -33,11 +33,8 @@ use libp2p_swarm::handler::{ | |||
SubstreamProtocol, | ||||
}; | ||||
use libp2p_swarm::NegotiatedSubstream; | ||||
use log::{error, trace, warn}; | ||||
use smallvec::SmallVec; | ||||
use std::{ | ||||
collections::VecDeque, | ||||
io, | ||||
pin::Pin, | ||||
task::{Context, Poll}, | ||||
time::Duration, | ||||
|
@@ -124,9 +121,6 @@ pub struct Handler { | |||
/// The amount of time we allow idle connections before disconnecting. | ||||
idle_timeout: Duration, | ||||
|
||||
/// Collection of errors from attempting an upgrade. | ||||
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<HandlerError>>, | ||||
|
||||
/// Flag determining whether to maintain the connection to the peer. | ||||
keep_alive: KeepAlive, | ||||
|
||||
|
@@ -174,7 +168,6 @@ impl Handler { | |||
peer_kind_sent: false, | ||||
protocol_unsupported: false, | ||||
idle_timeout, | ||||
upgrade_errors: VecDeque::new(), | ||||
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), | ||||
in_mesh: false, | ||||
} | ||||
|
@@ -202,7 +195,7 @@ impl Handler { | |||
} | ||||
|
||||
// new inbound substream. Replace the current one, if it exists. | ||||
trace!("New inbound substream request"); | ||||
log::trace!("New inbound substream request"); | ||||
self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); | ||||
} | ||||
|
||||
|
@@ -234,7 +227,7 @@ impl Handler { | |||
// Should never establish a new outbound substream if one already exists. | ||||
// If this happens, an outbound message is not sent. | ||||
if self.outbound_substream.is_some() { | ||||
warn!("Established an outbound substream with one already available"); | ||||
log::warn!("Established an outbound substream with one already available"); | ||||
// Add the message back to the send queue | ||||
self.send_queue.push(message); | ||||
} else { | ||||
|
@@ -289,44 +282,15 @@ impl ConnectionHandler for Handler { | |||
Self::Error, | ||||
>, | ||||
> { | ||||
// Handle any upgrade errors | ||||
if let Some(error) = self.upgrade_errors.pop_front() { | ||||
let reported_error = match error { | ||||
// Timeout errors get mapped to NegotiationTimeout and we close the connection. | ||||
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { | ||||
Some(HandlerError::NegotiationTimeout) | ||||
} | ||||
// There was an error post negotiation, close the connection. | ||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), | ||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { | ||||
match negotiation_error { | ||||
NegotiationError::Failed => { | ||||
// The protocol is not supported | ||||
self.protocol_unsupported = true; | ||||
if !self.peer_kind_sent { | ||||
self.peer_kind_sent = true; | ||||
// clear all substreams so the keep alive returns false | ||||
self.inbound_substream = None; | ||||
self.outbound_substream = None; | ||||
self.keep_alive = KeepAlive::No; | ||||
return Poll::Ready(ConnectionHandlerEvent::Custom( | ||||
HandlerEvent::PeerKind(PeerKind::NotSupported), | ||||
)); | ||||
} else { | ||||
None | ||||
} | ||||
} | ||||
NegotiationError::ProtocolError(e) => { | ||||
Some(HandlerError::NegotiationProtocolError(e)) | ||||
} | ||||
} | ||||
} | ||||
}; | ||||
|
||||
// If there was a fatal error, close the connection. | ||||
if let Some(error) = reported_error { | ||||
return Poll::Ready(ConnectionHandlerEvent::Close(error)); | ||||
} | ||||
if self.protocol_unsupported && !self.peer_kind_sent { | ||||
self.peer_kind_sent = true; | ||||
// clear all substreams so the keep alive returns false | ||||
self.inbound_substream = None; | ||||
self.outbound_substream = None; | ||||
self.keep_alive = KeepAlive::No; | ||||
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( | ||||
PeerKind::NotSupported, | ||||
))); | ||||
} | ||||
|
||||
if !self.peer_kind_sent { | ||||
|
@@ -338,23 +302,14 @@ impl ConnectionHandler for Handler { | |||
} | ||||
} | ||||
|
||||
if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { | ||||
// Too many inbound substreams have been created, end the connection. | ||||
return Poll::Ready(ConnectionHandlerEvent::Close( | ||||
HandlerError::MaxInboundSubstreams, | ||||
)); | ||||
} | ||||
// Invariant: `self.inbound_substreams_created < MAX_SUBSTREAM_CREATION`. | ||||
|
||||
// determine if we need to create the stream | ||||
if !self.send_queue.is_empty() | ||||
&& self.outbound_substream.is_none() | ||||
&& !self.outbound_substream_establishing | ||||
{ | ||||
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { | ||||
return Poll::Ready(ConnectionHandlerEvent::Close( | ||||
HandlerError::MaxOutboundSubstreams, | ||||
)); | ||||
} | ||||
// Invariant: `self.outbound_substreams_created < MAX_SUBSTREAM_CREATION`. | ||||
let message = self.send_queue.remove(0); | ||||
self.send_queue.shrink_to_fit(); | ||||
self.outbound_substream_establishing = true; | ||||
|
@@ -383,12 +338,12 @@ impl ConnectionHandler for Handler { | |||
Poll::Ready(Some(Err(error))) => { | ||||
match error { | ||||
HandlerError::MaxTransmissionSize => { | ||||
warn!("Message exceeded the maximum transmission size"); | ||||
log::warn!("Message exceeded the maximum transmission size"); | ||||
self.inbound_substream = | ||||
Some(InboundSubstreamState::WaitingInput(substream)); | ||||
} | ||||
_ => { | ||||
warn!("Inbound stream error: {}", error); | ||||
log::warn!("Inbound stream error: {}", error); | ||||
// More serious errors, close this side of the stream. If the | ||||
// peer is still around, they will re-establish their | ||||
// connection | ||||
|
@@ -399,7 +354,7 @@ impl ConnectionHandler for Handler { | |||
} | ||||
// peer closed the stream | ||||
Poll::Ready(None) => { | ||||
warn!("Peer closed their outbound stream"); | ||||
log::warn!("Peer closed their outbound stream"); | ||||
self.inbound_substream = | ||||
Some(InboundSubstreamState::Closing(substream)); | ||||
} | ||||
|
@@ -417,7 +372,7 @@ impl ConnectionHandler for Handler { | |||
// Don't close the connection but just drop the inbound substream. | ||||
// In case the remote has more to send, they will open up a new | ||||
// substream. | ||||
warn!("Inbound substream error while closing: {:?}", e); | ||||
log::warn!("Inbound substream error while closing: {e}"); | ||||
} | ||||
self.inbound_substream = None; | ||||
if self.outbound_substream.is_none() { | ||||
|
@@ -470,19 +425,22 @@ impl ConnectionHandler for Handler { | |||
Some(OutboundSubstreamState::PendingFlush(substream)) | ||||
} | ||||
Err(HandlerError::MaxTransmissionSize) => { | ||||
error!("Message exceeded the maximum transmission size and was not sent."); | ||||
log::error!("Message exceeded the maximum transmission size and was not sent."); | ||||
self.outbound_substream = | ||||
Some(OutboundSubstreamState::WaitingOutput(substream)); | ||||
} | ||||
Err(e) => { | ||||
error!("Error sending message: {}", e); | ||||
return Poll::Ready(ConnectionHandlerEvent::Close(e)); | ||||
log::debug!( | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logs for errors on outbound substreams have all been downgraded to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To me, a |
||||
"Outbound substream error while sending output: {e}" | ||||
); | ||||
self.outbound_substream = None; | ||||
break; | ||||
} | ||||
} | ||||
} | ||||
Poll::Ready(Err(e)) => { | ||||
error!("Outbound substream error while sending output: {:?}", e); | ||||
return Poll::Ready(ConnectionHandlerEvent::Close(e)); | ||||
log::debug!("Outbound substream error while sending output: {e}"); | ||||
break; | ||||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
} | ||||
Poll::Pending => { | ||||
self.keep_alive = KeepAlive::Yes; | ||||
|
@@ -504,7 +462,8 @@ impl ConnectionHandler for Handler { | |||
Some(OutboundSubstreamState::WaitingOutput(substream)) | ||||
} | ||||
Poll::Ready(Err(e)) => { | ||||
return Poll::Ready(ConnectionHandlerEvent::Close(e)) | ||||
log::debug!("Outbound substream error while flushing output: {e}"); | ||||
break; | ||||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
} | ||||
Poll::Pending => { | ||||
self.keep_alive = KeepAlive::Yes; | ||||
|
@@ -525,14 +484,8 @@ impl ConnectionHandler for Handler { | |||
break; | ||||
} | ||||
Poll::Ready(Err(e)) => { | ||||
warn!("Outbound substream error while closing: {:?}", e); | ||||
return Poll::Ready(ConnectionHandlerEvent::Close( | ||||
io::Error::new( | ||||
io::ErrorKind::BrokenPipe, | ||||
"Failed to close outbound substream", | ||||
) | ||||
.into(), | ||||
)); | ||||
log::debug!("Outbound substream error while closing: {e}"); | ||||
break; | ||||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
} | ||||
Poll::Pending => { | ||||
self.keep_alive = KeepAlive::No; | ||||
|
@@ -564,17 +517,65 @@ impl ConnectionHandler for Handler { | |||
Self::OutboundOpenInfo, | ||||
>, | ||||
) { | ||||
// Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605. | ||||
if matches!( | ||||
event, | ||||
ConnectionEvent::FullyNegotiatedInbound(_) | ||||
| ConnectionEvent::DialUpgradeError(DialUpgradeError { | ||||
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too. | ||||
.. | ||||
}) | ||||
) && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION | ||||
{ | ||||
// Too many inbound substreams have been created, disable the handler. | ||||
self.keep_alive = KeepAlive::No; | ||||
log::info!("The maximum number of inbound substreams created has been exceeded."); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this should be a warn, error or a crit. In principle this should never happen and we don't want the connection to just end silently There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The thinking was that Happy to be convinced otherwise. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I'm not all that concerned. I;d be curious to know when to use a warn then. I'd typically use a warn if something has not behaved as it should and is worth notifying a user. But if other parts of libp2p are not doing it, happy to leave as an info. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would we never see this log? We would see this if we connect to a node that doesn't support the gossipsub protocol right? Nothing is inherently broken if that happens so I am not sure a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed that this should rather be a
By default
Agreed.
As far as I can tell a Just to avoid confusion, I am in favor of the above behavior. I don't think there is value in directly requesting another stream in case the remote signaled that it does not support the protocol on the previous stream. Objections? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense! |
||||
return; | ||||
} | ||||
|
||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||
if matches!( | ||||
event, | ||||
ConnectionEvent::FullyNegotiatedOutbound(_) | ConnectionEvent::DialUpgradeError(_) | ||||
) && self.outbound_substreams_created == MAX_SUBSTREAM_CREATION | ||||
{ | ||||
// Too many outbound substreams have been created, disable the handler. | ||||
self.keep_alive = KeepAlive::No; | ||||
log::info!("The maximum number of outbound substreams created has been exceeded."); | ||||
AgeManning marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
return; | ||||
} | ||||
|
||||
match event { | ||||
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { | ||||
self.on_fully_negotiated_inbound(fully_negotiated_inbound) | ||||
} | ||||
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { | ||||
self.on_fully_negotiated_outbound(fully_negotiated_outbound) | ||||
} | ||||
ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => { | ||||
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { | ||||
self.outbound_substream_establishing = false; | ||||
warn!("Dial upgrade error {:?}", e); | ||||
self.upgrade_errors.push_back(e); | ||||
|
||||
match error { | ||||
// Timeout errors get mapped to NegotiationTimeout and we close the connection. | ||||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { | ||||
log::debug!("Dial upgrade error: Protocol negotiation timeout."); | ||||
} | ||||
// There was an error post negotiation, close the connection. | ||||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { | ||||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
void::unreachable(e) | ||||
} | ||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( | ||||
NegotiationError::Failed, | ||||
)) => { | ||||
// The protocol is not supported | ||||
self.protocol_unsupported = true; | ||||
log::debug!( | ||||
"The remote peer does not support gossipsub on this connection" | ||||
); | ||||
} | ||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( | ||||
NegotiationError::ProtocolError(e), | ||||
)) => log::debug!("Protocol negotiation failed: {e}"), | ||||
} | ||||
} | ||||
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} | ||||
} | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding an
assert!
ordebug_assert!
(preference for the former) here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taking a closer look, I don't think this invariant actually holds.
In
on_connection_event
we drop new inbound requests on==
:Thus
self.inbound_substreams
may never be>
but it could be==
and thus the invariant of<
does not hold.@thomaseizinger am I missing something?