diff --git a/src/client/client.c b/src/client/client.c index 67cb2ef90..8c8dd7205 100644 --- a/src/client/client.c +++ b/src/client/client.c @@ -15,6 +15,7 @@ #include "udprelay.h" #include "s5.h" #include "base64.h" +#include "cstl_lib.h" /* Session states. */ #define TUNNEL_STAGE_MAP(V) \ @@ -58,7 +59,8 @@ struct client_ctx; struct udp_data_context { union sockaddr_universal src_addr; struct socks5_address target_addr; - struct buffer_t *data; + struct cstl_deque* send_deque; // std::deque + struct cstl_deque* recv_deque; struct client_ctx *owner; // __weak_ptr struct udp_listener_ctx_t* udp_ctx; // __weak_ptr @@ -1060,7 +1062,11 @@ static void tunnel_tls_on_connection_established(struct tunnel_ctx *tunnel) { { struct buffer_t *tmp = buffer_create(SSR_BUFF_SIZE); buffer_replace(tmp, ctx->init_pkg); if (ctx->udp_data_ctx) { - buffer_replace(tmp, ctx->udp_data_ctx->data); + const void* udp_pkg = cstl_deque_front(ctx->udp_data_ctx->send_deque); + if (udp_pkg) { + buffer_replace(tmp, *((const struct buffer_t**)udp_pkg)); + cstl_deque_pop_front(ctx->udp_data_ctx->send_deque); + } } if (ssr_ok != tunnel_cipher_client_encrypt(ctx->cipher, tmp)) { tunnel->tunnel_shutdown(tunnel); @@ -1131,6 +1137,21 @@ static void tunnel_tls_on_data_received(struct tunnel_ctx *tunnel, const uint8_t // it as payload of WebSocket authenticate package in function // `tunnel_tls_on_connection_established`. ctx->stage = tunnel_stage_tls_streaming; + do { + struct buffer_t* tmp; const uint8_t* p; size_t size = 0; + const void* udp_pkg = cstl_deque_front(ctx->udp_data_ctx->send_deque); + if (udp_pkg == NULL) { + break; + } + tmp = *((struct buffer_t**)udp_pkg); + + tunnel_cipher_client_encrypt(ctx->cipher, tmp); + p = buffer_get_data(tmp, &size); + tunnel_tls_send_websocket_data(tunnel, p, size); + + cstl_deque_pop_front(ctx->udp_data_ctx->send_deque); + } while (true); + } else { do_action_after_auth_server_success(tunnel); } @@ -1171,6 +1192,11 @@ static void tunnel_tls_on_data_received(struct tunnel_ctx *tunnel, const uint8_t e = tunnel_cipher_client_decrypt(ctx->cipher, tmp, &feedback); assert(!feedback); + if (ctx->udp_data_ctx) { + struct buffer_t* t2 = buffer_clone(tmp); + cstl_deque_push_back(ctx->udp_data_ctx->recv_deque, &t2, sizeof(struct buffer_t*)); + } + buffer_concatenate2(ctx->local_write_cache, tmp); buffer_release(tmp); @@ -1184,10 +1210,20 @@ static void tunnel_tls_on_data_received(struct tunnel_ctx *tunnel, const uint8_t if (ctx->udp_data_ctx) { // Write the received remote data back to the connected UDP client. - struct udp_data_context* udp_data_ctx = ctx->udp_data_ctx; - size_t s = 0; - const uint8_t* p = buffer_get_data(ctx->local_write_cache, &s); - udp_relay_send_data(udp_data_ctx->udp_ctx, &udp_data_ctx->src_addr, p, s); + do { + const struct buffer_t* tmp; size_t s = 0; const uint8_t* p; + const void* udp_pkg = cstl_deque_front(ctx->udp_data_ctx->recv_deque); + if (udp_pkg == NULL) { + break; + } + tmp = *((struct buffer_t**)udp_pkg); + + p = buffer_get_data(tmp, &s); + udp_relay_send_data(ctx->udp_data_ctx->udp_ctx, &ctx->udp_data_ctx->src_addr, p, s); + + cstl_deque_pop_front(ctx->udp_data_ctx->recv_deque); + } while (true); + buffer_reset(ctx->local_write_cache); return; } @@ -1264,16 +1300,31 @@ static bool can_access(const struct tunnel_ctx *cx, const struct sockaddr *addr) return false; } +static int deque_compare_e_ptr(const void* left, const void* right) { + struct buffer_t* l = *((struct buffer_t**)left); + struct buffer_t* r = *((struct buffer_t**)right); + return (int)((ssize_t)l - (ssize_t)r); +} + +static void deque_free_e(void* ptr) { + if (ptr) { + struct buffer_t* p = *((struct buffer_t**)ptr); + buffer_release(p); + } +} + struct udp_data_context * udp_data_context_create(void) { struct udp_data_context *ptr; ptr = (struct udp_data_context *) calloc(1, sizeof(*ptr)); - ptr->data = buffer_create(SSR_BUFF_SIZE); + ptr->send_deque = cstl_deque_new(10, deque_compare_e_ptr, deque_free_e); + ptr->recv_deque = cstl_deque_new(10, deque_compare_e_ptr, deque_free_e); return ptr; } void udp_data_context_destroy(struct udp_data_context *ptr) { if (ptr) { - buffer_release(ptr->data); + cstl_deque_delete(ptr->send_deque); + cstl_deque_delete(ptr->recv_deque); free(ptr); } } @@ -1304,6 +1355,7 @@ void udp_on_recv_data(struct udp_listener_ctx_t *udp_ctx, const union sockaddr_u const uint8_t *data_p = buffer_get_data(data, &data_len); struct udp_data_context *query_data; const uint8_t *raw_p = NULL; size_t raw_len = 0; + struct buffer_t* out_ref; query_data = udp_data_context_create(); if (src_addr) { @@ -1317,21 +1369,26 @@ void udp_on_recv_data(struct udp_listener_ctx_t *udp_ctx, const union sockaddr_u return; } + out_ref = buffer_create_from(raw_p, raw_len); + cstl_set_container_traverse(env->tunnel_set, &_do_find_upd_tunnel, query_data); if (query_data->owner) { - struct buffer_t* out_ref; ctx = query_data->owner; ASSERT(ctx->udp_data_ctx); udp_data_context_destroy(query_data); - ASSERT(ctx->stage > tunnel_stage_tls_connecting); - out_ref = (ctx && ctx->udp_data_ctx) ? ctx->udp_data_ctx->data : NULL; - buffer_store(out_ref, raw_p, raw_len); tunnel = ctx->tunnel; - if (ssr_ok != tunnel_cipher_client_encrypt(ctx->cipher, out_ref)) { - tunnel->tunnel_shutdown(tunnel); + if (tunnel && tunnel->tunnel_is_in_streaming && tunnel->tunnel_is_in_streaming(tunnel)) { + if (ssr_ok != tunnel_cipher_client_encrypt(ctx->cipher, out_ref)) { + tunnel->tunnel_shutdown(tunnel); + } else { + size_t len = 0; const uint8_t* p = buffer_get_data(out_ref, &len); + tunnel_tls_send_websocket_data(tunnel, p, len); + } + buffer_release(out_ref); + } else if (ctx->udp_data_ctx) { + cstl_deque_push_back(ctx->udp_data_ctx->send_deque, &out_ref, sizeof(struct buffer_t*)); } else { - size_t len = 0; const uint8_t* p = buffer_get_data(out_ref, &len); - tunnel_tls_send_websocket_data(tunnel, p, len); + UNREACHABLE(); } } else { tunnel = tunnel_initialize(loop, NULL, config->idle_timeout, &init_done_cb, env); @@ -1347,7 +1404,7 @@ void udp_on_recv_data(struct udp_listener_ctx_t *udp_ctx, const union sockaddr_u client_tunnel_connecting_print_info(tunnel); - buffer_store(ctx->udp_data_ctx->data, raw_p, raw_len); + cstl_deque_push_back(ctx->udp_data_ctx->send_deque, &out_ref, sizeof(struct buffer_t*)); } (void)p; } diff --git a/src/server/server.c b/src/server/server.c index d7b6cb286..bf44db617 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -22,6 +22,7 @@ #include "ip_addr_cache.h" #include "s5.h" #include "base64.h" +#include "cstl_lib.h" #ifndef SSR_MAX_CONN #define SSR_MAX_CONN 1024 @@ -84,6 +85,7 @@ struct server_ctx { struct buffer_t *client_delivery_cache; struct tunnel_ctx *tunnel; // __weak_ptr struct udp_remote_ctx_t *udp_relay; + struct cstl_deque* udp_recv_deque; }; static int ssr_server_run_loop(struct server_config *config, bool force_quit); @@ -320,6 +322,19 @@ void ssr_server_shutdown(struct ssr_server_state *state) { } } +static int deque_compare_e_ptr(const void* left, const void* right) { + struct buffer_t* l = *((struct buffer_t**)left); + struct buffer_t* r = *((struct buffer_t**)right); + return (int)((ssize_t)l - (ssize_t)r); +} + +static void deque_free_e(void* ptr) { + if (ptr) { + struct buffer_t* p = *((struct buffer_t**)ptr); + buffer_release(p); + } +} + bool _init_done_cb(struct tunnel_ctx *tunnel, void *p) { struct server_env_t *env = (struct server_env_t *)p; @@ -350,6 +365,8 @@ bool _init_done_cb(struct tunnel_ctx *tunnel, void *p) { #define SOCKET_DATA_BUFFER_SIZE 0x8000 ctx->client_delivery_cache = buffer_create(SOCKET_DATA_BUFFER_SIZE); + ctx->udp_recv_deque = cstl_deque_new(10, deque_compare_e_ptr, deque_free_e); + return is_incoming_ip_legal(tunnel); } @@ -413,6 +430,7 @@ static void tunnel_dying(struct tunnel_ctx *tunnel) { buffer_release(ctx->init_pkg); if (ctx->sec_websocket_key) { free(ctx->sec_websocket_key); } buffer_release(ctx->client_delivery_cache); + cstl_deque_delete(ctx->udp_recv_deque); free(ctx); } @@ -1264,8 +1282,20 @@ static void tunnel_udp_streaming(struct tunnel_ctx *tunnel, struct socket_ctx *s src = buffer_create_from((uint8_t *)socket->buf->base, (size_t)socket->result); buf = extract_data_from_assembled_websocket_frame(ctx, src); - p = buffer_get_data(buf, &p_len); - udp_remote_send_data(ctx->udp_relay, p, p_len); + + do { + const struct buffer_t* tmp; + const void* udp_pkg = cstl_deque_front(ctx->udp_recv_deque); + if (udp_pkg == NULL) { + break; + } + tmp = *((struct buffer_t**)udp_pkg); + + p = buffer_get_data(tmp, &p_len); + udp_remote_send_data(ctx->udp_relay, p, p_len); + + cstl_deque_pop_front(ctx->udp_recv_deque); + } while (true); buffer_release(src); buffer_release(buf); @@ -1324,6 +1354,11 @@ static struct buffer_t * extract_data_from_assembled_websocket_frame(struct serv ASSERT(obfs_receipt == NULL); ASSERT(proto_confirm == NULL); + if (ctx->udp_relay) { + struct buffer_t* t2 = buffer_clone(tmp); + cstl_deque_push_back(ctx->udp_recv_deque, &t2, sizeof(struct buffer_t*)); + } + buffer_concatenate2(buf, tmp); buffer_release(tmp);