diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 2c9212865535..9aa7324508ee 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -37,7 +37,6 @@ use log::{error, trace, warn}; use smallvec::SmallVec; use std::{ collections::VecDeque, - io, pin::Pin, task::{Context, Poll}, time::Duration, @@ -291,13 +290,16 @@ impl ConnectionHandler for Handler { > { // Handle any upgrade errors if let Some(error) = self.upgrade_errors.pop_front() { - let reported_error = match error { + match error { // Timeout errors get mapped to NegotiationTimeout and we close the connection. ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { Some(HandlerError::NegotiationTimeout) } // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: {e}"); + } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { match negotiation_error { NegotiationError::Failed => { @@ -317,15 +319,14 @@ impl ConnectionHandler for Handler { } } NegotiationError::ProtocolError(e) => { - Some(HandlerError::NegotiationProtocolError(e)) + self.keep_alive = KeepAlive::No; + log::info!( + "Gossipsub error: {}", + HandlerError::NegotiationProtocolError(e) + ); } } } - }; - - // If there was a fatal error, close the connection. - if let Some(error) = reported_error { - return Poll::Ready(ConnectionHandlerEvent::Close(error)); } } @@ -340,9 +341,8 @@ impl ConnectionHandler for Handler { if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, end the connection. - return Poll::Ready(ConnectionHandlerEvent::Close( - HandlerError::MaxInboundSubstreams, - )); + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: {}", HandlerError::MaxInboundSubstreams); } // determine if we need to create the stream @@ -351,9 +351,8 @@ impl ConnectionHandler for Handler { && !self.outbound_substream_establishing { if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { - return Poll::Ready(ConnectionHandlerEvent::Close( - HandlerError::MaxOutboundSubstreams, - )); + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: {}", HandlerError::MaxInboundSubstreams); } let message = self.send_queue.remove(0); self.send_queue.shrink_to_fit(); @@ -475,14 +474,14 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::WaitingOutput(substream)); } Err(e) => { - error!("Error sending message: {}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + error!("Error sending message: {e}"); + break; } } } Poll::Ready(Err(e)) => { - error!("Outbound substream error while sending output: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + error!("Outbound substream error while sending output: {e:?}"); + break; } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -504,7 +503,8 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(e)) + log::info!("NegotaitedSubstream error: {e}"); + break; } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -525,14 +525,8 @@ impl ConnectionHandler for Handler { break; } Poll::Ready(Err(e)) => { - warn!("Outbound substream error while closing: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close( - io::Error::new( - io::ErrorKind::BrokenPipe, - "Failed to close outbound substream", - ) - .into(), - )); + warn!("Outbound substream error while closing: {e:?}"); + break; } Poll::Pending => { self.keep_alive = KeepAlive::No;