Skip to content

Commit

Permalink
Merge branch 'main' into cov
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Nov 26, 2024
2 parents 87542b6 + e949cf7 commit 1d987d9
Show file tree
Hide file tree
Showing 15 changed files with 1,542 additions and 329 deletions.
1 change: 1 addition & 0 deletions include/ylt/reflection/member_count.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

#pragma once
#include <cstdint>
#include <optional>
#include <tuple>
#include <type_traits>
Expand Down
34 changes: 34 additions & 0 deletions include/ylt/standalone/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,40 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#endif
}

#ifdef INJECT_FOR_HTTP_CLIENT_TEST
async_simple::coro::Lazy<std::error_code> async_write_raw(
std::string_view data) {
auto [ec, _] = co_await async_write(asio::buffer(data));
co_return ec;
}

async_simple::coro::Lazy<resp_data> async_read_raw(
http_method method, bool clear_buffer = false) {
if (clear_buffer) {
body_.clear();
}

char buf[1024];
std::error_code ec{};
size_t size{};
#ifdef CINATRA_ENABLE_SSL
if (has_init_ssl_) {
std::tie(ec, size) = co_await coro_io::async_read_some(
*socket_->ssl_stream_, asio::buffer(buf, 1024));
}
else {
#endif
std::tie(ec, size) = co_await coro_io::async_read_some(
socket_->impl_, asio::buffer(buf, 1024));
#ifdef CINATRA_ENABLE_SSL
}
#endif
body_.append(buf, size);

co_return resp_data{ec, {}, {}, body_};
}
#endif

inline void set_proxy(const std::string &host, const std::string &port) {
proxy_host_ = host;
proxy_port_ = port;
Expand Down
148 changes: 65 additions & 83 deletions include/ylt/standalone/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@
#include "sha1.hpp"
#include "string_resize.hpp"
#include "websocket.hpp"
#include "ylt/metric/counter.hpp"
#include "ylt/metric/gauge.hpp"
#include "ylt/metric/histogram.hpp"
#include "ylt/metric/metric.hpp"
#ifdef CINATRA_ENABLE_GZIP
#include "gzip.hpp"
#endif
#include "metric_conf.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"

Expand All @@ -52,14 +47,9 @@ class coro_http_connection
request_(parser_, this),
response_(this) {
buffers_.reserve(3);

cinatra_metric_conf::server_total_fd_inc();
}

~coro_http_connection() {
cinatra_metric_conf::server_total_fd_dec();
close();
}
~coro_http_connection() { close(); }

#ifdef CINATRA_ENABLE_SSL
bool init_ssl(const std::string &cert_file, const std::string &key_file,
Expand Down Expand Up @@ -126,21 +116,17 @@ class coro_http_connection
CINATRA_LOG_WARNING << "read http header error: " << ec.message();
}

cinatra_metric_conf::server_failed_req_inc();
close();
break;
}

if (cinatra_metric_conf::enable_metric) {
start = std::chrono::system_clock::now();
cinatra_metric_conf::server_total_req_inc();
}

const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
int head_len = parser_.parse_request(data_ptr, size, 0);
if (head_len <= 0) {
cinatra_metric_conf::server_failed_req_inc();
CINATRA_LOG_ERROR << "parse http header error";
response_.set_status_and_content(status_type::bad_request,
"invalid http protocol");
co_await reply();
close();
break;
}
Expand All @@ -153,9 +139,6 @@ class coro_http_connection
if (type != content_type::chunked && type != content_type::multipart) {
size_t body_len = parser_.body_len();
if (body_len == 0) {
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_recv_bytes_inc(head_len);
}
if (parser_.method() == "GET"sv) {
if (request_.is_upgrade()) {
#ifdef CINATRA_ENABLE_GZIP
Expand All @@ -175,16 +158,6 @@ class coro_http_connection
}
response_.set_delay(true);
}
else {
if (cinatra_metric_conf::enable_metric) {
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid -
start)
.count();
cinatra_metric_conf::server_read_latency_observe(count);
}
}
}
}
else if (body_len <= head_buf_.size()) {
Expand All @@ -194,7 +167,6 @@ class coro_http_connection
memcpy(body_.data(), data_ptr, body_len);
head_buf_.consume(head_buf_.size());
}
cinatra_metric_conf::server_total_recv_bytes_inc(head_len + body_len);
}
else {
size_t part_size = head_buf_.size();
Expand All @@ -209,22 +181,9 @@ class coro_http_connection
size_to_read);
if (ec) {
CINATRA_LOG_ERROR << "async_read error: " << ec.message();
cinatra_metric_conf::server_failed_req_inc();
close();
break;
}
else {
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_recv_bytes_inc(head_len +
body_len);
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid -
start)
.count();
cinatra_metric_conf::server_read_latency_observe(count);
}
}
}
}

Expand Down Expand Up @@ -358,37 +317,44 @@ class coro_http_connection

while (true) {
size_t left_size = head_buf_.size();
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
std::string_view left_content{data_ptr, left_size};
auto next_data_ptr =
asio::buffer_cast<const char *>(head_buf_.data());
std::string_view left_content{next_data_ptr, left_size};
size_t pos = left_content.find(TWO_CRCF);
if (pos == std::string_view::npos) {
break;
}
http_parser parser;
int head_len = parser.parse_request(data_ptr, size, 0);
int head_len = parser.parse_request(next_data_ptr, left_size, 0);
if (head_len <= 0) {
CINATRA_LOG_ERROR << "parse http header error";
response_.set_status_and_content(status_type::bad_request,
"invalid http protocol");
co_await reply();
close();
break;
}

head_buf_.consume(pos + TWO_CRCF.length());

std::string_view key = {
parser_.method().data(),
parser_.method().length() + 1 + parser_.url().length()};
std::string_view next_key = {
parser.method().data(),
parser.method().length() + 1 + parser.url().length()};

coro_http_request req(parser, this);
coro_http_response resp(this);
resp.need_date_head(response_.need_date());
if (auto handler = router_.get_handler(key); handler) {
if (auto handler = router_.get_handler(next_key); handler) {
router_.route(handler, req, resp, key);
}
else {
if (auto coro_handler = router_.get_coro_handler(key);
if (auto coro_handler = router_.get_coro_handler(next_key);
coro_handler) {
co_await router_.route_coro(coro_handler, req, resp, key);
}
else {
resp.set_status(status_type::not_found);
}
}

resp.build_resp_str(resp_str_);
Expand All @@ -409,14 +375,6 @@ class coro_http_connection
}
}

if (cinatra_metric_conf::enable_metric) {
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid - start)
.count();
cinatra_metric_conf::server_req_latency_observe(count);
}

response_.clear();
request_.clear();
buffers_.clear();
Expand All @@ -430,10 +388,6 @@ class coro_http_connection
}

async_simple::coro::Lazy<bool> reply(bool need_to_bufffer = true) {
if (response_.status() >= status_type::bad_request) {
if (cinatra_metric_conf::enable_metric)
cinatra_metric_conf::server_failed_req_inc();
}
std::error_code ec;
size_t size;
if (multi_buf_) {
Expand All @@ -444,18 +398,12 @@ class coro_http_connection
for (auto &buf : buffers_) {
send_size += buf.size();
}
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_send_bytes_inc(send_size);
}
std::tie(ec, size) = co_await async_write(buffers_);
}
else {
if (need_to_bufffer) {
response_.build_resp_str(resp_str_);
}
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_send_bytes_inc(resp_str_.size());
}
std::tie(ec, size) = co_await async_write(asio::buffer(resp_str_));
}

Expand Down Expand Up @@ -495,6 +443,12 @@ class coro_http_connection
default_handler_ = handler;
}

#ifdef INJECT_FOR_HTTP_SEVER_TEST
void set_write_failed_forever(bool r) { write_failed_forever_ = r; }

void set_read_failed_forever(bool r) { read_failed_forever_ = r; }
#endif

async_simple::coro::Lazy<bool> write_data(std::string_view message) {
std::vector<asio::const_buffer> buffers;
buffers.push_back(asio::buffer(message));
Expand All @@ -513,8 +467,6 @@ class coro_http_connection
co_return true;
}

bool sync_reply() { return async_simple::coro::syncAwait(reply()); }

async_simple::coro::Lazy<bool> begin_chunked() {
response_.set_delay(true);
response_.set_status(status_type::ok);
Expand Down Expand Up @@ -681,6 +633,18 @@ class coro_http_connection
head_buf_.consume(head_buf_.size());
std::span<char> payload{};
auto payload_length = ws_.payload_length();

if (max_part_size_ != 0 && payload_length > max_part_size_) {
std::string close_reason = "message_too_big";
std::string close_msg = ws_.format_close_payload(
close_code::too_big, close_reason.data(), close_reason.size());
co_await write_websocket(close_msg, opcode::close);
close();
result.ec = std::error_code(asio::error::message_size,
asio::error::get_system_category());
break;
}

if (payload_length > 0) {
detail::resize(body_, payload_length);
auto [ec, read_sz] =
Expand All @@ -693,19 +657,11 @@ class coro_http_connection
payload = body_;
}

if (max_part_size_ != 0 && payload_length > max_part_size_) {
std::string close_reason = "message_too_big";
std::string close_msg = ws_.format_close_payload(
close_code::too_big, close_reason.data(), close_reason.size());
co_await write_websocket(close_msg, opcode::close);
close();
break;
}

ws_frame_type type = ws_.parse_payload(payload);

switch (type) {
case cinatra::ws_frame_type::WS_ERROR_FRAME:
close();
result.ec = std::make_error_code(std::errc::protocol_error);
break;
case cinatra::ws_frame_type::WS_OPENING_FRAME:
Expand Down Expand Up @@ -787,7 +743,7 @@ class coro_http_connection
inflate_str_.clear();
if (!cinatra::gzip_codec::inflate({payload.data(), payload.size()},
inflate_str_)) {
CINATRA_LOG_ERROR << "uncompuress data error";
CINATRA_LOG_ERROR << "compress data error";
result.ec = std::make_error_code(std::errc::protocol_error);
return false;
}
Expand All @@ -812,9 +768,26 @@ class coro_http_connection
response_.set_shrink_to_fit(r);
}

#ifdef INJECT_FOR_HTTP_SEVER_TEST
async_simple::coro::Lazy<std::pair<std::error_code, size_t>>
async_write_failed() {
co_return std::make_pair(std::make_error_code(std::errc::io_error), 0);
}

async_simple::coro::Lazy<std::pair<std::error_code, size_t>>
async_read_failed() {
co_return std::make_pair(std::make_error_code(std::errc::io_error), 0);
}
#endif

template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read(
AsioBuffer &&buffer, size_t size_to_read) noexcept {
#ifdef INJECT_FOR_HTTP_SEVER_TEST
if (read_failed_forever_) {
return async_read_failed();
}
#endif
set_last_time();
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_) {
Expand All @@ -831,6 +804,11 @@ class coro_http_connection
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_write(
AsioBuffer &&buffer) {
#ifdef INJECT_FOR_HTTP_SEVER_TEST
if (write_failed_forever_) {
return async_write_failed();
}
#endif
set_last_time();
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_) {
Expand Down Expand Up @@ -997,5 +975,9 @@ class coro_http_connection
default_handler_ = nullptr;
std::string chunk_size_str_;
std::string remote_addr_;
#ifdef INJECT_FOR_HTTP_SEVER_TEST
bool write_failed_forever_ = false;
bool read_failed_forever_ = false;
#endif
};
} // namespace cinatra
Loading

0 comments on commit 1d987d9

Please sign in to comment.