Skip to content

Commit

Permalink
Split websocket state
Browse files Browse the repository at this point in the history
- Allow reading and writing to socket simultaneously, i.e. allow
  subscribing to new stuff while reading stream

Signed-off-by: Ayush Singh <[email protected]>
  • Loading branch information
Ayush1325 committed Dec 28, 2023
1 parent 5bf8b72 commit 05cb440
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions src/tokio_tungstenite.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::websocket::Stream;
use futures_util::SinkExt;
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_tungstenite::{
Expand All @@ -15,7 +16,14 @@ pub struct BinanceWebSocketClient;
impl BinanceWebSocketClient {
pub async fn connect_async(
url: &str,
) -> Result<(WebSocketState<MaybeTlsStream<TcpStream>>, Response), Error> {
) -> Result<
(
WebSocketState<MaybeTlsStream<TcpStream>>,
WebSocketReader<MaybeTlsStream<TcpStream>>,
Response,
),
Error,
> {
let (socket, response) = connect_async(Url::parse(&url).unwrap()).await?;

log::info!("Connected to {}", url);
Expand All @@ -25,22 +33,33 @@ impl BinanceWebSocketClient {
log::debug!("* {}", header);
}

Ok((WebSocketState::new(socket), response))
let (sink, stream) = socket.split();

Ok((WebSocketState::new(sink), stream, response))
}

pub async fn connect_async_default(
) -> Result<(WebSocketState<MaybeTlsStream<TcpStream>>, Response), Error> {
pub async fn connect_async_default() -> Result<
(
WebSocketState<MaybeTlsStream<TcpStream>>,
WebSocketReader<MaybeTlsStream<TcpStream>>,
Response,
),
Error,
> {
BinanceWebSocketClient::connect_async("wss://stream.binance.com:9443/stream").await
}
}

pub type WebSocketWriter<T> = SplitSink<WebSocketStream<T>, tokio_tungstenite::tungstenite::Message>;
pub type WebSocketReader<T> = SplitStream<WebSocketStream<T>>;

pub struct WebSocketState<T> {
socket: WebSocketStream<T>,
socket: WebSocketWriter<T>,
id: u64,
}

impl<T: AsyncRead + AsyncWrite + Unpin> WebSocketState<T> {
pub fn new(socket: WebSocketStream<T>) -> Self {
pub fn new(socket: WebSocketWriter<T>) -> Self {
Self { socket, id: 0 }
}

Expand Down Expand Up @@ -128,18 +147,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> WebSocketState<T> {
}

pub async fn close(mut self) -> Result<(), Error> {
self.socket.close(None).await
}
}

impl<T> From<WebSocketState<T>> for WebSocketStream<T> {
fn from(conn: WebSocketState<T>) -> WebSocketStream<T> {
conn.socket
}
}

impl<T> AsMut<WebSocketStream<T>> for WebSocketState<T> {
fn as_mut(&mut self) -> &mut WebSocketStream<T> {
&mut self.socket
self.socket.close().await
}
}

0 comments on commit 05cb440

Please sign in to comment.