Skip to content

Commit

Permalink
feat(kad): convert kad record.value to Bytes.
Browse files Browse the repository at this point in the history
This should help avoid potentially costly clones
  • Loading branch information
joshuef committed Oct 28, 2023
1 parent 459c9d4 commit caffe74
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 19 deletions.
3 changes: 2 additions & 1 deletion examples/distributed-key-value-store/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use async_std::io;
use futures::{prelude::*, select};
use libp2p::bytes::Bytes;
use libp2p::kad;
use libp2p::kad::store::MemoryStore;
use libp2p::kad::Mode;
Expand Down Expand Up @@ -181,7 +182,7 @@ fn handle_input_line(kademlia: &mut kad::Behaviour<MemoryStore>, line: String) {
};
let value = {
match args.next() {
Some(value) => value.as_bytes().to_vec(),
Some(value) => Bytes::from(value.as_bytes().to_owned()),
None => {
eprintln!("Expected value");
return;
Expand Down
7 changes: 5 additions & 2 deletions examples/ipfs-kad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use clap::Parser;
use futures::StreamExt;
use libp2p::bytes::Bytes;
use libp2p::{bytes::BufMut, identity, kad, noise, swarm::SwarmEvent, tcp, yamux, PeerId};

const BOOTNODES: [&str; 4] = [
Expand Down Expand Up @@ -85,8 +86,10 @@ async fn main() -> Result<()> {
pk_record_key.put_slice("/pk/".as_bytes());
pk_record_key.put_slice(swarm.local_peer_id().to_bytes().as_slice());

let mut pk_record =
kad::Record::new(pk_record_key, local_key.public().encode_protobuf());
let mut pk_record = kad::Record::new(
pk_record_key,
Bytes::from(local_key.public().encode_protobuf()),
);
pk_record.publisher = Some(*swarm.local_peer_id());
pk_record.expires = Some(Instant::now().add(Duration::from_secs(60)));

Expand Down
3 changes: 2 additions & 1 deletion protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
See [PR 4698](https://github.com/libp2p/rust-libp2p/pull/4698).
- Remove previously deprecated type-aliases.
Users should follow the convention of importing the `libp2p::kad` module and referring to symbols as `kad::Behaviour` etc.
See [PR 4733](https://github.com/libp2p/rust-libp2p/pull/4733).
See [PR 4733](https://github.com/libp2p/rust-libp2p/pull/4733).
- Set Record.value to be `Bytes` instead of `Vec<u8>` to make clones lighter

## 0.44.6

Expand Down
9 changes: 5 additions & 4 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::*;
use crate::kbucket::Distance;
use crate::record::{store::MemoryStore, Key};
use crate::{K_VALUE, SHA_256_MH};
use bytes::Bytes;
use futures::{executor::block_on, future::poll_fn, prelude::*};
use futures_timer::Delay;
use libp2p_core::{
Expand Down Expand Up @@ -755,7 +756,7 @@ fn get_record() {
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();

let record = Record::new(random_multihash(), vec![4, 5, 6]);
let record = Record::new(random_multihash(), Bytes::from(vec![4, 5, 6]));

swarms[2].behaviour_mut().store.put(record.clone()).unwrap();
let qid = swarms[0].behaviour_mut().get_record(record.key.clone());
Expand Down Expand Up @@ -808,7 +809,7 @@ fn get_record_many() {
.collect::<Vec<_>>();
let num_results = 10;

let record = Record::new(random_multihash(), vec![4, 5, 6]);
let record = Record::new(random_multihash(), Bytes::from(vec![4, 5, 6]));

for swarm in swarms.iter_mut().take(num_nodes) {
swarm.behaviour_mut().store.put(record.clone()).unwrap();
Expand Down Expand Up @@ -1101,8 +1102,8 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
Multihash::<64>::wrap(SHA_256_MH, &thread_rng().gen::<[u8; 32]>())
.expect("32 array to fit into 64 byte multihash"),
);
let record_bob = Record::new(key.clone(), b"bob".to_vec());
let record_trudy = Record::new(key.clone(), b"trudy".to_vec());
let record_bob = Record::new(key.clone(), Bytes::from(b"bob".to_vec()));
let record_trudy = Record::new(key.clone(), Bytes::from(b"trudy".to_vec()));

// Make `bob` and `trudy` aware of their version of the record searched by
// `alice`.
Expand Down
5 changes: 3 additions & 2 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::protocol::{
};
use crate::record::{self, Record};
use crate::QueryId;
use bytes::Bytes;
use either::Either;
use futures::prelude::*;
use futures::stream::SelectAll;
Expand Down Expand Up @@ -287,7 +288,7 @@ pub enum HandlerEvent {
/// The key of the stored record.
key: record::Key,
/// The value of the stored record.
value: Vec<u8>,
value: Bytes,
/// The user data passed to the `PutValue`.
query_id: QueryId,
},
Expand Down Expand Up @@ -435,7 +436,7 @@ pub enum HandlerIn {
/// Key of the value that was put.
key: record::Key,
/// Value that was put.
value: Vec<u8>,
value: Bytes,
/// Identifier of the request that was made by the remote.
request_id: RequestId,
},
Expand Down
12 changes: 6 additions & 6 deletions protocols/kad/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
use crate::proto;
use crate::record::{self, Record};
use asynchronous_codec::{Decoder, Encoder, Framed};
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use futures::prelude::*;
use instant::Instant;
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
Expand Down Expand Up @@ -314,7 +314,7 @@ pub enum KadResponseMsg {
/// The key of the record.
key: record::Key,
/// Value of the record.
value: Vec<u8>,
value: Bytes,
},
}

Expand Down Expand Up @@ -422,7 +422,7 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
key: key.to_vec(),
record: Some(proto::Record {
key: key.to_vec(),
value,
value: value.to_vec(),
..proto::Record::default()
}),
..proto::Message::default()
Expand Down Expand Up @@ -528,7 +528,7 @@ fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Erro

Ok(KadResponseMsg::PutValue {
key,
value: rec.value,
value: Bytes::from(rec.value),
})
}

Expand All @@ -540,7 +540,7 @@ fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Erro

fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
let key = record::Key::from(record.key);
let value = record.value;
let value = Bytes::from(record.value);

let publisher = if !record.publisher.is_empty() {
PeerId::from_bytes(&record.publisher)
Expand All @@ -567,7 +567,7 @@ fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
fn record_to_proto(record: Record) -> proto::Record {
proto::Record {
key: record.key.to_vec(),
value: record.value,
value: record.value.to_vec(),
publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
ttl: record
.expires
Expand Down
6 changes: 3 additions & 3 deletions protocols/kad/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct Record {
/// Key of the record.
pub key: Key,
/// Value of the record.
pub value: Vec<u8>,
pub value: Bytes,
/// The (original) publisher of the record.
pub publisher: Option<PeerId>,
/// The expiration time as measured by a local, monotonic clock.
Expand All @@ -87,7 +87,7 @@ pub struct Record {

impl Record {
/// Creates a new record for insertion into the DHT.
pub fn new<K>(key: K, value: Vec<u8>) -> Self
pub fn new<K>(key: K, value: Bytes) -> Self
where
K: Into<Key>,
{
Expand Down Expand Up @@ -176,7 +176,7 @@ mod tests {
fn arbitrary(g: &mut Gen) -> Record {
Record {
key: Key::arbitrary(g),
value: Vec::arbitrary(g),
value: Bytes::from(Vec::arbitrary(g)),
publisher: if bool::arbitrary(g) {
Some(PeerId::random())
} else {
Expand Down

0 comments on commit caffe74

Please sign in to comment.