diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 95781daa370..88fdcd21a13 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -254,7 +254,7 @@ pub struct Behaviour { events: VecDeque>, /// Pools non-urgent control messages between heartbeats. - control_pool: HashMap>, + control_pool: HashMap>, /// Information used for publishing messages. publish_config: PublishConfig, @@ -1036,7 +1036,7 @@ where Self::control_pool_add( &mut self.control_pool, peer_id, - ControlAction::Graft(Graft { + RpcOut::Graft(Graft { topic_hash: topic_hash.clone(), }), ); @@ -1136,7 +1136,7 @@ where tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer"); let on_unsubscribe = true; let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); - Self::control_pool_add(&mut self.control_pool, peer, ControlAction::Prune(prune)); + Self::control_pool_add(&mut self.control_pool, peer, RpcOut::Prune(prune)); // If the peer did not previously exist in any mesh, inform the handler peer_removed_from_mesh( @@ -1310,7 +1310,7 @@ where Self::control_pool_add( &mut self.control_pool, *peer_id, - ControlAction::IWant(IWant { + RpcOut::IWant(IWant { message_ids: iwant_ids_vec, }), ); @@ -1521,7 +1521,7 @@ where .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) { - sender.control(ControlAction::Prune(prune)); + sender.prune(prune); } // Send the prune messages to the peer tracing::debug!( @@ -2022,7 +2022,7 @@ where .expect("Peerid should exist"); for topic_hash in topics_to_graft.into_iter() { - sender.control(ControlAction::Graft(Graft { topic_hash })); + sender.graft(Graft { topic_hash }); } // Notify the application of the subscriptions @@ -2512,7 +2512,7 @@ where Self::control_pool_add( &mut self.control_pool, peer, - ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash: topic_hash.clone(), message_ids: peer_message_ids, }), @@ -2548,11 +2548,6 @@ where &self.connected_peers, ); } - let grafts = topics.iter().map(|topic_hash| { - ControlAction::Graft(Graft { - topic_hash: topic_hash.clone(), - }) - }); // If there are prunes associated with the same peer add them. // NOTE: In this case a peer has been added to a topic mesh, and removed from another. @@ -2580,12 +2575,14 @@ where ) }); - for graft in grafts { - sender.control(graft); + for topic_hash in topics { + sender.graft(Graft { + topic_hash: topic_hash.clone(), + }); } for prune in prunes { - sender.control(ControlAction::Prune(prune)); + sender.prune(prune); } } @@ -2605,7 +2602,7 @@ where .expect("Peerid should exist") .clone(); - sender.control(ControlAction::Prune(prune)); + sender.prune(prune); // inform the handler peer_removed_from_mesh( @@ -2786,9 +2783,9 @@ where // adds a control action to control_pool fn control_pool_add( - control_pool: &mut HashMap>, + control_pool: &mut HashMap>, peer: PeerId, - control: ControlAction, + control: RpcOut, ) { control_pool.entry(peer).or_default().push(control); } @@ -2802,7 +2799,13 @@ where .get_mut(&peer) .expect("Peerid should exist"); - sender.control(msg); + match msg { + RpcOut::IHave(ihave) => sender.ihave(ihave), + RpcOut::IWant(iwant) => sender.iwant(iwant), + RpcOut::Graft(graft) => sender.graft(graft), + RpcOut::Prune(prune) => sender.prune(prune), + _ => unreachable!(), + } } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 383ffa171b0..7ae9deb5b92 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -549,13 +549,13 @@ fn test_join() { "Should have added 6 nodes to the mesh" ); - fn collect_grafts( - mut collected_grafts: Vec, - (_, controls): (&PeerId, &Vec), - ) -> Vec { + fn collect_grafts<'a>( + mut collected_grafts: Vec<&'a RpcOut>, + (_, controls): (&'a PeerId, &'a Vec), + ) -> Vec<&'a RpcOut> { for c in controls.iter() { - if let ControlAction::Graft(Graft { topic_hash: _ }) = c { - collected_grafts.push(c.clone()) + if let RpcOut::Graft(Graft { topic_hash: _ }) = c { + collected_grafts.push(c) } } collected_grafts @@ -1159,7 +1159,7 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { // check that we sent an IWANT request for `unknown id` let iwant_exists = match gs.control_pool.get(&peers[7]) { Some(controls) => controls.iter().any(|c| match c { - ControlAction::IWant(IWant { message_ids }) => message_ids + RpcOut::IWant(IWant { message_ids }) => message_ids .iter() .any(|m| *m == MessageId::new(b"unknown id")), _ => false, @@ -1328,7 +1328,7 @@ fn test_handle_prune_peer_in_mesh() { fn count_control_msgs( gs: &Behaviour, queues: &HashMap, - mut filter: impl FnMut(&PeerId, &ControlAction) -> bool, + mut filter: impl FnMut(&PeerId, &RpcOut) -> bool, ) -> usize { gs.control_pool .iter() @@ -1338,13 +1338,13 @@ fn count_control_msgs( .iter() .fold(0, |mut collected_messages, (peer_id, c)| { while !c.priority.is_empty() || !c.non_priority.is_empty() { - if let Ok(RpcOut::Control(action)) = c.priority.try_recv() { - if filter(peer_id, &action) { + if let Ok(rpc) = c.priority.try_recv() { + if filter(peer_id, &rpc) { collected_messages += 1; } } - if let Ok(RpcOut::Control(action)) = c.non_priority.try_recv() { - if filter(peer_id, &action) { + if let Ok(rpc) = c.non_priority.try_recv() { + if filter(peer_id, &rpc) { collected_messages += 1; } } @@ -1355,11 +1355,11 @@ fn count_control_msgs( fn flush_events( gs: &mut Behaviour, - receiver_queues: &mut HashMap, + receiver_queues: &HashMap, ) { gs.control_pool.clear(); gs.events.clear(); - for c in receiver_queues.values_mut() { + for c in receiver_queues.values() { while !c.priority.is_empty() || !c.non_priority.is_empty() { let _ = c.priority.try_recv(); let _ = c.non_priority.try_recv(); @@ -1473,7 +1473,7 @@ fn test_handle_graft_explicit_peer() { assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == peer && match m { - ControlAction::Prune(Prune { topic_hash, .. }) => + RpcOut::Prune(Prune { topic_hash, .. }) => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], _ => false, }) @@ -1501,7 +1501,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" ); @@ -1509,7 +1509,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1533,7 +1533,7 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &others[0] - && matches!(m, ControlAction::Graft { .. })), + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1607,7 +1607,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + && matches!(m, RpcOut::Graft { .. })) > 0, "No graft message got created to non-explicit peer" ); @@ -1615,7 +1615,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1656,7 +1656,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" ); @@ -1664,7 +1664,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1706,7 +1706,7 @@ fn no_gossip_gets_sent_to_explicit_peers() { .get(&peers[0]) .unwrap_or(&Vec::new()) .iter() - .filter(|m| matches!(m, ControlAction::IHave { .. })) + .filter(|m| matches!(m, RpcOut::IHave { .. })) .count(), 0, "Gossip got emitted to explicit peer" @@ -1854,7 +1854,7 @@ fn test_send_px_and_backoff_in_prune() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, @@ -1902,7 +1902,7 @@ fn test_prune_backoffed_peer_on_graft() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, @@ -1950,10 +1950,7 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -1964,10 +1961,7 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2005,10 +1999,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2019,10 +2010,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2052,7 +2040,7 @@ fn test_unsubscribe_backoff() { assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune(Prune { backoff, .. }) => backoff == &Some(1), + RpcOut::Prune(Prune { backoff, .. }) => backoff == &Some(1), _ => false, }), 1, @@ -2076,10 +2064,7 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2090,10 +2075,7 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2187,7 +2169,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( count_control_msgs(&gs, &queues, |_, action| match action { - ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash, message_ids, }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), @@ -2231,7 +2213,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( count_control_msgs(&gs, &queues, |_, action| match action { - ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash, message_ids, }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), @@ -2391,7 +2373,7 @@ fn test_prune_negative_scored_peers() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, @@ -2525,7 +2507,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && match m { - ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers: px, .. @@ -2597,7 +2579,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( count_control_msgs(&gs, &queues, |peer, action| match action { - ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash, message_ids, }) => { @@ -2761,7 +2743,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( count_control_msgs(&gs, &queues, |peer, c| match c { - ControlAction::IWant(IWant { message_ids }) => + RpcOut::IWant(IWant { message_ids }) => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); true @@ -4354,10 +4336,7 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, &queues, |_, a| matches!( - a, - ControlAction::Prune { .. } - )), + count_control_msgs(&gs, &queues, |_, a| matches!(a, RpcOut::Prune { .. })), 0, "we should not prune after graft in unknown topic" ); @@ -4457,7 +4436,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( count_control_msgs(&gs, &queues, |p, action| p == &peer - && matches!(action, ControlAction::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), + && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" ); @@ -4480,7 +4459,7 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 20 messages assert_eq!( count_control_msgs(&gs, &queues, |p, action| p == &peer - && matches!(action, ControlAction::IWant(IWant { message_ids }) if message_ids.len() == 1)), + && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)), 20, "all 20 should get sent" ); @@ -4530,7 +4509,7 @@ fn test_ignore_too_many_messages_in_ihave() { let mut sum = 0; assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IWant(IWant { message_ids }) => + RpcOut::IWant(IWant { message_ids }) => p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); sum += message_ids.len(); @@ -4555,7 +4534,7 @@ fn test_ignore_too_many_messages_in_ihave() { let mut sum = 0; assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IWant(IWant { message_ids }) => + RpcOut::IWant(IWant { message_ids }) => p == &peer && { sum += message_ids.len(); true @@ -4609,7 +4588,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IHave(IHave { message_ids, .. }) => { + RpcOut::IHave(IHave { message_ids, .. }) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); true @@ -4944,7 +4923,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune(Prune { peers: px, .. }) => !px.is_empty(), + RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), 0, @@ -4982,7 +4961,7 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune(Prune { peers: px, .. }) => !px.is_empty(), + RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), 0, diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index d266daae8a5..edf619cab5e 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -270,8 +270,14 @@ pub enum RpcOut { Subscribe(TopicHash), /// Unsubscribe a topic. Unsubscribe(TopicHash), - /// List of Gossipsub control messages. - Control(ControlAction), + /// Send a GRAFT control message. + Graft(Graft), + /// Send a PRUNE control message. + Prune(Prune), + /// Send a IHave control message. + IHave(IHave), + /// Send a IWant control message. + IWant(IWant), } impl RpcOut { @@ -318,10 +324,10 @@ impl From for proto::RPC { }], control: None, }, - RpcOut::Control(ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash, message_ids, - })) => proto::RPC { + }) => proto::RPC { publish: Vec::new(), subscriptions: Vec::new(), control: Some(proto::ControlMessage { @@ -334,7 +340,7 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::IWant(IWant { message_ids })) => proto::RPC { + RpcOut::IWant(IWant { message_ids }) => proto::RPC { publish: Vec::new(), subscriptions: Vec::new(), control: Some(proto::ControlMessage { @@ -346,7 +352,7 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::Graft(Graft { topic_hash })) => proto::RPC { + RpcOut::Graft(Graft { topic_hash }) => proto::RPC { publish: Vec::new(), subscriptions: vec![], control: Some(proto::ControlMessage { @@ -358,11 +364,11 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, - })) => { + }) => { proto::RPC { publish: Vec::new(), subscriptions: vec![], @@ -582,14 +588,48 @@ impl RpcSender { self.receiver.clone() } - /// Send a `RpcOut::Control` message to the `RpcReceiver` + /// Send a `RpcOut::Graft` message to the `RpcReceiver` /// this is high priority. - pub(crate) fn control(&mut self, control: ControlAction) { + pub(crate) fn graft(&mut self, graft: Graft) { self.priority - .try_send(RpcOut::Control(control)) + .try_send(RpcOut::Graft(graft)) .expect("Channel is unbounded and should always be open"); } + /// Send a `RpcOut::Prune` message to the `RpcReceiver` + /// this is high priority. + pub(crate) fn prune(&mut self, prune: Prune) { + self.priority + .try_send(RpcOut::Prune(prune)) + .expect("Channel is unbounded and should always be open"); + } + + /// Send a `RpcOut::IHave` message to the `RpcReceiver` + /// this is low priority and if queue is full the message is dropped. + pub(crate) fn ihave(&mut self, ihave: IHave) { + if let Err(err) = self.non_priority.try_send(RpcOut::IHave(ihave)) { + let rpc = err.into_inner(); + tracing::trace!( + "IHAVE message {:?} to peer {} dropped, queue is full", + rpc, + self.peer_id + ); + } + } + + /// Send a `RpcOut::IHave` message to the `RpcReceiver` + /// this is low priority and if queue is full the message is dropped. + pub(crate) fn iwant(&mut self, iwant: IWant) { + if let Err(err) = self.non_priority.try_send(RpcOut::IWant(iwant)) { + let rpc = err.into_inner(); + tracing::trace!( + "IWANT message {:?} to peer {} dropped, queue is full", + rpc, + self.peer_id + ); + } + } + /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` /// this is high priority. pub(crate) fn subscribe(&mut self, topic: TopicHash) {