Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/muxing: Use total number of alive inbound streams for backpressure #2878

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
10861b9
Add tests
thomaseizinger Sep 8, 2022
dbd0e24
Track number of active inbound and outbound streams
thomaseizinger Sep 8, 2022
21ca446
Update changelog with PR number
thomaseizinger Sep 8, 2022
17f8139
Merge branch 'master' into track-alive-substreams
thomaseizinger Sep 22, 2022
f3e17b1
Make ctor module private
thomaseizinger Sep 22, 2022
63f16d1
Use total number of inbound streams for back-pressure
thomaseizinger Sep 22, 2022
9fba25f
Add changelog entry to swarm
thomaseizinger Sep 22, 2022
40ab076
Introduce `ConnectionHandler::max_inbound_streams`
thomaseizinger Sep 30, 2022
b679d8c
Merge branch 'master' into track-alive-substreams
thomaseizinger Sep 30, 2022
a008719
Don't override `max_inbound_streams` for `DummyConnectionHandler`
thomaseizinger Sep 30, 2022
9cd4083
Remove outdated rustdoc links
thomaseizinger Sep 30, 2022
e2b4969
protocols/{autonat,dcutr}: Fixing filename collision in examples (#2959)
plauche Sep 30, 2022
7e02538
Merge branch 'master' into track-alive-substreams
thomaseizinger Oct 6, 2022
311e674
Fix typo in log
thomaseizinger Oct 6, 2022
cbb6dd5
Use `max_inbound_streams` for kademlia handler
thomaseizinger Oct 6, 2022
534c3f6
Use `max_inbound_streams` in GossipSub handler
thomaseizinger Oct 6, 2022
a6bd2e4
Use `max_inbound_streams` for ping handler
thomaseizinger Oct 6, 2022
f921261
Merge branch 'master' into track-alive-substreams
thomaseizinger Oct 11, 2022
088301b
Merge branch 'master' into track-alive-substreams
thomaseizinger Oct 14, 2022
5531d96
Remove use of deprecated functions
thomaseizinger Oct 14, 2022
f3d36eb
Rewrite changelog section
thomaseizinger Oct 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.36.1

- Add `StreamMuxerBox::active_inbound_streams` and `StreamMuxerBox::active_outbound_streams`. See [PR 2878].

[PR 2878]: https://github.com/libp2p/rust-libp2p/pull/2878

# 0.36.0

- Make RSA keypair support optional. To enable RSA support, `rsa` feature should be enabled.
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-core"
edition = "2021"
rust-version = "1.56.1"
description = "Core traits and structs of libp2p"
version = "0.36.0"
version = "0.36.1"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
273 changes: 225 additions & 48 deletions core/src/muxing/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,27 @@ use std::fmt;
use std::io;
use std::io::{IoSlice, IoSliceMut};
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};

/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
inner: Pin<Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send>>,
inner: Pin<
Box<
dyn private::InstrumentedStreamMuxer<Substream = SubstreamBox, Error = io::Error>
+ Send,
>,
>,
}

/// Abstract type for asynchronous reading and writing.
///
/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead`
/// and `AsyncWrite` capabilities.
pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);
pub struct SubstreamBox {
inner: Pin<Box<dyn AsyncReadWrite + Send>>,
_counter: Weak<()>,
}

#[pin_project]
struct Wrap<T>
Expand All @@ -26,49 +35,79 @@ where
{
#[pin]
inner: T,
inbound_counter: Arc<()>,
outbound_counter: Arc<()>,
}

impl<T> StreamMuxer for Wrap<T>
where
T: StreamMuxer,
T::Substream: Send + 'static,
T::Error: Send + Sync + 'static,
{
type Substream = SubstreamBox;
type Error = io::Error;
mod private {
use super::*;

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project()
.inner
.poll_inbound(cx)
.map_ok(SubstreamBox::new)
.map_err(into_io_error)
pub trait InstrumentedStreamMuxer: StreamMuxer {
fn active_inbound_streams(&self) -> usize;
fn active_outbound_streams(&self) -> usize;
}

fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project()
.inner
.poll_outbound(cx)
.map_ok(SubstreamBox::new)
.map_err(into_io_error)
}
impl<T> StreamMuxer for Wrap<T>
where
T: StreamMuxer,
T::Substream: Send + 'static,
T::Error: Send + Sync + 'static,
{
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let counter = Arc::downgrade(&self.inbound_counter);

self.project()
.inner
.poll_inbound(cx)
.map_ok(|stream| SubstreamBox::new(stream, counter))
.map_err(into_io_error)
}

fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let counter = Arc::downgrade(&self.outbound_counter);

self.project()
.inner
.poll_outbound(cx)
.map_ok(|stream| SubstreamBox::new(stream, counter))
.map_err(into_io_error)
}

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().inner.poll(cx).map_err(into_io_error)
}
}

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().inner.poll(cx).map_err(into_io_error)
impl<T> InstrumentedStreamMuxer for Wrap<T>
where
T: StreamMuxer,
T::Substream: Send + 'static,
T::Error: Send + Sync + 'static,
{
fn active_inbound_streams(&self) -> usize {
Arc::weak_count(&self.inbound_counter)
}

fn active_outbound_streams(&self) -> usize {
Arc::weak_count(&self.outbound_counter)
}
}
}

Expand All @@ -87,16 +126,35 @@ impl StreamMuxerBox {
T::Substream: Send + 'static,
T::Error: Send + Sync + 'static,
{
let wrap = Wrap { inner: muxer };
let wrap = Wrap {
inner: muxer,
inbound_counter: Arc::new(()),
outbound_counter: Arc::new(()),
};

StreamMuxerBox {
inner: Box::pin(wrap),
}
}

/// Returns the number of active inbound streams i.e. streams that
/// have been returned from [`StreamMuxer::poll_inbound`] and have not been dropped since.
pub fn active_inbound_streams(&self) -> usize {
self.inner.active_inbound_streams()
}

/// Returns the number of active outbound streams i.e. streams that
/// have been returned from [`StreamMuxer::poll_outbound`] and have not been dropped since.
pub fn active_outbound_streams(&self) -> usize {
self.inner.active_outbound_streams()
}

fn project(
self: Pin<&mut Self>,
) -> Pin<&mut (dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send)> {
) -> Pin<
&mut (dyn private::InstrumentedStreamMuxer<Substream = SubstreamBox, Error = io::Error>
+ Send),
> {
self.get_mut().inner.as_mut()
}
}
Expand Down Expand Up @@ -134,14 +192,19 @@ impl StreamMuxer for StreamMuxerBox {

impl SubstreamBox {
/// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`].
pub fn new<S: AsyncRead + AsyncWrite + Send + 'static>(stream: S) -> Self {
Self(Box::pin(stream))
///
/// TOOD: Remove this constructor.
pub fn new<S: AsyncRead + AsyncWrite + Send + 'static>(stream: S, counter: Weak<()>) -> Self {
Self {
inner: Box::pin(stream),
_counter: counter,
}
}
}

impl fmt::Debug for SubstreamBox {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SubstreamBox({})", self.0.type_name())
write!(f, "SubstreamBox({})", self.inner.type_name())
}
}

Expand All @@ -168,15 +231,15 @@ impl AsyncRead for SubstreamBox {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
self.0.as_mut().poll_read(cx, buf)
self.inner.as_mut().poll_read(cx, buf)
}

fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<std::io::Result<usize>> {
self.0.as_mut().poll_read_vectored(cx, bufs)
self.inner.as_mut().poll_read_vectored(cx, bufs)
}
}

Expand All @@ -186,22 +249,136 @@ impl AsyncWrite for SubstreamBox {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.0.as_mut().poll_write(cx, buf)
self.inner.as_mut().poll_write(cx, buf)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<std::io::Result<usize>> {
self.0.as_mut().poll_write_vectored(cx, bufs)
self.inner.as_mut().poll_write_vectored(cx, bufs)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.0.as_mut().poll_flush(cx)
self.inner.as_mut().poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.0.as_mut().poll_close(cx)
self.inner.as_mut().poll_close(cx)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::muxing::StreamMuxerExt;

#[async_std::test]
async fn stream_muxer_box_tracks_alive_inbound_streams() {
let mut muxer = StreamMuxerBox::new(DummyStreamMuxer);

let _stream1 = muxer.next_inbound().await.unwrap();
let _stream2 = muxer.next_inbound().await.unwrap();

assert_eq!(muxer.active_inbound_streams(), 2);

drop(_stream1);

assert_eq!(muxer.active_inbound_streams(), 1);
}

#[async_std::test]
async fn stream_muxer_box_tracks_alive_outbound_streams() {
let mut muxer = StreamMuxerBox::new(DummyStreamMuxer);

let _stream1 = muxer.next_outbound().await.unwrap();
let _stream2 = muxer.next_outbound().await.unwrap();

assert_eq!(muxer.active_outbound_streams(), 2);

drop(_stream1);

assert_eq!(muxer.active_outbound_streams(), 1);
}

#[test]
fn stream_muxer_box_starts_with_zero_active_inbound_streams() {
let muxer = StreamMuxerBox::new(DummyStreamMuxer);

let num_active_inbound_streams = muxer.active_inbound_streams();

assert_eq!(num_active_inbound_streams, 0);
}

#[test]
fn stream_muxer_box_starts_with_zero_active_outbound_streams() {
let muxer = StreamMuxerBox::new(DummyStreamMuxer);

let num_active_outbound_streams = muxer.active_outbound_streams();

assert_eq!(num_active_outbound_streams, 0);
}

struct DummyStreamMuxer;

impl StreamMuxer for DummyStreamMuxer {
type Substream = PendingSubstream;
type Error = void::Void;

fn poll_inbound(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
Poll::Ready(Ok(PendingSubstream))
}

fn poll_outbound(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
Poll::Ready(Ok(PendingSubstream))
}

fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Pending
}

fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}
}

struct PendingSubstream;

impl AsyncRead for PendingSubstream {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Poll::Pending
}
}

impl AsyncWrite for PendingSubstream {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Pending
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Pending
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Pending
}
}
}
Loading