Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): add messages_added and messages_removed metrics #4793

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3272,7 +3272,19 @@ where
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
Ok(Handler::new(
self.config.protocol_config(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_added_to_queue
.clone(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_removed_from_queue
.clone(),
))
}

fn handle_established_outbound_connection(
Expand All @@ -3282,7 +3294,19 @@ where
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
Ok(Handler::new(
self.config.protocol_config(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_added_to_queue
.clone(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_removed_from_queue
.clone(),
))
}

fn on_connection_handler_event(
Expand Down
18 changes: 16 additions & 2 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use libp2p_swarm::handler::{
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p_swarm::Stream;
use prometheus_client::metrics::counter::Counter;
use smallvec::SmallVec;
use std::{
pin::Pin,
Expand Down Expand Up @@ -120,6 +121,9 @@ pub struct EnabledHandler {
/// Keeps track of whether this connection is for a peer in the mesh. This is used to make
/// decisions about the keep alive state for this connection.
in_mesh: bool,

messages_added_to_queue: Counter,
messages_removed_from_queue: Counter,
}

pub enum DisabledHandler {
Expand Down Expand Up @@ -159,7 +163,11 @@ enum OutboundSubstreamState {

impl Handler {
/// Builds a new [`Handler`].
pub fn new(protocol_config: ProtocolConfig) -> Self {
pub fn new(
protocol_config: ProtocolConfig,
messages_added_to_queue: Counter,
messages_removed_from_queue: Counter,
) -> Self {
Handler::Enabled(EnabledHandler {
listen_protocol: protocol_config,
inbound_substream: None,
Expand All @@ -172,6 +180,8 @@ impl Handler {
peer_kind_sent: false,
last_io_activity: Instant::now(),
in_mesh: false,
messages_added_to_queue,
messages_removed_from_queue,
})
}
}
Expand Down Expand Up @@ -316,6 +326,7 @@ impl EnabledHandler {
// outbound idle state
Some(OutboundSubstreamState::WaitingOutput(substream)) => {
if let Some(message) = self.send_queue.pop() {
self.messages_removed_from_queue.inc();
self.send_queue.shrink_to_fit();
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
Expand Down Expand Up @@ -409,7 +420,10 @@ impl ConnectionHandler for Handler {
fn on_behaviour_event(&mut self, message: HandlerIn) {
match self {
Handler::Enabled(handler) => match message {
HandlerIn::Message(m) => handler.send_queue.push(m),
HandlerIn::Message(m) => {
handler.send_queue.push(m);
handler.messages_added_to_queue.inc();
}
HandlerIn::JoinedMesh => {
handler.in_mesh = true;
}
Expand Down
23 changes: 23 additions & 0 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@
/// The number of times we have decided that an IWANT control message is required for this
/// topic. A very high metric might indicate an underperforming network.
topic_iwant_msgs: Family<TopicHash, Counter>,

pub messages_added_to_queue: Counter,

Check failure on line 178 in protocols/gossipsub/src/metrics.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

unreachable `pub` field
pub messages_removed_from_queue: Counter,

Check failure on line 179 in protocols/gossipsub/src/metrics.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

unreachable `pub` field
}

impl Metrics {
Expand Down Expand Up @@ -300,7 +303,25 @@
metric.clone(),
);
metric
};

Check failure on line 306 in protocols/gossipsub/src/metrics.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/protocols/gossipsub/src/metrics.rs
let messages_added_to_queue = {
let metric = Counter::default();
registry.register(
"messages_added_to_queue",
"TODO",
metric.clone(),
);
metric
};
let messages_removed_from_queue = {
let metric = Counter::default();

Check failure on line 317 in protocols/gossipsub/src/metrics.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/protocols/gossipsub/src/metrics.rs
registry.register(
"messages_removed_from_queue",
"TODO",
metric.clone(),
);
metric
};

Self {
max_topics,
Expand All @@ -327,6 +348,8 @@
heartbeat_duration,
memcache_misses,
topic_iwant_msgs,
messages_added_to_queue,
messages_removed_from_queue,
}
}

Expand Down
Loading