Skip to content

Commit

Permalink
current state of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
LimpidCrypto committed Aug 19, 2024
1 parent 7769131 commit 955c773
Show file tree
Hide file tree
Showing 22 changed files with 659 additions and 286 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ embedded-websocket = { git = "https://github.com/LimpidCrypto/embedded-websocket
reqwless = "0.12.1"
webpki-roots = { version = "0.26.3", optional = true }
rand_core = "0.6.4"
strum_macros = "0.26.4"


# strum_macros = { version = "0.26.4", default-features = false }
Expand All @@ -62,6 +63,7 @@ rand_core = "0.6.4"
# rand_core = { version = "0.6.4", default-features = false }

[dev-dependencies]
rand = "0.8.5"
tokio = { version = "1.27.0", features = ["full"] }

[features]
Expand Down
50 changes: 50 additions & 0 deletions examples/async_web_socket_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use em_as_net::{
client::websocket::{
AsyncWebSocketClient, ReadResult, WebSocketRead, WebSocketSendMessageType, WebSocketWrite,
},
core::{tcp::TcpStream, tls::TlsStream},
};
use rand::thread_rng;
use url::Url;

#[tokio::main]
async fn main() {
let uri = Url::parse("wss://ws.vi-server.org:443/mirror/").unwrap();
let stream = TcpStream::connect(&uri).await.unwrap();
println!("TCP Connected");
let mut tls_stream = TlsStream::connect(stream, &uri).await.unwrap();
println!("TLS Handshake Done");
let mut buffer = [0u8; 4096];
let rng = thread_rng();
let mut websocket =
AsyncWebSocketClient::open(&mut tls_stream, &mut buffer, &uri, rng, None, None)
.await
.unwrap();
println!("WebSocket Connected");
websocket
.write(
&mut tls_stream,
&mut buffer,
WebSocketSendMessageType::Text,
true,
"Hello World".as_bytes(),
)
.await
.unwrap();
println!("Message Sent");
loop {
let message = websocket
.try_read(&mut tls_stream, &mut buffer)
.await
.unwrap()
.unwrap();
match message {
ReadResult::Text(text) => {
assert_eq!("Hello World".to_string(), text);
println!("Received message: {}", text);
}
_ => panic!("Expected 'Hello World' as text message."),
}
break;
}
}
86 changes: 43 additions & 43 deletions src/client/websocket/async_websocket_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{client::websocket::errors::WebsocketError, Err};
use crate::{client::websocket::errors::WebSocketError, Err};

use alloc::string::ToString;
use anyhow::Result;
Expand All @@ -10,8 +10,8 @@ use core::{
task::Poll,
};
use embedded_websocket::{
framer_async::Framer as EmbeddedWebsocketFramer, Client as EmbeddedWebsocketClient,
WebSocket as EmbeddedWebsocket,
framer_async::Framer as EmbeddedWebSocketFramer, Client as EmbeddedWebSocketClient,
WebSocket as EmbeddedWebSocket,
};
use futures::{Sink, Stream};
use rand_core::RngCore;
Expand All @@ -22,43 +22,43 @@ use tokio::net::TcpStream;
#[cfg(feature = "std")]
use tokio_tungstenite::{
connect_async as tungstenite_connect_async, MaybeTlsStream as TungsteniteMaybeTlsStream,
WebSocketStream as TungsteniteWebsocketStream,
WebSocketStream as TungsteniteWebSocketStream,
};

// Exports
pub use embedded_websocket::{
framer_async::{
FramerError as EmbeddedWebsocketFramerError, ReadResult as EmbeddedWebsocketReadMessageType,
FramerError as EmbeddedWebSocketFramerError, ReadResult as EmbeddedWebSocketReadMessageType,
},
Error as EmbeddedWebsocketError, WebSocketCloseStatusCode as EmbeddedWebsocketCloseStatusCode,
WebSocketOptions as EmbeddedWebsocketOptions,
WebSocketSendMessageType as EmbeddedWebsocketSendMessageType,
WebSocketState as EmbeddedWebsocketState,
Error as EmbeddedWebSocketError, WebSocketCloseStatusCode as EmbeddedWebSocketCloseStatusCode,
WebSocketOptions as EmbeddedWebSocketOptions,
WebSocketSendMessageType as EmbeddedWebSocketSendMessageType,
WebSocketState as EmbeddedWebSocketState,
};

#[cfg(feature = "std")]
pub type AsyncWebsocketClientTungstenite<Status> =
AsyncWebsocketClient<TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>, Status>;
pub type AsyncWebsocketClientEmbeddedWebsocketTokio<Rng, Status> =
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, Status>;
pub type AsyncWebSocketClientTungstenite<Status> =
AsyncWebSocketClient<TungsteniteWebSocketStream<TungsteniteMaybeTlsStream<TcpStream>>, Status>;
pub type AsyncWebSocketClientEmbeddedWebSocketTokio<Rng, Status> =
AsyncWebSocketClient<EmbeddedWebSocketFramer<Rng, EmbeddedWebSocketClient>, Status>;
#[cfg(feature = "std")]
pub use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;

pub struct WebsocketOpen;
pub struct WebsocketClosed;
pub struct WebSocketOpen;
pub struct WebSocketClosed;

pub struct AsyncWebsocketClient<T, Status = WebsocketClosed> {
pub struct AsyncWebSocketClient<T, Status = WebSocketClosed> {
inner: T,
status: PhantomData<Status>,
}

impl<T, Status> AsyncWebsocketClient<T, Status> {
impl<T, Status> AsyncWebSocketClient<T, Status> {
pub fn is_open(&self) -> bool {
core::any::type_name::<Status>() == core::any::type_name::<WebsocketOpen>()
core::any::type_name::<Status>() == core::any::type_name::<WebSocketOpen>()
}
}

impl<T, I> Sink<I> for AsyncWebsocketClient<T, WebsocketOpen>
impl<T, I> Sink<I> for AsyncWebSocketClient<T, WebSocketOpen>
where
T: Sink<I> + Unpin,
<T as Sink<I>>::Error: Display,
Expand Down Expand Up @@ -109,7 +109,7 @@ where
}
}

impl<T> Stream for AsyncWebsocketClient<T, WebsocketOpen>
impl<T> Stream for AsyncWebSocketClient<T, WebSocketOpen>
where
T: Stream + Unpin,
{
Expand All @@ -129,66 +129,66 @@ where

#[cfg(feature = "std")]
impl
AsyncWebsocketClient<
TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>,
WebsocketClosed,
AsyncWebSocketClient<
TungsteniteWebSocketStream<TungsteniteMaybeTlsStream<TcpStream>>,
WebSocketClosed,
>
{
pub async fn open(
uri: Url,
) -> Result<
AsyncWebsocketClient<
TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>,
WebsocketOpen,
AsyncWebSocketClient<
TungsteniteWebSocketStream<TungsteniteMaybeTlsStream<TcpStream>>,
WebSocketOpen,
>,
> {
let (websocket_stream, _) = tungstenite_connect_async(uri.to_string()).await.unwrap();

Ok(AsyncWebsocketClient {
Ok(AsyncWebSocketClient {
inner: websocket_stream,
status: PhantomData::<WebsocketOpen>,
status: PhantomData::<WebSocketOpen>,
})
}
}

impl<Rng>
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketClosed>
AsyncWebSocketClient<EmbeddedWebSocketFramer<Rng, EmbeddedWebSocketClient>, WebSocketClosed>
where
Rng: RngCore,
{
pub async fn open<B, E>(
stream: &mut (impl Stream<Item = Result<B, E>> + for<'a> Sink<&'a [u8], Error = E> + Unpin),
buffer: &mut [u8],
rng: Rng,
websocket_options: &EmbeddedWebsocketOptions<'_>,
websocket_options: &EmbeddedWebSocketOptions<'_>,
) -> Result<
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketOpen>,
AsyncWebSocketClient<EmbeddedWebSocketFramer<Rng, EmbeddedWebSocketClient>, WebSocketOpen>,
>
where
B: AsRef<[u8]>,
E: Debug,
{
let websocket = EmbeddedWebsocket::<Rng, EmbeddedWebsocketClient>::new_client(rng);
let mut framer = EmbeddedWebsocketFramer::new(websocket);
let websocket = EmbeddedWebSocket::<Rng, EmbeddedWebSocketClient>::new_client(rng);
let mut framer = EmbeddedWebSocketFramer::new(websocket);
framer
.connect(stream, buffer, websocket_options)
.await
.unwrap();

Ok(AsyncWebsocketClient {
Ok(AsyncWebSocketClient {
inner: framer,
status: PhantomData::<WebsocketOpen>,
status: PhantomData::<WebSocketOpen>,
})
}
}

impl<Rng> AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketOpen>
impl<Rng> AsyncWebSocketClient<EmbeddedWebSocketFramer<Rng, EmbeddedWebSocketClient>, WebSocketOpen>
where
Rng: RngCore,
{
pub fn encode<E>(
&mut self,
message_type: EmbeddedWebsocketSendMessageType,
message_type: EmbeddedWebSocketSendMessageType,
end_of_message: bool,
from: &[u8],
to: &mut [u8],
Expand All @@ -208,7 +208,7 @@ where
&mut self,
stream: &mut (impl Sink<&'b [u8], Error = E> + Unpin),
stream_buf: &'b mut [u8],
message_type: EmbeddedWebsocketSendMessageType,
message_type: EmbeddedWebSocketSendMessageType,
end_of_message: bool,
frame_buf: &'b [u8],
) -> Result<()>
Expand All @@ -227,7 +227,7 @@ where
&mut self,
stream: &mut (impl Sink<&'b [u8], Error = E> + Unpin),
stream_buf: &'b mut [u8],
close_status: EmbeddedWebsocketCloseStatusCode,
close_status: EmbeddedWebSocketCloseStatusCode,
status_description: Option<&str>,
) -> Result<()>
where
Expand All @@ -245,13 +245,13 @@ where
&'a mut self,
stream: &mut (impl Stream<Item = Result<B, E>> + Sink<&'a [u8], Error = E> + Unpin),
buffer: &'a mut [u8],
) -> Option<Result<EmbeddedWebsocketReadMessageType<'_>>>
) -> Option<Result<EmbeddedWebSocketReadMessageType<'_>>>
where
E: Debug,
{
match self.inner.read(stream, buffer).await {
Some(Ok(read_result)) => Some(Ok(read_result)),
Some(Err(error)) => Some(Err!(WebsocketError::from(error))),
Some(Err(error)) => Some(Err!(WebSocketError::from(error))),
None => None,
}
}
Expand All @@ -260,13 +260,13 @@ where
&'a mut self,
stream: &mut (impl Stream<Item = Result<B, E>> + Sink<&'a [u8], Error = E> + Unpin),
buffer: &'a mut [u8],
) -> Result<Option<EmbeddedWebsocketReadMessageType<'_>>>
) -> Result<Option<EmbeddedWebSocketReadMessageType<'_>>>
where
E: Debug,
{
match self.inner.read(stream, buffer).await {
Some(Ok(read_result)) => Ok(Some(read_result)),
Some(Err(error)) => Err!(WebsocketError::from(error)),
Some(Err(error)) => Err!(WebSocketError::from(error)),
None => Ok(None),
}
}
Expand Down
30 changes: 16 additions & 14 deletions src/client/websocket/exceptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use embedded_websocket::framer_async::FramerError;
use thiserror_no_std::Error;

#[derive(Debug, PartialEq, Eq, Error)]
pub enum WebsocketError<E: Debug> {
#[error("Stream is not connected.")]
NotConnected,
pub enum WebSocketException<E: Debug> {
#[error("Invalid domain")]
InvalidDomain,
#[error("Invalid scheme")]
InvalidScheme,
// FramerError
#[error("I/O error: {0:?}")]
Io(E),
Expand All @@ -17,33 +19,33 @@ pub enum WebsocketError<E: Debug> {
Utf8(Utf8Error),
#[error("Invalid HTTP header")]
HttpHeader,
#[error("Websocket error: {0:?}")]
#[error("WebSocket error: {0:?}")]
WebSocket(embedded_websocket::Error),
#[error("Disconnected")]
Disconnected,
#[error("Read buffer is too small (size: {0:?})")]
RxBufferTooSmall(usize),
}

impl<E: Debug> From<FramerError<E>> for WebsocketError<E> {
impl<E: Debug> From<FramerError<E>> for WebSocketException<E> {
fn from(e: FramerError<E>) -> Self {
match e {
FramerError::Io(e) => WebsocketError::Io(e),
FramerError::FrameTooLarge(size) => WebsocketError::FrameTooLarge(size),
FramerError::Utf8(e) => WebsocketError::Utf8(e),
FramerError::HttpHeader(_) => WebsocketError::HttpHeader,
FramerError::WebSocket(e) => WebsocketError::WebSocket(e),
FramerError::Disconnected => WebsocketError::Disconnected,
FramerError::RxBufferTooSmall(size) => WebsocketError::RxBufferTooSmall(size),
FramerError::Io(e) => WebSocketException::Io(e),
FramerError::FrameTooLarge(size) => WebSocketException::FrameTooLarge(size),
FramerError::Utf8(e) => WebSocketException::Utf8(e),
FramerError::HttpHeader(_) => WebSocketException::HttpHeader,
FramerError::WebSocket(e) => WebSocketException::WebSocket(e),
FramerError::Disconnected => WebSocketException::Disconnected,
FramerError::RxBufferTooSmall(size) => WebSocketException::RxBufferTooSmall(size),
}
}
}

impl<E: Debug> Into<anyhow::Error> for WebsocketError<E> {
impl<E: Debug> Into<anyhow::Error> for WebSocketException<E> {
fn into(self) -> anyhow::Error {
anyhow!(self)
}
}

#[cfg(feature = "std")]
impl<E: Debug> alloc::error::Error for WebsocketError<E> {}
impl<E: Debug> alloc::error::Error for WebSocketException<E> {}
Loading

0 comments on commit 955c773

Please sign in to comment.