From 05cb440624e520fc79e11a7657c6738438b2e51e Mon Sep 17 00:00:00 2001 From: Ayush Singh Date: Wed, 27 Dec 2023 15:31:23 +0530 Subject: [PATCH] Split websocket state - Allow reading and writing to socket simultaneously, i.e. allow subscribing to new stuff while reading stream Signed-off-by: Ayush Singh --- src/tokio_tungstenite.rs | 47 +++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/src/tokio_tungstenite.rs b/src/tokio_tungstenite.rs index 40cc1f1..8bc563a 100644 --- a/src/tokio_tungstenite.rs +++ b/src/tokio_tungstenite.rs @@ -1,5 +1,6 @@ use crate::websocket::Stream; -use futures_util::SinkExt; +use futures_util::stream::{SplitSink, SplitStream}; +use futures_util::{SinkExt, StreamExt}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio_tungstenite::{ @@ -15,7 +16,14 @@ pub struct BinanceWebSocketClient; impl BinanceWebSocketClient { pub async fn connect_async( url: &str, - ) -> Result<(WebSocketState>, Response), Error> { + ) -> Result< + ( + WebSocketState>, + WebSocketReader>, + Response, + ), + Error, + > { let (socket, response) = connect_async(Url::parse(&url).unwrap()).await?; log::info!("Connected to {}", url); @@ -25,22 +33,33 @@ impl BinanceWebSocketClient { log::debug!("* {}", header); } - Ok((WebSocketState::new(socket), response)) + let (sink, stream) = socket.split(); + + Ok((WebSocketState::new(sink), stream, response)) } - pub async fn connect_async_default( - ) -> Result<(WebSocketState>, Response), Error> { + pub async fn connect_async_default() -> Result< + ( + WebSocketState>, + WebSocketReader>, + Response, + ), + Error, + > { BinanceWebSocketClient::connect_async("wss://stream.binance.com:9443/stream").await } } +pub type WebSocketWriter = SplitSink, tokio_tungstenite::tungstenite::Message>; +pub type WebSocketReader = SplitStream>; + pub struct WebSocketState { - socket: WebSocketStream, + socket: WebSocketWriter, id: u64, } impl WebSocketState { - pub fn new(socket: WebSocketStream) -> Self { + pub fn new(socket: WebSocketWriter) -> Self { Self { socket, id: 0 } } @@ -128,18 +147,6 @@ impl WebSocketState { } pub async fn close(mut self) -> Result<(), Error> { - self.socket.close(None).await - } -} - -impl From> for WebSocketStream { - fn from(conn: WebSocketState) -> WebSocketStream { - conn.socket - } -} - -impl AsMut> for WebSocketState { - fn as_mut(&mut self) -> &mut WebSocketStream { - &mut self.socket + self.socket.close().await } }