From caffe744c1ff59031f6653374ceaaecdbe9e0a44 Mon Sep 17 00:00:00 2001 From: Josh Wilson Date: Fri, 27 Oct 2023 21:24:00 +0200 Subject: [PATCH] feat(kad): convert kad record.value to Bytes. This should help avoid potentially costly clones --- examples/distributed-key-value-store/src/main.rs | 3 ++- examples/ipfs-kad/src/main.rs | 7 +++++-- protocols/kad/CHANGELOG.md | 3 ++- protocols/kad/src/behaviour/test.rs | 9 +++++---- protocols/kad/src/handler.rs | 5 +++-- protocols/kad/src/protocol.rs | 12 ++++++------ protocols/kad/src/record.rs | 6 +++--- 7 files changed, 26 insertions(+), 19 deletions(-) diff --git a/examples/distributed-key-value-store/src/main.rs b/examples/distributed-key-value-store/src/main.rs index b8ecd059fc8e..f4ab07694082 100644 --- a/examples/distributed-key-value-store/src/main.rs +++ b/examples/distributed-key-value-store/src/main.rs @@ -22,6 +22,7 @@ use async_std::io; use futures::{prelude::*, select}; +use libp2p::bytes::Bytes; use libp2p::kad; use libp2p::kad::store::MemoryStore; use libp2p::kad::Mode; @@ -181,7 +182,7 @@ fn handle_input_line(kademlia: &mut kad::Behaviour, line: String) { }; let value = { match args.next() { - Some(value) => value.as_bytes().to_vec(), + Some(value) => Bytes::from(value.as_bytes().to_owned()), None => { eprintln!("Expected value"); return; diff --git a/examples/ipfs-kad/src/main.rs b/examples/ipfs-kad/src/main.rs index dcb0ef953355..ddf35c077766 100644 --- a/examples/ipfs-kad/src/main.rs +++ b/examples/ipfs-kad/src/main.rs @@ -27,6 +27,7 @@ use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use clap::Parser; use futures::StreamExt; +use libp2p::bytes::Bytes; use libp2p::{bytes::BufMut, identity, kad, noise, swarm::SwarmEvent, tcp, yamux, PeerId}; const BOOTNODES: [&str; 4] = [ @@ -85,8 +86,10 @@ async fn main() -> Result<()> { pk_record_key.put_slice("/pk/".as_bytes()); pk_record_key.put_slice(swarm.local_peer_id().to_bytes().as_slice()); - let mut pk_record = - kad::Record::new(pk_record_key, local_key.public().encode_protobuf()); + let mut pk_record = kad::Record::new( + pk_record_key, + Bytes::from(local_key.public().encode_protobuf()), + ); pk_record.publisher = Some(*swarm.local_peer_id()); pk_record.expires = Some(Instant::now().add(Duration::from_secs(60))); diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index d2b92195ab38..c45f522e78d4 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -10,7 +10,8 @@ See [PR 4698](https://github.com/libp2p/rust-libp2p/pull/4698). - Remove previously deprecated type-aliases. Users should follow the convention of importing the `libp2p::kad` module and referring to symbols as `kad::Behaviour` etc. - See [PR 4733](https://github.com/libp2p/rust-libp2p/pull/4733). + See [PR 4733](https://github.com/libp2p/rust-libp2p/pull/4733). +- Set Record.value to be `Bytes` instead of `Vec` to make clones lighter ## 0.44.6 diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index f75c59b64b06..c89707c9ac6d 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -25,6 +25,7 @@ use super::*; use crate::kbucket::Distance; use crate::record::{store::MemoryStore, Key}; use crate::{K_VALUE, SHA_256_MH}; +use bytes::Bytes; use futures::{executor::block_on, future::poll_fn, prelude::*}; use futures_timer::Delay; use libp2p_core::{ @@ -755,7 +756,7 @@ fn get_record() { .map(|(_addr, swarm)| swarm) .collect::>(); - let record = Record::new(random_multihash(), vec![4, 5, 6]); + let record = Record::new(random_multihash(), Bytes::from(vec![4, 5, 6])); swarms[2].behaviour_mut().store.put(record.clone()).unwrap(); let qid = swarms[0].behaviour_mut().get_record(record.key.clone()); @@ -808,7 +809,7 @@ fn get_record_many() { .collect::>(); let num_results = 10; - let record = Record::new(random_multihash(), vec![4, 5, 6]); + let record = Record::new(random_multihash(), Bytes::from(vec![4, 5, 6])); for swarm in swarms.iter_mut().take(num_nodes) { swarm.behaviour_mut().store.put(record.clone()).unwrap(); @@ -1101,8 +1102,8 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { Multihash::<64>::wrap(SHA_256_MH, &thread_rng().gen::<[u8; 32]>()) .expect("32 array to fit into 64 byte multihash"), ); - let record_bob = Record::new(key.clone(), b"bob".to_vec()); - let record_trudy = Record::new(key.clone(), b"trudy".to_vec()); + let record_bob = Record::new(key.clone(), Bytes::from(b"bob".to_vec())); + let record_trudy = Record::new(key.clone(), Bytes::from(b"trudy".to_vec())); // Make `bob` and `trudy` aware of their version of the record searched by // `alice`. diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index fce77bc13e40..f7dfc1bcbf70 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -24,6 +24,7 @@ use crate::protocol::{ }; use crate::record::{self, Record}; use crate::QueryId; +use bytes::Bytes; use either::Either; use futures::prelude::*; use futures::stream::SelectAll; @@ -287,7 +288,7 @@ pub enum HandlerEvent { /// The key of the stored record. key: record::Key, /// The value of the stored record. - value: Vec, + value: Bytes, /// The user data passed to the `PutValue`. query_id: QueryId, }, @@ -435,7 +436,7 @@ pub enum HandlerIn { /// Key of the value that was put. key: record::Key, /// Value that was put. - value: Vec, + value: Bytes, /// Identifier of the request that was made by the remote. request_id: RequestId, }, diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 1cf147456756..93b16627bcac 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -29,7 +29,7 @@ use crate::proto; use crate::record::{self, Record}; use asynchronous_codec::{Decoder, Encoder, Framed}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use futures::prelude::*; use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; @@ -314,7 +314,7 @@ pub enum KadResponseMsg { /// The key of the record. key: record::Key, /// Value of the record. - value: Vec, + value: Bytes, }, } @@ -422,7 +422,7 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message { key: key.to_vec(), record: Some(proto::Record { key: key.to_vec(), - value, + value: value.to_vec(), ..proto::Record::default() }), ..proto::Message::default() @@ -528,7 +528,7 @@ fn proto_to_resp_msg(message: proto::Message) -> Result Result Result { let key = record::Key::from(record.key); - let value = record.value; + let value = Bytes::from(record.value); let publisher = if !record.publisher.is_empty() { PeerId::from_bytes(&record.publisher) @@ -567,7 +567,7 @@ fn record_from_proto(record: proto::Record) -> Result { fn record_to_proto(record: Record) -> proto::Record { proto::Record { key: record.key.to_vec(), - value: record.value, + value: record.value.to_vec(), publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(), ttl: record .expires diff --git a/protocols/kad/src/record.rs b/protocols/kad/src/record.rs index 4eb8e861c6f6..812ed5099cee 100644 --- a/protocols/kad/src/record.rs +++ b/protocols/kad/src/record.rs @@ -78,7 +78,7 @@ pub struct Record { /// Key of the record. pub key: Key, /// Value of the record. - pub value: Vec, + pub value: Bytes, /// The (original) publisher of the record. pub publisher: Option, /// The expiration time as measured by a local, monotonic clock. @@ -87,7 +87,7 @@ pub struct Record { impl Record { /// Creates a new record for insertion into the DHT. - pub fn new(key: K, value: Vec) -> Self + pub fn new(key: K, value: Bytes) -> Self where K: Into, { @@ -176,7 +176,7 @@ mod tests { fn arbitrary(g: &mut Gen) -> Record { Record { key: Key::arbitrary(g), - value: Vec::arbitrary(g), + value: Bytes::from(Vec::arbitrary(g)), publisher: if bool::arbitrary(g) { Some(PeerId::random()) } else {