Skip to content

Commit

Permalink
Implement message time-bound dropping
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Nov 30, 2023
1 parent a4a3f39 commit 838860c
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 22 deletions.
13 changes: 9 additions & 4 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3054,7 +3054,7 @@ where
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
.or_insert_with(|| RpcSender::new(peer_id, (&self.config).into()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
Expand All @@ -3071,7 +3071,7 @@ where
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
.or_insert_with(|| RpcSender::new(peer_id, (&self.config).into()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
Expand Down Expand Up @@ -3115,6 +3115,11 @@ where
}
}
}
HandlerEvent::PublishMessageDropped | HandlerEvent::ForwardMessageDropped => {
// TODO:
// * Build scoring logic to handle peers that are dropping messages
// * Add some metrics to help visualize kinds of messages being dropped
}
HandlerEvent::Message {
rpc,
invalid_messages,
Expand Down Expand Up @@ -3470,8 +3475,8 @@ mod local_test {
match u8::arbitrary(g) % 5 {
0 => RpcOut::Subscribe(IdentTopic::new("TestTopic").hash()),
1 => RpcOut::Unsubscribe(IdentTopic::new("TestTopic").hash()),
2 => RpcOut::Publish(test_message()),
3 => RpcOut::Forward(test_message()),
2 => RpcOut::Publish(test_message(), Instant::now()),
3 => RpcOut::Forward(test_message(), Instant::now()),
4 => RpcOut::Control(test_control()),
_ => panic!("outside range"),
}
Expand Down
33 changes: 31 additions & 2 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub struct Config {
iwant_followup_time: Duration,
published_message_ids_cache_time: Duration,
connection_handler_queue_len: usize,
connection_handler_publish_duration: Duration,
connection_handler_forward_duration: Duration,
}

impl Config {
Expand Down Expand Up @@ -352,10 +354,22 @@ impl Config {
self.published_message_ids_cache_time
}

/// The max number of messages a `ConnectionHandler` can buffer.
/// The max number of messages a `ConnectionHandler` can buffer. The default is 5000.
pub fn connection_handler_queue_len(&self) -> usize {
self.connection_handler_queue_len
}

/// The duration a message to be published can wait to be sent before it is abandoned. The
/// default is 5 seconds.
pub fn publish_queue_duration(&self) -> Duration {
self.connection_handler_publish_duration
}

/// The duration a message to be forwarded can wait to be sent before it is abandoned. The
/// default is 500ms.
pub fn forward_queue_duration(&self) -> Duration {
self.connection_handler_forward_duration
}
}

impl Default for Config {
Expand Down Expand Up @@ -423,7 +437,9 @@ impl Default for ConfigBuilder {
max_ihave_messages: 10,
iwant_followup_time: Duration::from_secs(3),
published_message_ids_cache_time: Duration::from_secs(10),
connection_handler_queue_len: 100,
connection_handler_queue_len: 5000,
connection_handler_publish_duration: Duration::from_secs(5),
connection_handler_forward_duration: Duration::from_millis(500),
},
invalid_protocol: false,
}
Expand Down Expand Up @@ -789,10 +805,23 @@ impl ConfigBuilder {
self
}

/// The max number of messages a `ConnectionHandler` can buffer. The default is 5000.
pub fn connection_handler_queue_len(&mut self, len: usize) {
self.config.connection_handler_queue_len = len;
}

/// The duration a message to be published can wait to be sent before it is abandoned. The
/// default is 5 seconds.
pub fn publish_queue_duration(&mut self, duration: Duration) {
self.config.connection_handler_publish_duration = duration;
}

/// The duration a message to be forwarded can wait to be sent before it is abandoned. The
/// default is 500ms.
pub fn forward_queue_duration(&mut self, duration: Duration) {
self.config.connection_handler_forward_duration = duration;
}

/// Constructs a [`Config`] from the given configuration and validates the settings.
pub fn build(&self) -> Result<Config, ConfigBuilderError> {
// check all constraints on config
Expand Down
46 changes: 45 additions & 1 deletion protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::protocol::{GossipsubCodec, ProtocolConfig};
use crate::rpc_proto::proto;
use crate::types::{PeerKind, RawMessage, Rpc, RpcReceiver};
use crate::types::{PeerKind, RawMessage, Rpc, RpcOut, RpcReceiver};
use crate::ValidationError;
use asynchronous_codec::Framed;
use futures::future::Either;
Expand Down Expand Up @@ -54,6 +54,10 @@ pub enum HandlerEvent {
/// An inbound or outbound substream has been established with the peer and this informs over
/// which protocol. This message only occurs once per connection.
PeerKind(PeerKind),
/// A message to be published was dropped because it could not be sent in time.
PublishMessageDropped,
/// A forward message was dropped because it could not be sent in time.
ForwardMessageDropped,
}

/// A message sent from the behaviour to the handler.
Expand Down Expand Up @@ -239,6 +243,10 @@ impl EnabledHandler {
});
}

// We may need to inform the behviour if we have a dropped a message. This gets set if that
// is the case.
let mut inform_behaviour_of_dropped_message = None;

// process outbound stream
loop {
match std::mem::replace(
Expand All @@ -248,6 +256,35 @@ impl EnabledHandler {
// outbound idle state
Some(OutboundSubstreamState::WaitingOutput(substream)) => {
if let Poll::Ready(Some(message)) = self.send_queue.poll_next_unpin(cx) {
match message {
RpcOut::Publish(_, time_created) => {
if Instant::now()
> time_created

Check failure on line 262 in protocols/gossipsub/src/handler.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-unknown

mismatched types

Check failure on line 262 in protocols/gossipsub/src/handler.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-emscripten

mismatched types
+ self.send_queue.queue_config.publish_message_max_duration
{
// Inform the behaviour and end the poll.
inform_behaviour_of_dropped_message =
Some(HandlerEvent::PublishMessageDropped);
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
break;
}
}
RpcOut::Forward(_, time_created) => {
if Instant::now()
> time_created

Check failure on line 275 in protocols/gossipsub/src/handler.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-unknown

mismatched types

Check failure on line 275 in protocols/gossipsub/src/handler.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-emscripten

mismatched types
+ self.send_queue.queue_config.forward_message_max_duration
{
// Inform the behaviour and end the poll.
inform_behaviour_of_dropped_message =
Some(HandlerEvent::ForwardMessageDropped);
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
break;
}
}
_ => {} // All other messages are not time-bound.
}
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(
substream,
message.into_protobuf(),
Expand Down Expand Up @@ -317,6 +354,13 @@ impl EnabledHandler {
}
}

// If there was a timeout in sending a message, inform the behaviour before restarting the
// poll
if let Some(handler_event) = inform_behaviour_of_dropped_message {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(handler_event));
}

// Handle inbound messages
loop {
match std::mem::replace(
&mut self.inbound_substream,
Expand Down
67 changes: 52 additions & 15 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

//! A collection of types using the Gossipsub system.
use crate::metrics::Metrics;
use crate::Config;
use crate::TopicHash;
use async_channel::{Receiver, Sender};
use futures::Stream;
Expand All @@ -31,6 +32,7 @@ use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::{Duration, Instant};
use std::{fmt, pin::Pin};

use crate::rpc_proto::proto;
Expand Down Expand Up @@ -242,10 +244,12 @@ pub enum ControlAction {
/// A Gossipsub RPC message sent.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RpcOut {
/// Publish a Gossipsub message on network.
Publish(RawMessage),
/// Forward a Gossipsub message to the network.
Forward(RawMessage),
/// Publish a Gossipsub message on network. The [`Instant`] tags the time we attempted to
/// send it.
Publish(RawMessage, Instant),
/// Forward a Gossipsub message to the network. The [`Instant`] tags the time we attempted to
/// send it.
Forward(RawMessage, Instant),
/// Subscribe a topic.
Subscribe(TopicHash),
/// Unsubscribe a topic.
Expand All @@ -266,12 +270,12 @@ impl From<RpcOut> for proto::RPC {
/// Converts the RPC into protobuf format.
fn from(rpc: RpcOut) -> Self {
match rpc {
RpcOut::Publish(message) => proto::RPC {
RpcOut::Publish(message, _) => proto::RPC {
subscriptions: Vec::new(),
publish: vec![message.into()],
control: None,
},
RpcOut::Forward(message) => proto::RPC {
RpcOut::Forward(message, _) => proto::RPC {
publish: vec![message.into()],
subscriptions: Vec::new(),
control: None,
Expand Down Expand Up @@ -532,18 +536,21 @@ pub(crate) struct RpcSender {

impl RpcSender {
/// Create a RpcSender.
pub(crate) fn new(peer_id: PeerId, cap: usize) -> RpcSender {
pub(crate) fn new(peer_id: PeerId, queue_config: QueueConfig) -> RpcSender {
let queue_capacity = queue_config.capacity;
let (priority_sender, priority_receiver) = async_channel::unbounded();
let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
let (non_priority_sender, non_priority_receiver) =
async_channel::bounded(queue_capacity / 2);
let len = Arc::new(AtomicUsize::new(0));
let receiver = RpcReceiver {
len: len.clone(),
priority_len: len.clone(),
priority: priority_receiver,
non_priority: non_priority_receiver,
queue_config,
};
RpcSender {
peer_id,
cap: cap / 2,
cap: queue_capacity / 2,
len,
priority: priority_sender,
non_priority: non_priority_sender,
Expand Down Expand Up @@ -591,7 +598,7 @@ impl RpcSender {
return Err(());
}
self.priority
.try_send(RpcOut::Publish(message.clone()))
.try_send(RpcOut::Publish(message.clone(), Instant::now()))
.expect("Channel is unbounded and should always be open");
self.len.fetch_add(1, Ordering::Relaxed);

Expand All @@ -605,7 +612,10 @@ impl RpcSender {
/// Send a `RpcOut::Forward` message to the `RpcReceiver`
/// this is high priority. If the queue is full the message is discarded.
pub(crate) fn forward(&mut self, message: RawMessage, metrics: Option<&mut Metrics>) {
if let Err(err) = self.non_priority.try_send(RpcOut::Forward(message.clone())) {
if let Err(err) = self
.non_priority
.try_send(RpcOut::Forward(message.clone(), Instant::now()))
{
let rpc = err.into_inner();
tracing::trace!(
"{:?} message to peer {} dropped, queue is full",
Expand All @@ -624,9 +634,36 @@ impl RpcSender {
/// `RpcOut` sender that is priority aware.
#[derive(Debug, Clone)]
pub struct RpcReceiver {
len: Arc<AtomicUsize>,
/// The maximum length of the priority queue.
priority_len: Arc<AtomicUsize>,
/// The priority queue receiver.
pub(crate) priority: Receiver<RpcOut>,
/// The non priority queue receiver.
pub(crate) non_priority: Receiver<RpcOut>,
/// The maximum duration to leave non-priority messages in the queue.
pub(crate) queue_config: QueueConfig,
}

/// The maximum durations for messages in a queue.
#[derive(Debug, Clone)]
pub(crate) struct QueueConfig {
/// The maximum size of the priority queue.
capacity: usize,
/// The maximum duration of publish messages to sit in a queue.
pub(crate) publish_message_max_duration: Duration,
/// The maximum duration of forward messages to sit in a queue.
pub(crate) forward_message_max_duration: Duration,
// TODO: Extend for IHAVE/IWANT
}

impl From<&Config> for QueueConfig {
fn from(config: &Config) -> QueueConfig {
QueueConfig {
capacity: config.connection_handler_queue_len(),
publish_message_max_duration: config.publish_queue_duration(),
forward_message_max_duration: config.forward_queue_duration(),
}
}
}

impl RpcReceiver {
Expand All @@ -645,8 +682,8 @@ impl Stream for RpcReceiver {
) -> std::task::Poll<Option<Self::Item>> {
// The priority queue is first polled.
if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) {
if let Some(RpcOut::Publish(_)) = rpc {
self.len.fetch_sub(1, Ordering::Relaxed);
if let Some(RpcOut::Publish(_, _)) = rpc {
self.priority_len.fetch_sub(1, Ordering::Relaxed);
}
return Poll::Ready(rpc);
}
Expand Down

0 comments on commit 838860c

Please sign in to comment.