Skip to content

Commit

Permalink
chore(volo-http): introduce hyper_util::server::auto
Browse files Browse the repository at this point in the history
Since we are preparing to support http2, `hyper_util::server::auto` is
necessary.

Signed-off-by: Yu Li <[email protected]>
  • Loading branch information
yukiiiteru committed Nov 27, 2024
1 parent c4bdad9 commit 66ed99d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
6 changes: 4 additions & 2 deletions volo-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ full = [
"cookie", "multipart", "ws", # exts
]

client = ["hyper/client", "hyper/http1"] # client core
server = ["hyper/server", "hyper/http1", "dep:matchit"] # server core
http1 = ["hyper/http1", "hyper-util/http1"]

client = ["http1", "hyper/client"] # client core
server = ["http1", "hyper-util/server", "dep:matchit"] # server core

__serde = ["dep:serde"] # a private feature for enabling `serde` by `serde_xxx`
query = ["__serde", "dep:serde_urlencoded"]
Expand Down
47 changes: 26 additions & 21 deletions volo-http/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::{
cell::RefCell,
convert::Infallible,
error::Error,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand All @@ -13,8 +14,10 @@ use std::{
};

use futures::future::BoxFuture;
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto,
};
use metainfo::{MetaInfo, METAINFO};
use motore::{
layer::{Identity, Layer, Stack},
Expand Down Expand Up @@ -91,7 +94,7 @@ pub mod prelude {
pub struct Server<S, L> {
service: S,
layer: L,
server: http1::Builder,
server: auto::Builder<TokioExecutor>,
config: Config,
shutdown_hooks: Vec<Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>>,
#[cfg(feature = "__tls")]
Expand All @@ -104,7 +107,7 @@ impl<S> Server<S, Identity> {
Self {
service,
layer: Identity::new(),
server: http1::Builder::new(),
server: auto::Builder::new(TokioExecutor::new()),
config: Config::default(),
shutdown_hooks: Vec::new(),
#[cfg(feature = "__tls")]
Expand Down Expand Up @@ -204,15 +207,15 @@ impl<S, L> Server<S, L> {
///
/// Default is `false`.
pub fn set_half_close(&mut self, half_close: bool) -> &mut Self {
self.server.half_close(half_close);
self.server.http1().half_close(half_close);
self
}

/// Enables or disables HTTP/1 keep-alive.
///
/// Default is true.
pub fn set_keep_alive(&mut self, keep_alive: bool) -> &mut Self {
self.server.keep_alive(keep_alive);
self.server.http1().keep_alive(keep_alive);
self
}

Expand All @@ -221,7 +224,7 @@ impl<S, L> Server<S, L> {
///
/// Default is false.
pub fn set_title_case_headers(&mut self, title_case_headers: bool) -> &mut Self {
self.server.title_case_headers(title_case_headers);
self.server.http1().title_case_headers(title_case_headers);
self
}

Expand All @@ -237,7 +240,9 @@ impl<S, L> Server<S, L> {
///
/// Default is false.
pub fn set_preserve_header_case(&mut self, preserve_header_case: bool) -> &mut Self {
self.server.preserve_header_case(preserve_header_case);
self.server
.http1()
.preserve_header_case(preserve_header_case);
self
}

Expand All @@ -255,7 +260,7 @@ impl<S, L> Server<S, L> {
///
/// Default is 100.
pub fn set_max_headers(&mut self, max_headers: usize) -> &mut Self {
self.server.max_headers(max_headers);
self.server.http1().max_headers(max_headers);
self
}

Expand Down Expand Up @@ -357,7 +362,7 @@ impl<S, L> Server<S, L> {

#[allow(clippy::too_many_arguments)]
async fn serve<I, S, E>(
server: Arc<http1::Builder>,
server: Arc<auto::Builder<TokioExecutor>>,
mut incoming: I,
service: S,
config: Config,
Expand All @@ -367,7 +372,7 @@ async fn serve<I, S, E>(
#[cfg(feature = "__tls")] tls_config: Option<ServerTlsConfig>,
) where
I: Incoming,
S: Service<ServerContext, Request, Error = E> + Clone + Send + Sync + 'static,
S: Service<ServerContext, Request, Error = E> + Clone + Unpin + Send + Sync + 'static,
S::Response: IntoResponse,
E: IntoResponse,
{
Expand Down Expand Up @@ -426,13 +431,16 @@ async fn serve<I, S, E>(
}

async fn serve_conn<S>(
server: Arc<http1::Builder>,
server: Arc<auto::Builder<TokioExecutor>>,
conn: Conn,
service: S,
conn_cnt: Arc<AtomicUsize>,
exit_notify: Arc<Notify>,
) where
S: hyper::service::HttpService<hyper::body::Incoming, ResBody = Body>,
// S: hyper::service::HttpService<hyper::body::Incoming, ResBody = Body>,
S: hyper::service::Service<HyperRequest, Response = Response> + Unpin,
S::Future: Send + 'static,
S::Error: Error + Send + Sync + 'static,
{
conn_cnt.fetch_add(1, Ordering::Relaxed);
defer! {
Expand All @@ -442,24 +450,21 @@ async fn serve_conn<S>(
let notified = exit_notify.notified();
tokio::pin!(notified);

let mut http_conn = server
.serve_connection(TokioIo::new(conn), service)
.with_upgrades();
let http_conn = server.serve_connection_with_upgrades(TokioIo::new(conn), service);
futures::pin_mut!(http_conn);

tokio::select! {
_ = &mut notified => {
tracing::trace!("[Volo-HTTP] closing a pending connection");
// Graceful shutdown.
hyper::server::conn::http1::UpgradeableConnection::graceful_shutdown(
Pin::new(&mut http_conn)
);
http_conn.as_mut().graceful_shutdown();
// Continue to poll this connection until shutdown can finish.
let result = http_conn.await;
let result = http_conn.as_mut().await;
if let Err(err) = result {
tracing::debug!("[Volo-HTTP] connection error: {:?}", err);
}
}
result = &mut http_conn => {
result = http_conn.as_mut() => {
if let Err(err) = result {
tracing::debug!("[Volo-HTTP] connection error: {:?}", err);
}
Expand Down

0 comments on commit 66ed99d

Please sign in to comment.