Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype message patterns: RecvSend, SendRecv and Send #5

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
Draft
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ futures-sink = "0.3"
futures-util = { version = "0.3", features = ["io"] }
memchr = "2"
pin-project-lite = "0.2"
log = "0.4.17"

[dev-dependencies]
futures = "0.3"
tokio = { version = "1.25.0", features = ["full"] }
futures_ringbuf = "0.3.1"

[dependencies.serde]
version = "1"
Expand Down
11 changes: 11 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,16 @@ pub use framed_read::{FramedRead, FramedReadParts};

mod framed_write;
pub use framed_write::{FramedWrite, FramedWriteParts};
pub use message_patterns::CloseStream;
pub use message_patterns::ReturnStream;

mod fuse;

mod message_patterns;
mod recvsend;
mod send;
mod sendrecv;

pub use recvsend::{Event, RecvSend, Responder};
pub use send::Send;
pub use sendrecv::SendRecv;
70 changes: 70 additions & 0 deletions src/message_patterns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::{recvsend, Encoder, Framed};
use futures_util::{AsyncRead, AsyncWrite, SinkExt};
use std::task::{Context, Poll};
use std::{fmt, io};

/// Marker type for a [`SendingResponse`] future that will close the stream after the message has been sent.
pub enum CloseStream {}

/// Marker type for a [`SendingResponse`] future that will return the stream back to the user after the message has been sent.
///
/// This may be useful if multiple request-response exchanges should happen on the same stream.
pub enum ReturnStream {}

#[derive(Debug)]
pub enum Error<Enc> {
Recv(Enc),
Send(Enc),
}

impl<Enc> fmt::Display for Error<Enc> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::Recv(_) => write!(f, "failed to recv on stream"),
Error::Send(_) => write!(f, "failed to send on stream"),
}
}
}

impl<Enc> std::error::Error for Error<Enc>
where
Enc: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Recv(inner) => Some(inner),
Error::Send(inner) => Some(inner),
}
}
}

pub struct Sending<S, C>
where
C: Encoder,
{
pub(crate) framed: Framed<S, C>,
pub(crate) message: Option<C::Item>,
}

impl<S, C, E> Sending<S, C>
where
S: AsyncRead + AsyncWrite + Unpin,
C: Encoder<Error = E>,
E: std::error::Error + Send + Sync + 'static,
{
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
futures_util::ready!(self.framed.poll_ready_unpin(cx).map_err(Error::Send))
.map_err(recvsend::into_io_error)?;

self.framed
.start_send_unpin(
self.message
.take()
.expect("to not be polled after completion"),
)
.map_err(Error::Send)
.map_err(recvsend::into_io_error)?;

Poll::Ready(Ok(()))
}
}
Loading