Skip to content

Commit

Permalink
add poll function in kademlia behaviour that polls bootstrap function
Browse files Browse the repository at this point in the history
  • Loading branch information
PanGan21 committed Nov 12, 2023
1 parent 7f4ba69 commit dc770aa
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
16 changes: 1 addition & 15 deletions misc/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use base64::Engine;
use clap::Parser;
use futures::stream::StreamExt;
use futures_timer::Delay;
use libp2p::identity;
use libp2p::identity::PeerId;
use libp2p::kad;
Expand All @@ -14,17 +13,13 @@ use prometheus_client::registry::Registry;
use std::error::Error;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use std::time::Duration;
use tracing_subscriber::EnvFilter;
use zeroize::Zeroizing;

mod behaviour;
mod config;
mod http_service;

const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);

#[derive(Debug, Parser)]
#[clap(name = "libp2p server", about = "A rust-libp2p server binary.")]
struct Opts {
Expand Down Expand Up @@ -125,17 +120,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
});

let mut bootstrap_timer = Delay::new(BOOTSTRAP_INTERVAL);

loop {
if let Poll::Ready(()) = futures::poll!(&mut bootstrap_timer) {
bootstrap_timer.reset(BOOTSTRAP_INTERVAL);
let _ = swarm
.behaviour_mut()
.kademlia
.as_mut()
.map(|k| k.bootstrap());
}
let _ = swarm.behaviour_mut().kademlia.as_mut().map(|k| k.poll());

let event = swarm.next().await.expect("Swarm not to terminate.");
metrics.record(&event);
Expand Down
33 changes: 33 additions & 0 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::record::{
};
use crate::K_VALUE;
use fnv::{FnvHashMap, FnvHashSet};
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
Expand Down Expand Up @@ -116,6 +117,9 @@ pub struct Behaviour<TStore> {

/// The record storage.
store: TStore,

/// The interval used by [`Behaviour::poll`] to call [`Behaviour::bootstrap`].
refresh_interval: Option<Duration>,
}

/// The configurable strategies for the insertion of peers
Expand Down Expand Up @@ -181,6 +185,7 @@ pub struct Config {
provider_publication_interval: Option<Duration>,
kbucket_inserts: BucketInserts,
caching: Caching,
refresh_interval: Option<Duration>,
}

impl Default for Config {
Expand All @@ -197,6 +202,7 @@ impl Default for Config {
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
kbucket_inserts: BucketInserts::OnConnected,
caching: Caching::Enabled { max_peers: 1 },
refresh_interval: Some(Duration::from_secs(5 * 60)),
}
}
}
Expand Down Expand Up @@ -391,6 +397,14 @@ impl Config {
self.caching = c;
self
}

/// Sets the interval on which [`Behaviour::bootstrap`] is called from [`Behaviour::poll`]
///
/// `None` means that [`Behaviour::bootstrap`] is not called from [`Behaviour::poll`]
pub fn set_refresh_interval(&mut self, interval: Option<Duration>) -> &mut Self {
self.refresh_interval = interval;
self
}
}

impl<TStore> Behaviour<TStore>
Expand Down Expand Up @@ -448,6 +462,7 @@ where
mode: Mode::Client,
auto_mode: true,
no_events_waker: None,
refresh_interval: config.refresh_interval,
}
}

Expand Down Expand Up @@ -1005,6 +1020,24 @@ where
}
}

/// Asynchronously polls the Kademlia behavior, triggering [`Behaviour::bootstrap`] if necessary.
///
/// This function checks the refresh interval and, if ready, resets the timer and
/// triggers the bootstrap operation. It returns a `Result<(), NoKnownPeers>` where
/// Ok(()) indicates success, and Err(NoKnownPeers) is returned if there are no known peers
/// during the bootstrap operation. See [`Behaviour::bootstrap`] for more details.
pub async fn poll(&mut self) -> Result<(), NoKnownPeers> {
if let Some(refresh_interval) = &mut self.refresh_interval {
let mut bootstrap_timer = Delay::new(*refresh_interval);
if let Poll::Ready(()) = futures::poll!(&mut bootstrap_timer) {
bootstrap_timer.reset(*refresh_interval);
self.bootstrap()?;
};
}

Ok(())
}

fn reconfigure_mode(&mut self) {
if self.connections.is_empty() {
return;
Expand Down

0 comments on commit dc770aa

Please sign in to comment.