diff --git a/dht-cache/src/cache.rs b/dht-cache/src/cache.rs index d874473..aed3b40 100644 --- a/dht-cache/src/cache.rs +++ b/dht-cache/src/cache.rs @@ -57,7 +57,7 @@ impl Builder { std::fs::write(pk_path, pem)?; der } - Err(e) => Err(e)?, + Err(e) => return Err(e.into()), } } else { generate_rsa_key().1 @@ -110,11 +110,11 @@ impl Cache { /// Persist a value within the DHT /// /// It is identified by the topic and uuid value - pub async fn put(&self, topic: &str, uuid: &str, value: &Value) -> Result<(), Error> { + pub async fn put(&self, topic: impl Into, uuid: impl Into, value: Value) -> Result<(), Error> { let elem = DomoCacheElement { - topic_name: topic.to_string(), - topic_uuid: uuid.to_string(), - value: value.to_owned(), + topic_name: topic.into(), + topic_uuid: uuid.into(), + value, publication_timestamp: utils::get_epoch_ms(), publisher_peer_id: self.peer_id.clone(), ..Default::default() @@ -133,10 +133,10 @@ impl Cache { /// /// It inserts the deletion entry and the entry value will be marked as deleted and removed /// from the stored cache. - pub async fn del(&self, topic: &str, uuid: &str) -> Result<(), Error> { + pub async fn del(&self, topic: impl Into, uuid: impl Into) -> Result<(), Error> { let elem = DomoCacheElement { - topic_name: topic.to_string(), - topic_uuid: uuid.to_string(), + topic_name: topic.into(), + topic_uuid: uuid.into(), publication_timestamp: utils::get_epoch_ms(), publisher_peer_id: self.peer_id.clone(), deleted: true, @@ -317,6 +317,10 @@ pub fn cache_channel( let mut elem = elem.to_owned(); log::debug!("resending {}", elem.topic_uuid); elem.republication_timestamp = utils::get_epoch_ms(); + + // This cannot fail because `cmd` is the sender part of the + // `stream` we are currently reading. In practice, we are + // queueing the commands in order to read them later. cmd.send(Command::Publish(serde_json::to_value(&elem).unwrap())) .unwrap(); });