From c8600cce21a94c9220859c49e38ec2652f9a52b5 Mon Sep 17 00:00:00 2001 From: joshuef Date: Mon, 30 Oct 2023 00:43:25 +0100 Subject: [PATCH] fix(floodsub): use `Bytes` for `FloodsubMessage::data` This should help avoid potentially costly clones. Pull-Request: #4754. --- Cargo.lock | 1 + protocols/floodsub/CHANGELOG.md | 4 +++- protocols/floodsub/Cargo.toml | 1 + protocols/floodsub/src/layer.rs | 11 ++++++----- protocols/floodsub/src/protocol.rs | 7 ++++--- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4fd13cfccd..2caae44d4d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2544,6 +2544,7 @@ name = "libp2p-floodsub" version = "0.44.0" dependencies = [ "asynchronous-codec 0.6.2", + "bytes", "cuckoofilter", "fnv", "futures", diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 0c23c1a19f4..3891a09f4d4 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -1,7 +1,9 @@ ## 0.44.0 - unreleased +- Change publish to require `data: impl Into` to internally avoid any costly cloning / allocation. + See [PR 4754](https://github.com/libp2p/rust-libp2p/pull/4754). -## 0.43.0 +## 0.43.0 - Raise MSRV to 1.65. See [PR 3715]. diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 522188e92fd..04b29e1812d 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"] asynchronous-codec = "0.6" cuckoofilter = "0.5.0" fnv = "1.0" +bytes = "1.5" futures = "0.3.28" libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index af0d3373ec1..5b6b89fea87 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -24,6 +24,7 @@ use crate::protocol::{ }; use crate::topic::Topic; use crate::FloodsubConfig; +use bytes::Bytes; use cuckoofilter::{CuckooError, CuckooFilter}; use fnv::FnvHashSet; use libp2p_core::{Endpoint, Multiaddr}; @@ -171,12 +172,12 @@ impl Floodsub { } /// Publishes a message to the network, if we're subscribed to the topic only. - pub fn publish(&mut self, topic: impl Into, data: impl Into>) { + pub fn publish(&mut self, topic: impl Into, data: impl Into) { self.publish_many(iter::once(topic), data) } /// Publishes a message to the network, even if we're not subscribed to the topic. - pub fn publish_any(&mut self, topic: impl Into, data: impl Into>) { + pub fn publish_any(&mut self, topic: impl Into, data: impl Into) { self.publish_many_any(iter::once(topic), data) } @@ -187,7 +188,7 @@ impl Floodsub { pub fn publish_many( &mut self, topic: impl IntoIterator>, - data: impl Into>, + data: impl Into, ) { self.publish_many_inner(topic, data, true) } @@ -196,7 +197,7 @@ impl Floodsub { pub fn publish_many_any( &mut self, topic: impl IntoIterator>, - data: impl Into>, + data: impl Into, ) { self.publish_many_inner(topic, data, false) } @@ -204,7 +205,7 @@ impl Floodsub { fn publish_many_inner( &mut self, topic: impl IntoIterator>, - data: impl Into>, + data: impl Into, check_self_subscriptions: bool, ) { let message = FloodsubMessage { diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index ebd3d8b3bc8..edc842be8ce 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -21,6 +21,7 @@ use crate::proto; use crate::topic::Topic; use asynchronous_codec::Framed; +use bytes::Bytes; use futures::{ io::{AsyncRead, AsyncWrite}, Future, @@ -81,7 +82,7 @@ where messages.push(FloodsubMessage { source: PeerId::from_bytes(&publish.from.unwrap_or_default()) .map_err(|_| FloodsubError::InvalidPeerId)?, - data: publish.data.unwrap_or_default(), + data: publish.data.unwrap_or_default().into(), sequence_number: publish.seqno.unwrap_or_default(), topics: publish.topic_ids.into_iter().map(Topic::new).collect(), }); @@ -172,7 +173,7 @@ impl FloodsubRpc { .into_iter() .map(|msg| proto::Message { from: Some(msg.source.to_bytes()), - data: Some(msg.data), + data: Some(msg.data.to_vec()), seqno: Some(msg.sequence_number), topic_ids: msg.topics.into_iter().map(|topic| topic.into()).collect(), }) @@ -197,7 +198,7 @@ pub struct FloodsubMessage { pub source: PeerId, /// Content of the message. Its meaning is out of scope of this library. - pub data: Vec, + pub data: Bytes, /// An incrementing sequence number. pub sequence_number: Vec,