Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(relay): move stream-handling away from {In,Out}boundUpgrade #4275

Merged
merged 123 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
123 commits
Select commit Hold shift + click to select a range
79ee923
Refactoring of stop protocol
dgarus Jul 24, 2023
4db318f
Merge branch 'master' into 4075-relay
dgarus Jul 31, 2023
1b39aaf
fix review comment
dgarus Aug 1, 2023
aa09aa3
Merge branch 'master' into 4075-relay
dgarus Aug 1, 2023
de33b72
Refactoring of hop protocol
dgarus Aug 2, 2023
c5401be
Merge branch 'master' into 4075-relay
dgarus Aug 2, 2023
70bb6d1
Merge branch 'master' into 4075-relay
dgarus Aug 2, 2023
72b2882
Merge branch 'master' into 4075-relay
dgarus Aug 2, 2023
0b629da
fix review comments
dgarus Aug 3, 2023
91602d0
Merge branch 'master' into 4075-relay
dgarus Aug 3, 2023
dde7f89
fix review comments
dgarus Aug 3, 2023
332e933
Merge branch 'master' into 4075-relay
dgarus Aug 7, 2023
55fa4d6
fix review comments
dgarus Aug 7, 2023
3fb008c
fix review comments
dgarus Aug 7, 2023
4aac352
fix review comments
dgarus Aug 7, 2023
1a1f912
fix review comments
dgarus Aug 7, 2023
4e699dd
fix review comments
dgarus Aug 7, 2023
1625704
fix rustfmt
dgarus Aug 7, 2023
77f4aaa
fix review comments
dgarus Aug 7, 2023
4be941f
fix review comments
dgarus Aug 7, 2023
0f79f76
fix review comments
dgarus Aug 7, 2023
162c9c8
ConnectionCommand to Command
dgarus Aug 7, 2023
d163a6f
Merge branch 'master' into 4075-relay
dgarus Aug 7, 2023
fd346a1
rename `fn`s
dgarus Aug 7, 2023
cdc6bcf
fix rustfmt
dgarus Aug 7, 2023
ceb0032
Merge branch 'master' into 4075-relay
dgarus Aug 7, 2023
f71b97e
fix review comments
dgarus Aug 8, 2023
0bf78c9
Merge branch 'master' into 4075-relay
dgarus Aug 8, 2023
9bd9eb0
Merge branch 'master' into 4075-relay
dgarus Aug 8, 2023
6116e4d
added futures-bounded struct
dgarus Aug 9, 2023
13b690c
added futures-bounded struct
dgarus Aug 9, 2023
235608f
Merge branch 'master' into 4075-relay
dgarus Aug 10, 2023
4e1bee0
Removed extra changes
dgarus Aug 22, 2023
67f382c
Removed extra changes
dgarus Aug 22, 2023
379b23f
Merge branch 'master' into 4075-relay
dgarus Aug 22, 2023
d6e09d3
using futures-bounded struct
dgarus Aug 23, 2023
28e101d
using futures-bounded struct
dgarus Aug 23, 2023
ef892ad
Merge branch 'master' into 4075-relay
dgarus Aug 23, 2023
4f1b69d
using futures-bounded struct
dgarus Aug 23, 2023
c0d1b23
fix format
dgarus Aug 23, 2023
c154fcb
Merge branch 'master' into 4075-relay
dgarus Aug 23, 2023
5f0d826
Merge branch 'master' into 4075-relay
dgarus Aug 23, 2023
31661ae
Update protocols/relay/src/behaviour.rs
dgarus Aug 26, 2023
1a83ed1
rename `max_concurrent_streams` to `max_concurrent_streams_per_connec…
dgarus Aug 26, 2023
22f676d
Update protocols/relay/src/behaviour.rs
dgarus Aug 26, 2023
7c0076b
Update protocols/relay/src/behaviour/handler.rs
dgarus Aug 26, 2023
9a84a18
Merge remote-tracking branch 'origin/4075-relay' into 4075-relay
dgarus Aug 26, 2023
553a316
fix review comments
dgarus Aug 26, 2023
18afe79
removed config fields
dgarus Aug 26, 2023
a3f2839
Update protocols/relay/src/behaviour/handler.rs
dgarus Aug 26, 2023
3a265ab
fix review comments
dgarus Aug 26, 2023
347d6b0
Merge branch 'master' into 4075-relay
dgarus Aug 26, 2023
38f94c2
fix toml file
dgarus Aug 26, 2023
5ccf0d5
fix review comments
dgarus Aug 26, 2023
0a87fa6
fix review comments
dgarus Aug 26, 2023
e68966d
refactoring
dgarus Aug 26, 2023
f541410
аix failed tests
dgarus Aug 26, 2023
4ad061e
Merge branch 'master' into 4075-relay
dgarus Aug 29, 2023
f3d01ba
Merge branch 'master' into 4075-relay
dgarus Aug 31, 2023
1bf4136
Merge branch 'master' into 4075-relay
dgarus Sep 4, 2023
3887c6a
Merge branch 'master' into 4075-relay
dgarus Sep 4, 2023
c5e4257
Merge branch 'master' into 4075-relay
dgarus Sep 4, 2023
c15e365
Merge branch 'master' into 4075-relay
dgarus Sep 4, 2023
f728a51
Merge branch 'master' into 4075-relay
dgarus Sep 4, 2023
0724277
fix review comments
dgarus Sep 4, 2023
bb789c2
bounded and unique workers sets
dgarus Sep 5, 2023
57f416a
Merge branch 'master' into 4075-relay
dgarus Sep 5, 2023
4d26e27
Merge branch 'master' into 4075-relay
dgarus Sep 5, 2023
ec75ca8
Merge branch 'master' into 4075-relay
dgarus Sep 5, 2023
13bb338
rewrite unique workers
dgarus Sep 6, 2023
64b0806
Merge branch 'master' into 4075-relay
dgarus Sep 6, 2023
a02fd7e
fix docs
dgarus Sep 6, 2023
d36af74
fix clippy
dgarus Sep 6, 2023
05959c2
rewrite unique workers
dgarus Sep 6, 2023
92b456a
Merge branch 'master' into 4075-relay
dgarus Sep 6, 2023
11a36ee
wrapping UniqueWorkers
dgarus Sep 7, 2023
eb1a38d
rustfmt
dgarus Sep 7, 2023
5efe055
fix doc
dgarus Sep 7, 2023
acb75af
fix review comments
dgarus Sep 8, 2023
f4dd44f
Update misc/futures-bounded/src/unique.rs
dgarus Sep 8, 2023
adde2e5
Update protocols/relay/src/priv_client/handler.rs
dgarus Sep 8, 2023
407443b
Update protocols/relay/src/priv_client/handler.rs
dgarus Sep 8, 2023
8b5f307
Update protocols/relay/src/priv_client/handler.rs
dgarus Sep 8, 2023
bd67ebf
Update protocols/relay/src/priv_client/handler.rs
dgarus Sep 8, 2023
1655c59
Merge branch 'master' into 4075-relay
dgarus Sep 8, 2023
6dfc8c6
fix review comments
dgarus Sep 8, 2023
838595e
Renamed to FuturesList and FuturesMap
dgarus Sep 8, 2023
cc0ebe2
fix review comments
dgarus Sep 9, 2023
35d8fd2
refactoring of outbound_hop::handle_reserve_message_response
dgarus Sep 10, 2023
6be7123
refactoring of outbound_hop::handle_reserve_message_response
dgarus Sep 10, 2023
d68019e
Merge branch 'master' into 4075-relay
dgarus Sep 10, 2023
86a0ac7
Refactor `FuturesMap` to use `FuturesUnbounded` for efficiency
thomaseizinger Sep 11, 2023
7713aca
Remove tests that are covered by other impl
thomaseizinger Sep 11, 2023
036164c
Reformat docs
thomaseizinger Sep 11, 2023
ff30991
Improve panic message
thomaseizinger Sep 11, 2023
5cb43e0
Make `Timeout` a proper error
thomaseizinger Sep 11, 2023
0a4aa09
Handle outbound circuits using 1 `FuturesList`
thomaseizinger Sep 11, 2023
3c4bd99
Don't nest `match`
thomaseizinger Sep 11, 2023
d73f3e9
Don't publish `futures-bounded` for now
thomaseizinger Sep 11, 2023
ced1ae3
Inline type-alias that is only used once
thomaseizinger Sep 11, 2023
68feb38
Use `void::unreachable`
thomaseizinger Sep 11, 2023
c98f2e1
Log PeerIds using Display
thomaseizinger Sep 11, 2023
c37c8ee
Use `void::unreachable`
thomaseizinger Sep 11, 2023
21f4660
Reduce diff
thomaseizinger Sep 11, 2023
902aed8
Remove commented out code
thomaseizinger Sep 11, 2023
d30dcc2
Use `void::unreachable`
thomaseizinger Sep 11, 2023
ba8cb44
Rename function to resemble message that is being sent
thomaseizinger Sep 11, 2023
2883255
Rename `StopCommand` to `PendingConnect`
thomaseizinger Sep 11, 2023
e2c9df3
Remove circular dependencies
thomaseizinger Sep 11, 2023
8c6d61f
Merge branch 'master' into 4075-relay
dgarus Sep 11, 2023
0354fc5
Merge branch 'master' into 4075-relay
dgarus Sep 11, 2023
04c3704
Merge branch 'master' into 4075-relay
dgarus Sep 12, 2023
ebdcfc0
Merge branch 'master' into 4075-relay
thomaseizinger Sep 13, 2023
38d9a28
Fix clippy lint
thomaseizinger Sep 13, 2023
0cb3841
Allow beta lint
thomaseizinger Sep 13, 2023
5cbe9d7
Merge branch 'master' into 4075-relay
thomaseizinger Sep 13, 2023
a201778
Merge branch 'master' into 4075-relay
dgarus Sep 14, 2023
6e63600
Merge branch 'master' into 4075-relay
dgarus Sep 19, 2023
3b8b07d
fix review comments
dgarus Sep 19, 2023
d14eb90
Update misc/futures-bounded/src/map.rs
dgarus Sep 19, 2023
2de759c
feat(swarm): allow configuration to idle connection timeout
startup-dreamer Sep 19, 2023
9b3d48c
Fix formatting
thomaseizinger Sep 20, 2023
a872c0c
Merge branch 'master' into 4075-relay
dgarus Sep 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"interop-tests",
"misc/allow-block-list",
"misc/connection-limits",
"misc/futures-bounded",
"misc/keygen",
"misc/memory-connection-limits",
"misc/metrics",
Expand Down Expand Up @@ -69,6 +70,7 @@ resolver = "2"
rust-version = "1.65.0"

[workspace.dependencies]
futures-bounded = { version = "0.1.0", path = "misc/futures-bounded" }
libp2p = { version = "0.52.3", path = "libp2p" }
libp2p-allow-block-list = { version = "0.2.0", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.11.0", path = "protocols/autonat" }
Expand Down
3 changes: 3 additions & 0 deletions misc/futures-bounded/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 0.1.0 - unreleased

Initial release.
20 changes: 20 additions & 0 deletions misc/futures-bounded/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "futures-bounded"
version = "0.1.0"
edition = "2021"
rust-version.workspace = true
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["futures", "async", "backpressure"]
categories = ["data-structures", "asynchronous"]
description = "Utilities for bounding futures in size and time."
publish = false # TEMP FIX until https://github.com/obi1kenobi/cargo-semver-checks-action/issues/53 is fixed.

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures-util = { version = "0.3.28" }
futures-timer = "3.0.2"

[dev-dependencies]
tokio = { version = "1.29.1", features = ["macros", "rt"] }
28 changes: 28 additions & 0 deletions misc/futures-bounded/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
mod map;
mod set;

pub use map::{FuturesMap, PushError};
pub use set::FuturesSet;
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;

/// A future failed to complete within the given timeout.
#[derive(Debug)]
pub struct Timeout {
limit: Duration,
}

impl Timeout {
fn new(duration: Duration) -> Self {
Self { limit: duration }
}
}

impl fmt::Display for Timeout {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "future failed to complete within {:?}", self.limit)
}
}

impl std::error::Error for Timeout {}
268 changes: 268 additions & 0 deletions misc/futures-bounded/src/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
use std::future::Future;
use std::hash::Hash;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::Duration;

use futures_timer::Delay;
use futures_util::future::BoxFuture;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};

use crate::Timeout;

/// Represents a map of [`Future`]s.
///
/// Each future must finish within the specified time and the map never outgrows its capacity.
pub struct FuturesMap<ID, O> {
timeout: Duration,
capacity: usize,
inner: FuturesUnordered<TaggedFuture<ID, TimeoutFuture<BoxFuture<'static, O>>>>,
empty_waker: Option<Waker>,
full_waker: Option<Waker>,
}

/// Error of a future pushing
#[derive(PartialEq, Debug)]
pub enum PushError<F> {
/// The length of the set is equal to the capacity
BeyondCapacity(F),
/// The set already contains the given future's ID
ReplacedFuture(F),
}

impl<ID, O> FuturesMap<ID, O> {
pub fn new(timeout: Duration, capacity: usize) -> Self {
Self {
timeout,
capacity,
inner: Default::default(),
empty_waker: None,
full_waker: None,
}
}
}

impl<ID, O> FuturesMap<ID, O>
where
ID: Clone + Hash + Eq + Send + Unpin + 'static,
{
/// Push a future into the map.
///
/// This method inserts the given future with defined `future_id` to the set.
/// If the length of the map is equal to the capacity, this method returns [PushError::BeyondCapacity],
/// that contains the passed future. In that case, the future is not inserted to the map.
/// If a future with the given `future_id` already exists, then the old future will be replaced by a new one.
/// In that case, the returned error [PushError::ReplacedFuture] contains the old future.
pub fn try_push<F>(&mut self, future_id: ID, future: F) -> Result<(), PushError<BoxFuture<O>>>
where
F: Future<Output = O> + Send + 'static,
{
if self.inner.len() >= self.capacity {
return Err(PushError::BeyondCapacity(future.boxed()));
}

if let Some(waker) = self.empty_waker.take() {
waker.wake();
}

match self.inner.iter_mut().find(|tagged| tagged.tag == future_id) {
None => {
self.inner.push(TaggedFuture {
tag: future_id,
inner: TimeoutFuture {
inner: future.boxed(),
timeout: Delay::new(self.timeout),
},
});

Ok(())
}
Some(existing) => {
let old_future = mem::replace(
&mut existing.inner,
TimeoutFuture {
inner: future.boxed(),
timeout: Delay::new(self.timeout),
},
);

Err(PushError::ReplacedFuture(old_future.inner))
}
}
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

#[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // &mut Context is idiomatic.
pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
if self.inner.len() < self.capacity {
return Poll::Ready(());
}

self.full_waker = Some(cx.waker().clone());

Poll::Pending
}

pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(ID, Result<O, Timeout>)> {
let maybe_result = futures_util::ready!(self.inner.poll_next_unpin(cx));

match maybe_result {
None => {
self.empty_waker = Some(cx.waker().clone());
Poll::Pending
}
Some((id, Ok(output))) => Poll::Ready((id, Ok(output))),
Some((id, Err(_timeout))) => Poll::Ready((id, Err(Timeout::new(self.timeout)))),
}
}
}

struct TimeoutFuture<F> {
inner: F,
timeout: Delay,
}

impl<F> Future for TimeoutFuture<F>
where
F: Future + Unpin,
{
type Output = Result<F::Output, ()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.timeout.poll_unpin(cx).is_ready() {
return Poll::Ready(Err(()));
}

self.inner.poll_unpin(cx).map(Ok)
}
}

struct TaggedFuture<T, F> {
tag: T,
inner: F,
}

impl<T, F> Future for TaggedFuture<T, F>
where
T: Clone + Unpin,
F: Future + Unpin,
{
type Output = (T, F::Output);

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let output = futures_util::ready!(self.inner.poll_unpin(cx));

Poll::Ready((self.tag.clone(), output))
}
}

#[cfg(test)]
mod tests {
use std::future::{pending, poll_fn, ready};
use std::pin::Pin;
use std::time::Instant;

use super::*;

#[test]
fn cannot_push_more_than_capacity_tasks() {
let mut futures = FuturesMap::new(Duration::from_secs(10), 1);

assert!(futures.try_push("ID_1", ready(())).is_ok());
matches!(
futures.try_push("ID_2", ready(())),
Err(PushError::BeyondCapacity(_))
);
}

#[test]
fn cannot_push_the_same_id_few_times() {
let mut futures = FuturesMap::new(Duration::from_secs(10), 5);

assert!(futures.try_push("ID", ready(())).is_ok());
matches!(
futures.try_push("ID", ready(())),
Err(PushError::ReplacedFuture(_))
);
}

#[tokio::test]
async fn futures_timeout() {
let mut futures = FuturesMap::new(Duration::from_millis(100), 1);

let _ = futures.try_push("ID", pending::<()>());
Delay::new(Duration::from_millis(150)).await;
let (_, result) = poll_fn(|cx| futures.poll_unpin(cx)).await;

assert!(result.is_err())
}

// Each future causes a delay, `Task` only has a capacity of 1, meaning they must be processed in sequence.
// We stop after NUM_FUTURES tasks, meaning the overall execution must at least take DELAY * NUM_FUTURES.
#[tokio::test]
async fn backpressure() {
const DELAY: Duration = Duration::from_millis(100);
const NUM_FUTURES: u32 = 10;

let start = Instant::now();
Task::new(DELAY, NUM_FUTURES, 1).await;
let duration = start.elapsed();

assert!(duration >= DELAY * NUM_FUTURES);
}

struct Task {
future: Duration,
num_futures: usize,
num_processed: usize,
inner: FuturesMap<u8, ()>,
}

impl Task {
fn new(future: Duration, num_futures: u32, capacity: usize) -> Self {
Self {
future,
num_futures: num_futures as usize,
num_processed: 0,
inner: FuturesMap::new(Duration::from_secs(60), capacity),
}
}
}

impl Future for Task {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while this.num_processed < this.num_futures {
if let Poll::Ready((_, result)) = this.inner.poll_unpin(cx) {
if result.is_err() {
panic!("Timeout is great than future delay")
}

this.num_processed += 1;
continue;
}

if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) {
// We push the constant future's ID to prove that user can use the same ID
// if the future was finished
let maybe_future = this.inner.try_push(1u8, Delay::new(this.future));
assert!(maybe_future.is_ok(), "we polled for readiness");

continue;
}

return Poll::Pending;
}

Poll::Ready(())
}
}
}
Loading