Skip to content

Commit

Permalink
fix(floodsub): use Bytes for FloodsubMessage::data
Browse files Browse the repository at this point in the history
This should help avoid potentially costly clones.

Pull-Request: #4754.
  • Loading branch information
joshuef authored Oct 29, 2023
1 parent 459c9d4 commit c8600cc
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion protocols/floodsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
## 0.44.0 - unreleased

- Change publish to require `data: impl Into<Bytes>` to internally avoid any costly cloning / allocation.
See [PR 4754](https://github.com/libp2p/rust-libp2p/pull/4754).

## 0.43.0
## 0.43.0

- Raise MSRV to 1.65.
See [PR 3715].
Expand Down
1 change: 1 addition & 0 deletions protocols/floodsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"]
asynchronous-codec = "0.6"
cuckoofilter = "0.5.0"
fnv = "1.0"
bytes = "1.5"
futures = "0.3.28"
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
Expand Down
11 changes: 6 additions & 5 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::protocol::{
};
use crate::topic::Topic;
use crate::FloodsubConfig;
use bytes::Bytes;
use cuckoofilter::{CuckooError, CuckooFilter};
use fnv::FnvHashSet;
use libp2p_core::{Endpoint, Multiaddr};
Expand Down Expand Up @@ -171,12 +172,12 @@ impl Floodsub {
}

/// Publishes a message to the network, if we're subscribed to the topic only.
pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
self.publish_many(iter::once(topic), data)
}

/// Publishes a message to the network, even if we're not subscribed to the topic.
pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
self.publish_many_any(iter::once(topic), data)
}

Expand All @@ -187,7 +188,7 @@ impl Floodsub {
pub fn publish_many(
&mut self,
topic: impl IntoIterator<Item = impl Into<Topic>>,
data: impl Into<Vec<u8>>,
data: impl Into<Bytes>,
) {
self.publish_many_inner(topic, data, true)
}
Expand All @@ -196,15 +197,15 @@ impl Floodsub {
pub fn publish_many_any(
&mut self,
topic: impl IntoIterator<Item = impl Into<Topic>>,
data: impl Into<Vec<u8>>,
data: impl Into<Bytes>,
) {
self.publish_many_inner(topic, data, false)
}

fn publish_many_inner(
&mut self,
topic: impl IntoIterator<Item = impl Into<Topic>>,
data: impl Into<Vec<u8>>,
data: impl Into<Bytes>,
check_self_subscriptions: bool,
) {
let message = FloodsubMessage {
Expand Down
7 changes: 4 additions & 3 deletions protocols/floodsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use crate::proto;
use crate::topic::Topic;
use asynchronous_codec::Framed;
use bytes::Bytes;
use futures::{
io::{AsyncRead, AsyncWrite},
Future,
Expand Down Expand Up @@ -81,7 +82,7 @@ where
messages.push(FloodsubMessage {
source: PeerId::from_bytes(&publish.from.unwrap_or_default())
.map_err(|_| FloodsubError::InvalidPeerId)?,
data: publish.data.unwrap_or_default(),
data: publish.data.unwrap_or_default().into(),
sequence_number: publish.seqno.unwrap_or_default(),
topics: publish.topic_ids.into_iter().map(Topic::new).collect(),
});
Expand Down Expand Up @@ -172,7 +173,7 @@ impl FloodsubRpc {
.into_iter()
.map(|msg| proto::Message {
from: Some(msg.source.to_bytes()),
data: Some(msg.data),
data: Some(msg.data.to_vec()),
seqno: Some(msg.sequence_number),
topic_ids: msg.topics.into_iter().map(|topic| topic.into()).collect(),
})
Expand All @@ -197,7 +198,7 @@ pub struct FloodsubMessage {
pub source: PeerId,

/// Content of the message. Its meaning is out of scope of this library.
pub data: Vec<u8>,
pub data: Bytes,

/// An incrementing sequence number.
pub sequence_number: Vec<u8>,
Expand Down

0 comments on commit c8600cc

Please sign in to comment.