Skip to content

Commit

Permalink
split control messages for different priorities
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Dec 5, 2023
1 parent 327dafd commit 55600c8
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 90 deletions.
39 changes: 21 additions & 18 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
events: VecDeque<ToSwarm<Event, HandlerIn>>,

/// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<ControlAction>>,
control_pool: HashMap<PeerId, Vec<RpcOut>>,

/// Information used for publishing messages.
publish_config: PublishConfig,
Expand Down Expand Up @@ -1136,7 +1136,7 @@ where
tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer");
let on_unsubscribe = true;
let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, ControlAction::Prune(prune));
Self::control_pool_add(&mut self.control_pool, peer, RpcOut::Prune(prune));

// If the peer did not previously exist in any mesh, inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -1310,7 +1310,7 @@ where
Self::control_pool_add(
&mut self.control_pool,
*peer_id,
ControlAction::IWant(IWant {
RpcOut::IWant(IWant {
message_ids: iwant_ids_vec,
}),
);
Expand Down Expand Up @@ -1521,7 +1521,7 @@ where
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
{
sender.control(ControlAction::Prune(prune));
sender.prune(prune);
}
// Send the prune messages to the peer
tracing::debug!(
Expand Down Expand Up @@ -2022,7 +2022,7 @@ where
.expect("Peerid should exist");

for topic_hash in topics_to_graft.into_iter() {
sender.control(ControlAction::Graft(Graft { topic_hash }));
sender.graft(Graft { topic_hash });
}

// Notify the application of the subscriptions
Expand Down Expand Up @@ -2512,7 +2512,7 @@ where
Self::control_pool_add(
&mut self.control_pool,
peer,
ControlAction::IHave(IHave {
RpcOut::IHave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
}),
Expand Down Expand Up @@ -2548,11 +2548,6 @@ where
&self.connected_peers,
);
}
let grafts = topics.iter().map(|topic_hash| {
ControlAction::Graft(Graft {
topic_hash: topic_hash.clone(),
})
});

// If there are prunes associated with the same peer add them.
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
Expand Down Expand Up @@ -2580,12 +2575,14 @@ where
)
});

for graft in grafts {
sender.control(graft);
for topic_hash in topics {
sender.graft(Graft {
topic_hash: topic_hash.clone(),
});
}

for prune in prunes {
sender.control(ControlAction::Prune(prune));
sender.prune(prune);
}
}

Expand All @@ -2605,7 +2602,7 @@ where
.expect("Peerid should exist")
.clone();

sender.control(ControlAction::Prune(prune));
sender.prune(prune);

// inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -2786,9 +2783,9 @@ where

// adds a control action to control_pool
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<ControlAction>>,
control_pool: &mut HashMap<PeerId, Vec<RpcOut>>,
peer: PeerId,
control: ControlAction,
control: RpcOut,
) {
control_pool.entry(peer).or_default().push(control);
}
Expand All @@ -2802,7 +2799,13 @@ where
.get_mut(&peer)
.expect("Peerid should exist");

sender.control(msg);
match msg {
RpcOut::IHave(ihave) => sender.ihave(ihave),
RpcOut::IWant(iwant) => sender.iwant(iwant),
RpcOut::Graft(graft) => sender.graft(graft),
RpcOut::Prune(prune) => sender.prune(prune),
_ => unreachable!(),
}
}
}

Expand Down
Loading

0 comments on commit 55600c8

Please sign in to comment.