Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossipsub backpressure #549

Merged
merged 7 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
## 0.46.1
## 0.46.1 - unreleased

- Implement backpressure by diferentiating between priority and non priority messages.
Drop `Publish` and `Forward` messages when the queue becomes full.
See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914)

- Deprecate `Rpc` in preparation for removing it from the public API because it is an internal type.
See [PR 4833](https://github.com/libp2p/rust-libp2p/pull/4833).
See [PR 4833](https://github.com/libp2p/rust-libp2p/pull/4833).

## 0.46.0

Expand Down
1 change: 1 addition & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sha2 = "0.10.8"
smallvec = "1.11.2"
tracing = "0.1.37"
void = "1.0.2"
async-channel = "1.9.0"

# Metrics dependencies
prometheus-client = { workspace = true }
Expand Down
153 changes: 108 additions & 45 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::backoff::BackoffStorage;
use crate::config::{Config, ValidationMode};
use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
Expand All @@ -61,7 +60,8 @@ use crate::types::{
ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription,
SubscriptionAction,
};
use crate::types::{PeerConnections, PeerKind, RpcOut};
use crate::types::{PeerConnections, PeerKind};
use crate::{backoff::BackoffStorage, types::RpcSender};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -332,6 +332,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Keep track of a set of internal metrics relating to gossipsub.
metrics: Option<Metrics>,

/// Connection handler message queue channels.
handler_send_queues: HashMap<PeerId, RpcSender>,
}

impl<D, F> Behaviour<D, F>
Expand Down Expand Up @@ -471,6 +474,7 @@ where
config,
subscription_filter,
data_transform,
handler_send_queues: Default::default(),
})
}
}
Expand Down Expand Up @@ -534,10 +538,14 @@ where
}

// send subscription request to all peers
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
for peer in self.peer_topics.keys() {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
let event = RpcOut::Subscribe(topic_hash.clone());
self.send_message(peer, event);
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is true, as I recall trying to enforce conditions like this.

But we have to be absolutely sure that there is no code path where a peer can get added or removed from the peer_topics mapping and not the handler_send_queues.

If we are uncertain about this condition in any way (or are concerned a future dev may break this condition), we should log a crit instead imo.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a peer is added when a new ConnectionHandler is created and removed when the connection is closed. As we are using mpmc channels if that PeerId exists it has a respective ConnectionHandler (theoretically). If you prefer we can for now use a crit/error log instead


sender.subscribe(topic_hash.clone());
}

// call JOIN(topic)
Expand All @@ -561,10 +569,14 @@ where
}

// announce to all peers
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
for peer in self.peer_topics.keys() {
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
let event = RpcOut::Unsubscribe(topic_hash.clone());
self.send_message(peer, event);
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");

sender.unsubscribe(topic_hash.clone());
}

// call LEAVE(topic)
Expand Down Expand Up @@ -711,9 +723,23 @@ where
}

// Send to peers we know are subscribed to the topic.
let mut errors = 0;
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
self.send_message(*peer_id, RpcOut::Publish(raw_message.clone()));
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

if sender
.publish(raw_message.clone(), self.metrics.as_mut())
.is_err()
{
errors += 1;
}
}
if errors == recipient_peers.len() {
return Err(PublishError::InsufficientPeers);
}

tracing::debug!(message=%msg_id, "Published message");
Expand Down Expand Up @@ -1311,7 +1337,12 @@ where
);
} else {
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
self.send_message(*peer_id, RpcOut::Forward(msg));
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

sender.forward(msg, self.metrics.as_mut());
}
}
}
Expand Down Expand Up @@ -1464,12 +1495,17 @@ where
if !to_prune_topics.is_empty() {
// build the prune messages to send
let on_unsubscribe = false;
let mut sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist")
.clone();

for action in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
.collect::<Vec<_>>()
{
self.send_message(*peer_id, RpcOut::Control(action));
sender.control(action);
}
// Send the prune messages to the peer
tracing::debug!(
Expand Down Expand Up @@ -1964,12 +2000,16 @@ where

// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
let sender = self
.handler_send_queues
.get_mut(propagation_source)
.expect("Peerid should exist");

for action in topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect::<Vec<_>>()
{
self.send_message(*propagation_source, RpcOut::Control(action))
sender.control(action);
}

// Notify the application of the subscriptions
Expand Down Expand Up @@ -2504,6 +2544,13 @@ where
// It therefore must be in at least one mesh and we do not need to inform the handler
// of its removal from another.

// send the control messages
let mut sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist")
.clone();

// The following prunes are not due to unsubscribing.
let prunes = to_prune
.remove(&peer)
Expand All @@ -2518,9 +2565,8 @@ where
)
});

// send the control messages
for msg in control_msgs.chain(prunes).collect::<Vec<_>>() {
self.send_message(peer, RpcOut::Control(msg));
for msg in control_msgs.chain(prunes) {
sender.control(msg);
}
}

Expand All @@ -2534,7 +2580,13 @@ where
self.config.do_px() && !no_px.contains(peer),
false,
);
self.send_message(*peer, RpcOut::Control(prune));
let mut sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist")
.clone();

sender.control(prune);

// inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -2602,11 +2654,13 @@ where

// forward the message to peers
if !recipient_peers.is_empty() {
let event = RpcOut::Forward(message.clone());

for peer in recipient_peers.iter() {
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
self.send_message(*peer, event.clone());
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");
sender.forward(message.clone(), self.metrics.as_mut());
}
tracing::debug!("Completed forwarding message");
Ok(true)
Expand Down Expand Up @@ -2720,31 +2774,19 @@ where
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
self.send_message(peer, RpcOut::Control(msg));
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.control(msg);
}
}

// This clears all pending IWANT messages
self.pending_iwant_msgs.clear();
}

/// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) {
if let Some(m) = self.metrics.as_mut() {
if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc {
// register bytes sent on the internal metrics.
m.msg_sent(&message.topic, message.raw_protobuf_len());
}
}

self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(rpc),
handler: NotifyHandler::Any,
});
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
Expand Down Expand Up @@ -2810,8 +2852,14 @@ where

tracing::debug!(peer=%peer_id, "New peer connected");
// We need to send our subscriptions to the newly-connected node.
let mut sender = self
.handler_send_queues
.get_mut(&peer_id)
.expect("Peerid should exist")
.clone();

for topic_hash in self.mesh.clone().into_keys() {
self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
sender.subscribe(topic_hash);
}
}

Expand Down Expand Up @@ -2939,6 +2987,7 @@ where
}

self.connected_peers.remove(&peer_id);
self.handler_send_queues.remove(&peer_id);

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(&peer_id);
Expand Down Expand Up @@ -2998,21 +3047,35 @@ where
fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
peer_id: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
peer_id: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
))
}

fn on_connection_handler_event(
Expand Down Expand Up @@ -3380,7 +3443,7 @@ impl fmt::Debug for PublishConfig {
#[cfg(test)]
mod local_test {
use super::*;
use crate::IdentTopic;
use crate::{types::RpcOut, IdentTopic};
use quickcheck::*;

fn test_message() -> RawMessage {
Expand Down
Loading
Loading