Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove panics for later debugging #565

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 49 additions & 44 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,15 +722,15 @@ where
Err(_) => {
self.failed_messages.entry(*peer_id).or_default().priority += 1;

tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer");
tracing::warn!(peer_id=%peer_id, "Publish queue full. Could not publish to peer");
// Downscore the peer due to failed message.
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
}
}
} else {
tracing::error!(peer = %peer_id,
tracing::error!(peer_id = %peer_id,
"Could not PUBLISH, peer doesn't exist in connected peer list");
}
}
Expand Down Expand Up @@ -1382,11 +1382,11 @@ where
// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
for topic in &topics {
let peer = self
.connected_peers
.get_mut(peer_id)
.expect("Should be connected to peer");
peer.topics.insert(topic.clone());
let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
return;
};
connected_peer.topics.insert(topic.clone());
}

// we don't GRAFT to/from explicit peers; complain loudly if this happens
Expand Down Expand Up @@ -1512,12 +1512,14 @@ where
if !to_prune_topics.is_empty() {
// build the prune messages to send
let on_unsubscribe = false;
let mut sender = self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.sender
.clone();

let mut sender = match self.connected_peers.get_mut(peer_id) {
Some(connected_peer) => connected_peer.sender.clone(),
None => {
tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft and obtaining a sender");
return;
}
};

for prune in to_prune_topics
.iter()
Expand Down Expand Up @@ -2560,12 +2562,13 @@ where
// of its removal from another.

// send the control messages
let mut sender = self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.sender
.clone();
let mut sender = match self.connected_peers.get_mut(&peer_id) {
Some(connected_peer) => connected_peer.sender.clone(),
None => {
tracing::error!(peer_id = %peer_id, "Peer non-existent when sending graft/prune");
return;
}
};

// The following prunes are not due to unsubscribing.
let prunes = to_prune
Expand Down Expand Up @@ -2908,13 +2911,14 @@ where
} else {
// remove from mesh, topic_peers, peer_topic and the fanout
tracing::debug!(peer=%peer_id, "Peer disconnected");
let peer = self
.connected_peers
.get(&peer_id)
.expect("Peer should exist in the connected_peers");

let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
return;
};

// remove peer from all mappings
for topic in &peer.topics {
for topic in &connected_peer.topics {
// check the mesh for the topic
if let Some(mesh_peers) = self.mesh.get_mut(topic) {
// check if the peer is in the mesh and remove it
Expand Down Expand Up @@ -2942,12 +2946,7 @@ where

// If metrics are enabled, register the disconnection of a peer based on its protocol.
if let Some(metrics) = self.metrics.as_mut() {
let peer_kind = &self
.connected_peers
.get(&peer_id)
.expect("Connected peer must be registered")
.kind;
metrics.peer_protocol_disconnected(peer_kind.clone());
metrics.peer_protocol_disconnected(connected_peer.kind.clone());
}

self.connected_peers.remove(&peer_id);
Expand Down Expand Up @@ -3302,13 +3301,15 @@ fn peer_added_to_mesh(
connections: &HashMap<PeerId, PeerConnections>,
) {
// Ensure there is an active connection
let connection_id = {
let conn = connections.get(&peer_id).expect("To be connected to peer.");
assert!(
!conn.connections.is_empty(),
"Must have at least one connection"
);
conn.connections[0]
let connection_id = match connections.get(&peer_id) {
Some(p) => p
.connections
.first()
.expect("There should be at least one connection to a peer."),
None => {
tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
return;
}
};

if let Some(peer) = connections.get(&peer_id) {
Expand All @@ -3327,7 +3328,7 @@ fn peer_added_to_mesh(
events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::JoinedMesh,
handler: NotifyHandler::One(connection_id),
handler: NotifyHandler::One(*connection_id),
});
}

Expand All @@ -3342,12 +3343,16 @@ fn peer_removed_from_mesh(
connections: &HashMap<PeerId, PeerConnections>,
) {
// Ensure there is an active connection
let connection_id = connections
.get(&peer_id)
.expect("To be connected to peer.")
.connections
.first()
.expect("There should be at least one connection to a peer.");
let connection_id = match connections.get(&peer_id) {
Some(p) => p
.connections
.first()
.expect("There should be at least one connection to a peer."),
None => {
tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
return;
}
};

if let Some(peer) = connections.get(&peer_id) {
for topic in &peer.topics {
Expand Down
Loading