From 123eaa833eb6716d470abdbd3bbc681328cd20e2 Mon Sep 17 00:00:00 2001 From: Bastien Faivre Date: Mon, 13 Jan 2025 22:58:29 +0100 Subject: [PATCH] feat: added basic metrics --- .gitignore | 2 + Cargo.lock | 43 ++++++++++++++++ Cargo.toml | 4 +- dog/Cargo.toml | 1 + dog/src/behaviour.rs | 33 +++++++++++- dog/src/lib.rs | 1 + dog/src/metrics.rs | 86 ++++++++++++++++++++++++++++++++ dog/src/types.rs | 9 ++++ dog/tests/src/lib.rs | 51 ++++++++++--------- examples/simple/Cargo.toml | 1 + examples/simple/src/behaviour.rs | 6 ++- examples/simple/src/main.rs | 30 +++++++++-- examples/simple/src/swarm.rs | 5 +- 13 files changed, 237 insertions(+), 35 deletions(-) create mode 100644 dog/src/metrics.rs diff --git a/.gitignore b/.gitignore index eb5a316..f0bd838 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ +benchmark/config/* +!benchmark/config/README.md target diff --git a/Cargo.lock b/Cargo.lock index 751d636..06c5373 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -625,6 +625,12 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "dtoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" + [[package]] name = "ed25519" version = "2.2.3" @@ -1447,6 +1453,7 @@ dependencies = [ "hex_fmt", "libp2p", "lru", + "prometheus-client", "quick-protobuf", "quick-protobuf-codec", "rand", @@ -1456,6 +1463,18 @@ dependencies = [ "void", ] +[[package]] +name = "libp2p-dog-benchmark" +version = "0.1.0" +dependencies = [ + "clap", + "libp2p", + "libp2p-dog", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "libp2p-dog-example" version = "0.1.0" @@ -1463,6 +1482,7 @@ dependencies = [ "clap", "libp2p", "libp2p-dog", + "prometheus-client", "tokio", "tracing", "tracing-subscriber", @@ -2161,6 +2181,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus-client" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92f2b0cae6fd19ec4f2b6ded3f39ffcce7a8c8490b34aa406c27e2c855bdc97d" +dependencies = [ + "dtoa", + "itoa", + "parking_lot", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "quick-error" version = "1.2.3" diff --git a/Cargo.toml b/Cargo.toml index 0bc81b8..23d6537 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,8 @@ [workspace] members = [ - "examples/simple", + "benchmark/code", "dog", + "examples/simple", # Tests "dog/tests", @@ -24,6 +25,7 @@ futures-timer = "3.0.3" hex_fmt = "0.3.0" libp2p = "0.54.1" lru = "0.12.5" +prometheus-client = "0.23.0" quick-protobuf = "0.8.1" quick-protobuf-codec = "0.3.1" rand = "0.8" diff --git a/dog/Cargo.toml b/dog/Cargo.toml index 304de76..3a76830 100644 --- a/dog/Cargo.toml +++ b/dog/Cargo.toml @@ -17,6 +17,7 @@ futures-timer = { workspace = true } hex_fmt = { workspace = true } libp2p = { workspace = true } lru = { workspace = true } +prometheus-client = { workspace = true } quick-protobuf = { workspace = true } quick-protobuf-codec = { workspace = true } rand = { workspace = true } diff --git a/dog/src/behaviour.rs b/dog/src/behaviour.rs index 5d205fa..8c3c105 100644 --- a/dog/src/behaviour.rs +++ b/dog/src/behaviour.rs @@ -15,6 +15,7 @@ use libp2p::{ PeerId, }; use lru::LruCache; +use prometheus_client::registry::Registry; use quick_protobuf::{MessageWrite, Writer}; use crate::{ @@ -22,6 +23,7 @@ use crate::{ dog::{Controller, Route, Router}, error::PublishError, handler::{Handler, HandlerEvent, HandlerIn}, + metrics::Metrics, protocol::SIGNING_PREFIX, rpc::Sender, rpc_proto::proto, @@ -158,6 +160,7 @@ pub struct Behaviour { redundancy_controller: Controller, router: Router, cache: LruCache, + metrics: Option, } impl Behaviour @@ -168,7 +171,15 @@ where authenticity: TransactionAuthenticity, config: Config, ) -> Result { - Self::new_with_transform(authenticity, config, D::default()) + Self::new_with_transform(authenticity, config, None, D::default()) + } + + pub fn new_with_metrics( + authenticity: TransactionAuthenticity, + config: Config, + metrics: &mut Registry, + ) -> Result { + Self::new_with_transform(authenticity, config, Some(metrics), D::default()) } } @@ -179,6 +190,7 @@ where pub fn new_with_transform( authenticity: TransactionAuthenticity, config: Config, + metrics: Option<&mut Registry>, data_transform: D, ) -> Result { // TODO: validate config @@ -193,6 +205,7 @@ where router: Router::new(), cache: LruCache::new(NonZeroUsize::new(config.cache_size()).unwrap()), config, + metrics: metrics.map(Metrics::new), }) } } @@ -267,6 +280,10 @@ where tracing::debug!(transaction=%tx_id, "Published transaction"); + if let Some(m) = self.metrics.as_mut() { + m.register_published_tx(); + } + Ok(tx_id) } @@ -326,6 +343,12 @@ where /// Returns `true` if the sending was successful, `false` otherwise. fn send_transaction(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool { + if let Some(m) = self.metrics.as_mut() { + if let RpcOut::Publish { ref tx, .. } | RpcOut::Forward { ref tx, .. } = rpc { + m.tx_sent(tx.raw_protobuf_len()); + } + } + let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else { tracing::error!(peer=%peer_id, "Could not send rpc to connection handler, peer doesn't exist in connected peers list"); return false; @@ -419,6 +442,10 @@ where raw_transaction: RawTransaction, propagation_source: &PeerId, ) { + if let Some(m) = self.metrics.as_mut() { + m.tx_recv_unfiltered(raw_transaction.raw_protobuf_len()); + } + let transaction = match self .data_transform .inbound_transform(raw_transaction.clone()) @@ -460,6 +487,10 @@ where } self.redundancy_controller.incr_first_time_txs_count(); + if let Some(m) = self.metrics.as_mut() { + m.tx_recv(); + } + tracing::debug!("Deliver received transaction to user"); self.events .push_back(ToSwarm::GenerateEvent(Event::Transaction { diff --git a/dog/src/lib.rs b/dog/src/lib.rs index 2a442ce..440584a 100644 --- a/dog/src/lib.rs +++ b/dog/src/lib.rs @@ -3,6 +3,7 @@ mod config; mod dog; mod error; mod handler; +mod metrics; pub mod protocol; mod rpc; mod rpc_proto; diff --git a/dog/src/metrics.rs b/dog/src/metrics.rs new file mode 100644 index 0000000..cac99d2 --- /dev/null +++ b/dog/src/metrics.rs @@ -0,0 +1,86 @@ +use prometheus_client::{metrics::counter::Counter, registry::Registry}; + +pub(crate) struct Metrics { + /// Number of transactions sent. + txs_sent_counts: Counter, + /// Number of bytes sent. + txs_sent_bytes: Counter, + /// Number of published transactions. + txs_sent_published: Counter, + + /// Number of transactions received (without filtering duplicates). + txs_recv_counts_unfiltered: Counter, + /// Number of transactions received (after filtering duplicates). + txs_recv_counts: Counter, + /// Number of bytes received. + txs_recv_bytes: Counter, +} + +impl Metrics { + pub(crate) fn new(registry: &mut Registry) -> Self { + let txs_sent_counts = Counter::default(); + let txs_sent_bytes = Counter::default(); + let txs_sent_published = Counter::default(); + let txs_recv_counts_unfiltered = Counter::default(); + let txs_recv_counts = Counter::default(); + let txs_recv_bytes = Counter::default(); + + registry.register( + "txs_sent_counts", + "Number of transactions sent.", + txs_sent_counts.clone(), + ); + registry.register( + "txs_sent_bytes", + "Number of bytes sent.", + txs_sent_bytes.clone(), + ); + registry.register( + "txs_sent_published", + "Number of published transactions.", + txs_sent_published.clone(), + ); + registry.register( + "txs_recv_counts_unfiltered", + "Number of transactions received (without filtering duplicates).", + txs_recv_counts_unfiltered.clone(), + ); + registry.register( + "txs_recv_counts", + "Number of transactions received (after filtering duplicates).", + txs_recv_counts.clone(), + ); + registry.register( + "txs_recv_bytes", + "Number of bytes received.", + txs_recv_bytes.clone(), + ); + + Self { + txs_sent_counts, + txs_sent_bytes, + txs_sent_published, + txs_recv_counts_unfiltered, + txs_recv_counts, + txs_recv_bytes, + } + } + + pub(crate) fn tx_sent(&mut self, bytes: usize) { + self.txs_sent_counts.inc(); + self.txs_sent_bytes.inc_by(bytes as u64); + } + + pub(crate) fn register_published_tx(&mut self) { + self.txs_sent_published.inc(); + } + + pub(crate) fn tx_recv_unfiltered(&mut self, bytes: usize) { + self.txs_recv_counts_unfiltered.inc(); + self.txs_recv_bytes.inc_by(bytes as u64); + } + + pub(crate) fn tx_recv(&mut self) { + self.txs_recv_counts.inc(); + } +} diff --git a/dog/src/types.rs b/dog/src/types.rs index 23311be..56d0f6b 100644 --- a/dog/src/types.rs +++ b/dog/src/types.rs @@ -1,5 +1,6 @@ use futures_timer::Delay; use libp2p::{identity::ParseError, swarm::ConnectionId, PeerId}; +use quick_protobuf::MessageWrite; use crate::{rpc::Sender, rpc_proto::proto}; @@ -57,6 +58,14 @@ pub struct RawTransaction { pub key: Option>, } +impl RawTransaction { + pub fn raw_protobuf_len(&self) -> usize { + let transaction: proto::Transaction = self.clone().into(); + + transaction.get_size() + } +} + impl From for proto::Transaction { fn from(tx: RawTransaction) -> Self { proto::Transaction { diff --git a/dog/tests/src/lib.rs b/dog/tests/src/lib.rs index 373fd51..fd69e2f 100644 --- a/dog/tests/src/lib.rs +++ b/dog/tests/src/lib.rs @@ -4,7 +4,7 @@ use libp2p::{ futures::StreamExt, identity::Keypair, swarm::dial_opts::DialOpts, Multiaddr, PeerId, SwarmBuilder, }; -use libp2p_dog::{IdentityTransform, Route}; +use libp2p_dog::Route; use rand::Rng; use tokio::{sync::mpsc, task::JoinHandle, time::sleep}; @@ -200,30 +200,33 @@ impl TestNode { self.tx_publish = Some(tx_publish); tokio::spawn(async move { - let mut swarm = SwarmBuilder::with_existing_identity(keypair) - .with_tokio() - .with_tcp( - libp2p::tcp::Config::new().nodelay(true), // Disable Nagle's algorithm - libp2p::noise::Config::new, - libp2p::yamux::Config::default, - ) - .unwrap() - .with_behaviour(|key| { - libp2p_dog::Behaviour::::new( - if signed_transactions { - libp2p_dog::TransactionAuthenticity::Signed(key.clone()) - } else { - libp2p_dog::TransactionAuthenticity::Author(key.public().to_peer_id()) - }, - config, + let mut swarm: libp2p::Swarm = + SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_tcp( + libp2p::tcp::Config::new().nodelay(true), // Disable Nagle's algorithm + libp2p::noise::Config::new, + libp2p::yamux::Config::default, ) - .expect("Failed to create dog behaviour") - }) - .unwrap() - .with_swarm_config(|cfg| { - cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX)) - }) - .build(); + .unwrap() + .with_behaviour(|key| { + libp2p_dog::Behaviour::new( + if signed_transactions { + libp2p_dog::TransactionAuthenticity::Signed(key.clone()) + } else { + libp2p_dog::TransactionAuthenticity::Author( + key.public().to_peer_id(), + ) + }, + config, + ) + .expect("Failed to create dog behaviour") + }) + .unwrap() + .with_swarm_config(|cfg| { + cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX)) + }) + .build(); match swarm.listen_on(addr.clone()) { Ok(_) => {} diff --git a/examples/simple/Cargo.toml b/examples/simple/Cargo.toml index adc8e48..f38493d 100644 --- a/examples/simple/Cargo.toml +++ b/examples/simple/Cargo.toml @@ -12,6 +12,7 @@ release = false [dependencies] libp2p = { workspace = true, features = ["noise", "tcp", "yamux", "tokio", "macros"] } libp2p-dog = { path = "../../dog" } +prometheus-client = { workspace = true } tokio = { workspace = true, features = ["full"] } clap = { version = "4.5.16", features = ["derive"] } tracing = { workspace = true } diff --git a/examples/simple/src/behaviour.rs b/examples/simple/src/behaviour.rs index dd75a53..26960e2 100644 --- a/examples/simple/src/behaviour.rs +++ b/examples/simple/src/behaviour.rs @@ -1,4 +1,5 @@ use libp2p::{identity::Keypair, swarm::NetworkBehaviour}; +use prometheus_client::registry::Registry; use crate::config::Config; @@ -20,10 +21,11 @@ pub(crate) struct MyBehaviour { } impl MyBehaviour { - pub(crate) fn new(_config: &Config, key: &Keypair) -> Self { - let dog = libp2p_dog::Behaviour::new( + pub(crate) fn new(_config: &Config, key: &Keypair, registry: &mut Registry) -> Self { + let dog = libp2p_dog::Behaviour::new_with_metrics( libp2p_dog::TransactionAuthenticity::Signed(key.clone()), libp2p_dog::Config::default(), + registry, // libp2p_dog::TransactionAuthenticity::Author(key.public().to_peer_id()), // libp2p_dog::ConfigBuilder::default() // .validation_mode(libp2p_dog::ValidationMode::None) diff --git a/examples/simple/src/main.rs b/examples/simple/src/main.rs index 36816fd..1a2c89f 100644 --- a/examples/simple/src/main.rs +++ b/examples/simple/src/main.rs @@ -1,6 +1,7 @@ use std::{error::Error, time::Duration}; use libp2p::{futures::StreamExt, swarm::dial_opts::DialOpts}; +use prometheus_client::registry::Registry; use tokio::{select, time}; use tracing::{debug, info}; @@ -21,23 +22,33 @@ async fn main() -> Result<(), Box> { let config = config::Config::new(&args); - let mut swarm = swarm::new_swarm(&config); + let mut registry = Registry::with_prefix("dog"); + + let mut swarm = swarm::new_swarm(&config, &mut registry); swarm.listen_on(format!("/ip4/127.0.0.1/tcp/{}", args.port).parse()?)?; let mut state = state::State::new(&config); if let Some(node) = &config.node { - swarm.dial(DialOpts::unknown_peer_id().address(node.clone()).build())?; + swarm.dial( + DialOpts::unknown_peer_id() + .address(node.clone()) + .allocate_new_port() + .build(), + )?; } let mut i = 0; let local_peer_id = swarm.local_peer_id().clone(); - loop { - let sleep = time::sleep(Duration::from_secs(5)); - tokio::pin!(sleep); + let sleep = time::sleep(Duration::from_secs(5)); + tokio::pin!(sleep); + let stop = time::sleep(Duration::from_secs(20)); + tokio::pin!(stop); + + loop { select! { event = swarm.select_next_some() => { handler::handle_swarm_event(event, &mut swarm, &config, &mut state).await; @@ -56,7 +67,16 @@ async fn main() -> Result<(), Box> { } i += 1; + + sleep.as_mut().reset(time::Instant::now() + Duration::from_secs(5)); + } + + _ = &mut stop => { + info!("Stopping the node"); + break; } } } + + Ok(()) } diff --git a/examples/simple/src/swarm.rs b/examples/simple/src/swarm.rs index 25fed6d..e4285b8 100644 --- a/examples/simple/src/swarm.rs +++ b/examples/simple/src/swarm.rs @@ -1,9 +1,10 @@ use crate::behaviour::MyBehaviour; use crate::config::Config; use libp2p::{noise, tcp, yamux, SwarmBuilder}; +use prometheus_client::registry::Registry; use std::time::Duration; -pub(crate) fn new_swarm(config: &Config) -> libp2p::Swarm { +pub(crate) fn new_swarm(config: &Config, registry: &mut Registry) -> libp2p::Swarm { SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( @@ -12,7 +13,7 @@ pub(crate) fn new_swarm(config: &Config) -> libp2p::Swarm { yamux::Config::default, ) .unwrap() - .with_behaviour(|key| MyBehaviour::new(config, key)) + .with_behaviour(|key| MyBehaviour::new(config, key, registry)) .unwrap() .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))) .build()