Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coro_http_client]fix timeout #658

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 82 additions & 76 deletions include/ylt/standalone/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,13 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
if (!ok) {
co_return resp_data{std::make_error_code(std::errc::protocol_error), 404};
}

auto future = start_timer(conn_timeout_duration_, "connect timer");

data = co_await connect(u);
if (auto ec = co_await wait_future(std::move(future)); ec) {
co_return resp_data{ec, 404};
{
auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u);
}
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
if (!data.net_err) {
data.status = 200;
Expand Down Expand Up @@ -623,38 +624,27 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

void set_max_single_part_size(size_t size) { max_single_part_size_ = size; }

async_simple::Future<async_simple::Unit> start_timer(
std::chrono::steady_clock::duration duration, std::string msg) {
is_timeout_ = false;

async_simple::Promise<async_simple::Unit> promise;
auto fut = promise.getFuture();
struct timer_guard {
timer_guard(coro_http_client *self,
std::chrono::steady_clock::duration duration, std::string msg)
: self(self) {
self->socket_->is_timeout_ = false;

if (enable_timeout_) {
timeout(timer_, std::move(promise), duration, std::move(msg))
.via(&executor_wrapper_)
.detach();
}
else {
promise.setValue(async_simple::Unit{});
}
return fut;
}

async_simple::coro::Lazy<std::error_code> wait_future(
async_simple::Future<async_simple::Unit> &&future) {
if (!enable_timeout_) {
co_return std::error_code{};
if (self->enable_timeout_) {
self->timeout(self->timer_, duration, std::move(msg))
.start([](auto &&) {
});
}
return;
}
std::error_code err_code;
timer_.cancel(err_code);
co_await std::move(future);
if (is_timeout_) {
co_return std::make_error_code(std::errc::timed_out);
~timer_guard() {
if (self->enable_timeout_ && self->socket_->is_timeout_ == false) {
std::error_code ignore_ec;
self->timer_.cancel(ignore_ec);
}
}

co_return std::error_code{};
}
coro_http_client *self;
};

async_simple::coro::Lazy<resp_data> async_upload_multipart(std::string uri) {
std::shared_ptr<int> guard(nullptr, [this](auto) {
Expand Down Expand Up @@ -684,18 +674,21 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
size_t size = 0;

if (socket_->has_closed_) {
auto future = start_timer(conn_timeout_duration_, "connect timer");

data = co_await connect(u);
if (ec = co_await wait_future(std::move(future)); ec) {
co_return resp_data{ec, 404};
{
auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u);
}
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
if (data.net_err) {
co_return data;
}
}

auto future = start_timer(req_timeout_duration_, "upload timer");
auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "request timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (inject_write_failed == ClientInjectAction::write_failed) {
Expand All @@ -714,7 +707,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
data = co_await send_single_part(key, part);

if (data.net_err) {
if (data.net_err == asio::error::operation_aborted) {
if (socket_->is_timeout_) {
data.net_err = std::make_error_code(std::errc::timed_out);
}
co_return data;
Expand All @@ -725,16 +718,18 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
last_part.append("--").append(BOUNDARY).append("--").append(CRCF);
if (std::tie(ec, size) = co_await async_write(asio::buffer(last_part));
ec) {
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
co_return resp_data{ec, 404};
}

bool is_keep_alive = true;
data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx),
http_method::POST);
if (auto errc = co_await wait_future(std::move(future)); errc) {
ec = errc;
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}

handle_result(data, ec, is_keep_alive);
co_return data;
}
Expand Down Expand Up @@ -880,20 +875,25 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
size_t size = 0;

if (socket_->has_closed_) {
auto future = start_timer(conn_timeout_duration_, "connect timer");

data = co_await connect(u);
if (ec = co_await wait_future(std::move(future)); ec) {
co_return resp_data{ec, 404};
{
auto guard = timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u);
}
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
if (data.net_err) {
co_return data;
}
}

auto future = start_timer(req_timeout_duration_, "upload timer");
auto time_guard =
timer_guard(this, conn_timeout_duration_, "request timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
if (ec) {
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
co_return resp_data{ec, 404};
}

Expand Down Expand Up @@ -945,19 +945,19 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}
}

if (ec && ec == asio::error::operation_aborted) {
ec = std::make_error_code(std::errc::timed_out);
if (ec) {
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
co_return resp_data{ec, 404};
}

bool is_keep_alive = true;
data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx),
http_method::POST);
if (auto errc = co_await wait_future(std::move(future)); errc) {
ec = errc;
if (ec && socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}

handle_result(data, ec, is_keep_alive);
co_return data;
}
Expand Down Expand Up @@ -1020,15 +1020,20 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
u.path = uri;
}
if (socket_->has_closed_) {
auto conn_future = start_timer(conn_timeout_duration_, "connect timer");
host_ = proxy_host_.empty() ? u.get_host() : proxy_host_;
port_ = proxy_port_.empty() ? u.get_port() : proxy_port_;
auto guard = timer_guard(this, conn_timeout_duration_, "connect timer");
if (ec = co_await coro_io::async_connect(&executor_wrapper_,
socket_->impl_, host_, port_);
ec) {
break;
}

if (socket_->is_timeout_) {
data.net_err = std::make_error_code(std::errc::timed_out);
co_return data;
}

if (enable_tcp_no_delay_) {
socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec);
if (ec) {
Expand Down Expand Up @@ -1059,9 +1064,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}
socket_->has_closed_ = false;
if (ec = co_await wait_future(std::move(conn_future)); ec) {
break;
}
}

std::vector<asio::const_buffer> vec;
Expand All @@ -1080,7 +1082,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#ifdef CORO_HTTP_PRINT_REQ_HEAD
CINATRA_LOG_DEBUG << req_head_str;
#endif
auto future = start_timer(req_timeout_duration_, "request timer");
auto guard = timer_guard(this, req_timeout_duration_, "request timer");
if (has_body) {
std::tie(ec, size) = co_await async_write(vec);
}
Expand All @@ -1090,14 +1092,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
if (ec) {
break;
}

data =
co_await handle_read(ec, size, is_keep_alive, std::move(ctx), method);
if (auto errc = co_await wait_future(std::move(future)); errc) {
ec = errc;
}
} while (0);

if (ec && socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
handle_result(data, ec, is_keep_alive);
co_return data;
}
Expand Down Expand Up @@ -1179,6 +1179,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
struct socket_t {
asio::ip::tcp::socket impl_;
std::atomic<bool> has_closed_ = true;
bool is_timeout_ = false;
asio::streambuf head_buf_;
asio::streambuf chunked_buf_;
#ifdef CINATRA_ENABLE_SSL
Expand Down Expand Up @@ -1665,6 +1666,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return resp_data{ec, 404};
}

if (socket_->is_timeout_) {
auto ec = std::make_error_code(std::errc::timed_out);
co_return resp_data{ec, 404};
}

if (enable_tcp_no_delay_) {
std::error_code ec;
socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec);
Expand Down Expand Up @@ -1960,17 +1966,19 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}

async_simple::coro::Lazy<bool> timeout(
auto &timer, auto promise, std::chrono::steady_clock::duration duration,
auto &timer, std::chrono::steady_clock::duration duration,
std::string msg) {
auto watcher = std::weak_ptr(socket_);
timer.expires_after(duration);
is_timeout_ = co_await timer.async_await();
if (!is_timeout_) {
promise.setValue(async_simple::Unit());
auto is_timeout = co_await timer.async_await();
if (!is_timeout) {
co_return false;
}
CINATRA_LOG_WARNING << msg << " timeout";
close_socket(*socket_);
promise.setValue(async_simple::Unit());
if (auto socket = watcher.lock(); socket) {
socket_->is_timeout_ = true;
CINATRA_LOG_WARNING << msg << " timeout";
close_socket(*socket_);
}
co_return true;
}

Expand Down Expand Up @@ -2025,8 +2033,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#endif
std::string redirect_uri_;
bool enable_follow_redirect_ = false;

bool is_timeout_ = false;
bool enable_timeout_ = false;
std::chrono::steady_clock::duration conn_timeout_duration_ =
std::chrono::seconds(8);
Expand Down
Loading