Skip to content

Commit

Permalink
feat: added basic metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
BastienFaivre committed Jan 13, 2025
1 parent d3e42ea commit 123eaa8
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
benchmark/config/*
!benchmark/config/README.md
target
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[workspace]
members = [
"examples/simple",
"benchmark/code",
"dog",
"examples/simple",

# Tests
"dog/tests",
Expand All @@ -24,6 +25,7 @@ futures-timer = "3.0.3"
hex_fmt = "0.3.0"
libp2p = "0.54.1"
lru = "0.12.5"
prometheus-client = "0.23.0"
quick-protobuf = "0.8.1"
quick-protobuf-codec = "0.3.1"
rand = "0.8"
Expand Down
1 change: 1 addition & 0 deletions dog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ futures-timer = { workspace = true }
hex_fmt = { workspace = true }
libp2p = { workspace = true }
lru = { workspace = true }
prometheus-client = { workspace = true }
quick-protobuf = { workspace = true }
quick-protobuf-codec = { workspace = true }
rand = { workspace = true }
Expand Down
33 changes: 32 additions & 1 deletion dog/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use libp2p::{
PeerId,
};
use lru::LruCache;
use prometheus_client::registry::Registry;
use quick_protobuf::{MessageWrite, Writer};

use crate::{
config::Config,
dog::{Controller, Route, Router},
error::PublishError,
handler::{Handler, HandlerEvent, HandlerIn},
metrics::Metrics,
protocol::SIGNING_PREFIX,
rpc::Sender,
rpc_proto::proto,
Expand Down Expand Up @@ -158,6 +160,7 @@ pub struct Behaviour<D = IdentityTransform> {
redundancy_controller: Controller,
router: Router,
cache: LruCache<TransactionId, ()>,
metrics: Option<Metrics>,
}

impl<D> Behaviour<D>
Expand All @@ -168,7 +171,15 @@ where
authenticity: TransactionAuthenticity,
config: Config,
) -> Result<Self, &'static str> {
Self::new_with_transform(authenticity, config, D::default())
Self::new_with_transform(authenticity, config, None, D::default())
}

pub fn new_with_metrics(
authenticity: TransactionAuthenticity,
config: Config,
metrics: &mut Registry,
) -> Result<Self, &'static str> {
Self::new_with_transform(authenticity, config, Some(metrics), D::default())
}
}

Expand All @@ -179,6 +190,7 @@ where
pub fn new_with_transform(
authenticity: TransactionAuthenticity,
config: Config,
metrics: Option<&mut Registry>,
data_transform: D,
) -> Result<Self, &'static str> {
// TODO: validate config
Expand All @@ -193,6 +205,7 @@ where
router: Router::new(),
cache: LruCache::new(NonZeroUsize::new(config.cache_size()).unwrap()),
config,
metrics: metrics.map(Metrics::new),
})
}
}
Expand Down Expand Up @@ -267,6 +280,10 @@ where

tracing::debug!(transaction=%tx_id, "Published transaction");

if let Some(m) = self.metrics.as_mut() {
m.register_published_tx();
}

Ok(tx_id)
}

Expand Down Expand Up @@ -326,6 +343,12 @@ where

/// Returns `true` if the sending was successful, `false` otherwise.
fn send_transaction(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
if let Some(m) = self.metrics.as_mut() {
if let RpcOut::Publish { ref tx, .. } | RpcOut::Forward { ref tx, .. } = rpc {
m.tx_sent(tx.raw_protobuf_len());
}
}

let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
tracing::error!(peer=%peer_id, "Could not send rpc to connection handler, peer doesn't exist in connected peers list");
return false;
Expand Down Expand Up @@ -419,6 +442,10 @@ where
raw_transaction: RawTransaction,
propagation_source: &PeerId,
) {
if let Some(m) = self.metrics.as_mut() {
m.tx_recv_unfiltered(raw_transaction.raw_protobuf_len());
}

let transaction = match self
.data_transform
.inbound_transform(raw_transaction.clone())
Expand Down Expand Up @@ -460,6 +487,10 @@ where
}
self.redundancy_controller.incr_first_time_txs_count();

if let Some(m) = self.metrics.as_mut() {
m.tx_recv();
}

tracing::debug!("Deliver received transaction to user");
self.events
.push_back(ToSwarm::GenerateEvent(Event::Transaction {
Expand Down
1 change: 1 addition & 0 deletions dog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod config;
mod dog;
mod error;
mod handler;
mod metrics;
pub mod protocol;
mod rpc;
mod rpc_proto;
Expand Down
86 changes: 86 additions & 0 deletions dog/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use prometheus_client::{metrics::counter::Counter, registry::Registry};

pub(crate) struct Metrics {
/// Number of transactions sent.
txs_sent_counts: Counter,
/// Number of bytes sent.
txs_sent_bytes: Counter,
/// Number of published transactions.
txs_sent_published: Counter,

/// Number of transactions received (without filtering duplicates).
txs_recv_counts_unfiltered: Counter,
/// Number of transactions received (after filtering duplicates).
txs_recv_counts: Counter,
/// Number of bytes received.
txs_recv_bytes: Counter,
}

impl Metrics {
pub(crate) fn new(registry: &mut Registry) -> Self {
let txs_sent_counts = Counter::default();
let txs_sent_bytes = Counter::default();
let txs_sent_published = Counter::default();
let txs_recv_counts_unfiltered = Counter::default();
let txs_recv_counts = Counter::default();
let txs_recv_bytes = Counter::default();

registry.register(
"txs_sent_counts",
"Number of transactions sent.",
txs_sent_counts.clone(),
);
registry.register(
"txs_sent_bytes",
"Number of bytes sent.",
txs_sent_bytes.clone(),
);
registry.register(
"txs_sent_published",
"Number of published transactions.",
txs_sent_published.clone(),
);
registry.register(
"txs_recv_counts_unfiltered",
"Number of transactions received (without filtering duplicates).",
txs_recv_counts_unfiltered.clone(),
);
registry.register(
"txs_recv_counts",
"Number of transactions received (after filtering duplicates).",
txs_recv_counts.clone(),
);
registry.register(
"txs_recv_bytes",
"Number of bytes received.",
txs_recv_bytes.clone(),
);

Self {
txs_sent_counts,
txs_sent_bytes,
txs_sent_published,
txs_recv_counts_unfiltered,
txs_recv_counts,
txs_recv_bytes,
}
}

pub(crate) fn tx_sent(&mut self, bytes: usize) {
self.txs_sent_counts.inc();
self.txs_sent_bytes.inc_by(bytes as u64);
}

pub(crate) fn register_published_tx(&mut self) {
self.txs_sent_published.inc();
}

pub(crate) fn tx_recv_unfiltered(&mut self, bytes: usize) {
self.txs_recv_counts_unfiltered.inc();
self.txs_recv_bytes.inc_by(bytes as u64);
}

pub(crate) fn tx_recv(&mut self) {
self.txs_recv_counts.inc();
}
}
9 changes: 9 additions & 0 deletions dog/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures_timer::Delay;
use libp2p::{identity::ParseError, swarm::ConnectionId, PeerId};
use quick_protobuf::MessageWrite;

use crate::{rpc::Sender, rpc_proto::proto};

Expand Down Expand Up @@ -57,6 +58,14 @@ pub struct RawTransaction {
pub key: Option<Vec<u8>>,
}

impl RawTransaction {
pub fn raw_protobuf_len(&self) -> usize {
let transaction: proto::Transaction = self.clone().into();

transaction.get_size()
}
}

impl From<RawTransaction> for proto::Transaction {
fn from(tx: RawTransaction) -> Self {
proto::Transaction {
Expand Down
51 changes: 27 additions & 24 deletions dog/tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use libp2p::{
futures::StreamExt, identity::Keypair, swarm::dial_opts::DialOpts, Multiaddr, PeerId,
SwarmBuilder,
};
use libp2p_dog::{IdentityTransform, Route};
use libp2p_dog::Route;
use rand::Rng;
use tokio::{sync::mpsc, task::JoinHandle, time::sleep};

Expand Down Expand Up @@ -200,30 +200,33 @@ impl TestNode {
self.tx_publish = Some(tx_publish);

tokio::spawn(async move {
let mut swarm = SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_tcp(
libp2p::tcp::Config::new().nodelay(true), // Disable Nagle's algorithm
libp2p::noise::Config::new,
libp2p::yamux::Config::default,
)
.unwrap()
.with_behaviour(|key| {
libp2p_dog::Behaviour::<IdentityTransform>::new(
if signed_transactions {
libp2p_dog::TransactionAuthenticity::Signed(key.clone())
} else {
libp2p_dog::TransactionAuthenticity::Author(key.public().to_peer_id())
},
config,
let mut swarm: libp2p::Swarm<libp2p_dog::Behaviour> =
SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_tcp(
libp2p::tcp::Config::new().nodelay(true), // Disable Nagle's algorithm
libp2p::noise::Config::new,
libp2p::yamux::Config::default,
)
.expect("Failed to create dog behaviour")
})
.unwrap()
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX))
})
.build();
.unwrap()
.with_behaviour(|key| {
libp2p_dog::Behaviour::new(
if signed_transactions {
libp2p_dog::TransactionAuthenticity::Signed(key.clone())
} else {
libp2p_dog::TransactionAuthenticity::Author(
key.public().to_peer_id(),
)
},
config,
)
.expect("Failed to create dog behaviour")
})
.unwrap()
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX))
})
.build();

match swarm.listen_on(addr.clone()) {
Ok(_) => {}
Expand Down
Loading

0 comments on commit 123eaa8

Please sign in to comment.