Skip to content

Commit

Permalink
split ControlAction variants into its own structs
Browse files Browse the repository at this point in the history
which will help for the goal of prioritizing GRAFT and PRUNE messages over IWANT/IHAVE.
  • Loading branch information
jxs committed Dec 4, 2023
1 parent 73ae996 commit 327dafd
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 135 deletions.
76 changes: 42 additions & 34 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
use crate::mcache::MessageCache;
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
Expand All @@ -65,6 +61,16 @@ use crate::{
config::{Config, ValidationMode},
types::RpcOut,
};
use crate::{gossip_promises::GossipPromises, types::Graft};
use crate::{
handler::{Handler, HandlerEvent, HandlerIn},
types::Prune,
};
use crate::{mcache::MessageCache, types::IWant};
use crate::{
metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
types::IHave,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -1030,9 +1036,9 @@ where
Self::control_pool_add(
&mut self.control_pool,
peer_id,
ControlAction::Graft {
ControlAction::Graft(Graft {
topic_hash: topic_hash.clone(),
},
}),
);

// If the peer did not previously exist in any mesh, inform the handler
Expand Down Expand Up @@ -1061,7 +1067,7 @@ where
peer: &PeerId,
do_px: bool,
on_unsubscribe: bool,
) -> ControlAction {
) -> Prune {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer, topic_hash.clone());
}
Expand All @@ -1072,7 +1078,7 @@ where
}
Some(PeerKind::Gossipsub) => {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return ControlAction::Prune {
return Prune {
topic_hash: topic_hash.clone(),
peers: Vec::new(),
backoff: None,
Expand Down Expand Up @@ -1109,7 +1115,7 @@ where
// update backoff
self.backoffs.update_backoff(topic_hash, peer, backoff);

ControlAction::Prune {
Prune {
topic_hash: topic_hash.clone(),
peers,
backoff: Some(backoff.as_secs()),
Expand All @@ -1129,9 +1135,8 @@ where
// Send a PRUNE control message
tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer");
let on_unsubscribe = true;
let control =
self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, control);
let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, ControlAction::Prune(prune));

// If the peer did not previously exist in any mesh, inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -1305,9 +1310,9 @@ where
Self::control_pool_add(
&mut self.control_pool,
*peer_id,
ControlAction::IWant {
ControlAction::IWant(IWant {
message_ids: iwant_ids_vec,
},
}),
);
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
Expand Down Expand Up @@ -1512,11 +1517,11 @@ where
.expect("Peerid should exist")
.clone();

for action in to_prune_topics
for prune in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
{
sender.control(action);
sender.control(ControlAction::Prune(prune));
}
// Send the prune messages to the peer
tracing::debug!(
Expand Down Expand Up @@ -2016,11 +2021,8 @@ where
.get_mut(propagation_source)
.expect("Peerid should exist");

for action in topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
{
sender.control(action);
for topic_hash in topics_to_graft.into_iter() {
sender.control(ControlAction::Graft(Graft { topic_hash }));
}

// Notify the application of the subscriptions
Expand Down Expand Up @@ -2510,10 +2512,10 @@ where
Self::control_pool_add(
&mut self.control_pool,
peer,
ControlAction::IHave {
ControlAction::IHave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
},
}),
);
}
}
Expand Down Expand Up @@ -2546,8 +2548,10 @@ where
&self.connected_peers,
);
}
let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft {
topic_hash: topic_hash.clone(),
let grafts = topics.iter().map(|topic_hash| {
ControlAction::Graft(Graft {
topic_hash: topic_hash.clone(),
})
});

// If there are prunes associated with the same peer add them.
Expand Down Expand Up @@ -2576,8 +2580,12 @@ where
)
});

for msg in control_msgs.chain(prunes) {
sender.control(msg);
for graft in grafts {
sender.control(graft);
}

for prune in prunes {
sender.control(ControlAction::Prune(prune));
}
}

Expand All @@ -2597,7 +2605,7 @@ where
.expect("Peerid should exist")
.clone();

sender.control(prune);
sender.control(ControlAction::Prune(prune));

// inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -3205,21 +3213,21 @@ where
let mut prune_msgs = vec![];
for control_msg in rpc.control_msgs {
match control_msg {
ControlAction::IHave {
ControlAction::IHave(IHave {
topic_hash,
message_ids,
} => {
}) => {
ihave_msgs.push((topic_hash, message_ids));
}
ControlAction::IWant { message_ids } => {
ControlAction::IWant(IWant { message_ids }) => {
self.handle_iwant(&propagation_source, message_ids)
}
ControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
ControlAction::Prune {
ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
ControlAction::Prune(Prune {
topic_hash,
peers,
backoff,
} => prune_msgs.push((topic_hash, peers, backoff)),
}) => prune_msgs.push((topic_hash, peers, backoff)),
}
}
if !ihave_msgs.is_empty() {
Expand Down
Loading

0 comments on commit 327dafd

Please sign in to comment.