Skip to content

Commit

Permalink
feat(gossipsub): add messages_added_to_queue and messages_removed_fro…
Browse files Browse the repository at this point in the history
…m_queue metrics
  • Loading branch information
mxinden committed Nov 2, 2023
1 parent 823d0b2 commit a976b3b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
28 changes: 26 additions & 2 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3272,7 +3272,19 @@ where
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
Ok(Handler::new(
self.config.protocol_config(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_added_to_queue
.clone(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_removed_from_queue
.clone(),
))
}

fn handle_established_outbound_connection(
Expand All @@ -3282,7 +3294,19 @@ where
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
Ok(Handler::new(
self.config.protocol_config(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_added_to_queue
.clone(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_removed_from_queue
.clone(),
))
}

fn on_connection_handler_event(
Expand Down
18 changes: 16 additions & 2 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use libp2p_swarm::handler::{
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p_swarm::Stream;
use prometheus_client::metrics::counter::Counter;
use smallvec::SmallVec;
use std::{
pin::Pin,
Expand Down Expand Up @@ -120,6 +121,9 @@ pub struct EnabledHandler {
/// Keeps track of whether this connection is for a peer in the mesh. This is used to make
/// decisions about the keep alive state for this connection.
in_mesh: bool,

messages_added_to_queue: Counter,
messages_removed_from_queue: Counter,
}

pub enum DisabledHandler {
Expand Down Expand Up @@ -159,7 +163,11 @@ enum OutboundSubstreamState {

impl Handler {
/// Builds a new [`Handler`].
pub fn new(protocol_config: ProtocolConfig) -> Self {
pub fn new(
protocol_config: ProtocolConfig,
messages_added_to_queue: Counter,
messages_removed_from_queue: Counter,
) -> Self {
Handler::Enabled(EnabledHandler {
listen_protocol: protocol_config,
inbound_substream: None,
Expand All @@ -172,6 +180,8 @@ impl Handler {
peer_kind_sent: false,
last_io_activity: Instant::now(),
in_mesh: false,
messages_added_to_queue,
messages_removed_from_queue,
})
}
}
Expand Down Expand Up @@ -316,6 +326,7 @@ impl EnabledHandler {
// outbound idle state
Some(OutboundSubstreamState::WaitingOutput(substream)) => {
if let Some(message) = self.send_queue.pop() {
self.messages_removed_from_queue.inc();
self.send_queue.shrink_to_fit();
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
Expand Down Expand Up @@ -409,7 +420,10 @@ impl ConnectionHandler for Handler {
fn on_behaviour_event(&mut self, message: HandlerIn) {
match self {
Handler::Enabled(handler) => match message {
HandlerIn::Message(m) => handler.send_queue.push(m),
HandlerIn::Message(m) => {
handler.send_queue.push(m);
handler.messages_added_to_queue.inc();
}
HandlerIn::JoinedMesh => {
handler.in_mesh = true;
}
Expand Down
23 changes: 23 additions & 0 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ pub(crate) struct Metrics {
/// The number of times we have decided that an IWANT control message is required for this
/// topic. A very high metric might indicate an underperforming network.
topic_iwant_msgs: Family<TopicHash, Counter>,

pub messages_added_to_queue: Counter,

Check failure on line 178 in protocols/gossipsub/src/metrics.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

unreachable `pub` field
pub messages_removed_from_queue: Counter,

Check failure on line 179 in protocols/gossipsub/src/metrics.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

unreachable `pub` field
}

impl Metrics {
Expand Down Expand Up @@ -301,6 +304,24 @@ impl Metrics {
);
metric
};

Check failure on line 306 in protocols/gossipsub/src/metrics.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/protocols/gossipsub/src/metrics.rs
let messages_added_to_queue = {
let metric = Counter::default();
registry.register(
"messages_added_to_queue",
"TODO",
metric.clone(),
);
metric
};
let messages_removed_from_queue = {
let metric = Counter::default();

Check failure on line 317 in protocols/gossipsub/src/metrics.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/protocols/gossipsub/src/metrics.rs
registry.register(
"messages_removed_from_queue",
"TODO",
metric.clone(),
);
metric
};

Self {
max_topics,
Expand All @@ -327,6 +348,8 @@ impl Metrics {
heartbeat_duration,
memcache_misses,
topic_iwant_msgs,
messages_added_to_queue,
messages_removed_from_queue,
}
}

Expand Down

0 comments on commit a976b3b

Please sign in to comment.