Skip to content

Commit

Permalink
UDP relay completion.
Browse files Browse the repository at this point in the history
  • Loading branch information
ssrlive committed May 18, 2020
1 parent 8f8bad5 commit 403b1eb
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 19 deletions.
91 changes: 74 additions & 17 deletions src/client/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "udprelay.h"
#include "s5.h"
#include "base64.h"
#include "cstl_lib.h"

/* Session states. */
#define TUNNEL_STAGE_MAP(V) \
Expand Down Expand Up @@ -58,7 +59,8 @@ struct client_ctx;
struct udp_data_context {
union sockaddr_universal src_addr;
struct socks5_address target_addr;
struct buffer_t *data;
struct cstl_deque* send_deque; // std::deque<struct buffer_t *>
struct cstl_deque* recv_deque;

struct client_ctx *owner; // __weak_ptr
struct udp_listener_ctx_t* udp_ctx; // __weak_ptr
Expand Down Expand Up @@ -1060,7 +1062,11 @@ static void tunnel_tls_on_connection_established(struct tunnel_ctx *tunnel) {
{
struct buffer_t *tmp = buffer_create(SSR_BUFF_SIZE); buffer_replace(tmp, ctx->init_pkg);
if (ctx->udp_data_ctx) {
buffer_replace(tmp, ctx->udp_data_ctx->data);
const void* udp_pkg = cstl_deque_front(ctx->udp_data_ctx->send_deque);
if (udp_pkg) {
buffer_replace(tmp, *((const struct buffer_t**)udp_pkg));
cstl_deque_pop_front(ctx->udp_data_ctx->send_deque);
}
}
if (ssr_ok != tunnel_cipher_client_encrypt(ctx->cipher, tmp)) {
tunnel->tunnel_shutdown(tunnel);
Expand Down Expand Up @@ -1131,6 +1137,21 @@ static void tunnel_tls_on_data_received(struct tunnel_ctx *tunnel, const uint8_t
// it as payload of WebSocket authenticate package in function
// `tunnel_tls_on_connection_established`.
ctx->stage = tunnel_stage_tls_streaming;
do {
struct buffer_t* tmp; const uint8_t* p; size_t size = 0;
const void* udp_pkg = cstl_deque_front(ctx->udp_data_ctx->send_deque);
if (udp_pkg == NULL) {
break;
}
tmp = *((struct buffer_t**)udp_pkg);

tunnel_cipher_client_encrypt(ctx->cipher, tmp);
p = buffer_get_data(tmp, &size);
tunnel_tls_send_websocket_data(tunnel, p, size);

cstl_deque_pop_front(ctx->udp_data_ctx->send_deque);
} while (true);

} else {
do_action_after_auth_server_success(tunnel);
}
Expand Down Expand Up @@ -1171,6 +1192,11 @@ static void tunnel_tls_on_data_received(struct tunnel_ctx *tunnel, const uint8_t
e = tunnel_cipher_client_decrypt(ctx->cipher, tmp, &feedback);
assert(!feedback);

if (ctx->udp_data_ctx) {
struct buffer_t* t2 = buffer_clone(tmp);
cstl_deque_push_back(ctx->udp_data_ctx->recv_deque, &t2, sizeof(struct buffer_t*));
}

buffer_concatenate2(ctx->local_write_cache, tmp);

buffer_release(tmp);
Expand All @@ -1184,10 +1210,20 @@ static void tunnel_tls_on_data_received(struct tunnel_ctx *tunnel, const uint8_t

if (ctx->udp_data_ctx) {
// Write the received remote data back to the connected UDP client.
struct udp_data_context* udp_data_ctx = ctx->udp_data_ctx;
size_t s = 0;
const uint8_t* p = buffer_get_data(ctx->local_write_cache, &s);
udp_relay_send_data(udp_data_ctx->udp_ctx, &udp_data_ctx->src_addr, p, s);
do {
const struct buffer_t* tmp; size_t s = 0; const uint8_t* p;
const void* udp_pkg = cstl_deque_front(ctx->udp_data_ctx->recv_deque);
if (udp_pkg == NULL) {
break;
}
tmp = *((struct buffer_t**)udp_pkg);

p = buffer_get_data(tmp, &s);
udp_relay_send_data(ctx->udp_data_ctx->udp_ctx, &ctx->udp_data_ctx->src_addr, p, s);

cstl_deque_pop_front(ctx->udp_data_ctx->recv_deque);
} while (true);

buffer_reset(ctx->local_write_cache);
return;
}
Expand Down Expand Up @@ -1264,16 +1300,31 @@ static bool can_access(const struct tunnel_ctx *cx, const struct sockaddr *addr)
return false;
}

static int deque_compare_e_ptr(const void* left, const void* right) {
struct buffer_t* l = *((struct buffer_t**)left);
struct buffer_t* r = *((struct buffer_t**)right);
return (int)((ssize_t)l - (ssize_t)r);
}

static void deque_free_e(void* ptr) {
if (ptr) {
struct buffer_t* p = *((struct buffer_t**)ptr);
buffer_release(p);
}
}

struct udp_data_context * udp_data_context_create(void) {
struct udp_data_context *ptr;
ptr = (struct udp_data_context *) calloc(1, sizeof(*ptr));
ptr->data = buffer_create(SSR_BUFF_SIZE);
ptr->send_deque = cstl_deque_new(10, deque_compare_e_ptr, deque_free_e);
ptr->recv_deque = cstl_deque_new(10, deque_compare_e_ptr, deque_free_e);
return ptr;
}

void udp_data_context_destroy(struct udp_data_context *ptr) {
if (ptr) {
buffer_release(ptr->data);
cstl_deque_delete(ptr->send_deque);
cstl_deque_delete(ptr->recv_deque);
free(ptr);
}
}
Expand Down Expand Up @@ -1304,6 +1355,7 @@ void udp_on_recv_data(struct udp_listener_ctx_t *udp_ctx, const union sockaddr_u
const uint8_t *data_p = buffer_get_data(data, &data_len);
struct udp_data_context *query_data;
const uint8_t *raw_p = NULL; size_t raw_len = 0;
struct buffer_t* out_ref;

query_data = udp_data_context_create();
if (src_addr) {
Expand All @@ -1317,21 +1369,26 @@ void udp_on_recv_data(struct udp_listener_ctx_t *udp_ctx, const union sockaddr_u
return;
}

out_ref = buffer_create_from(raw_p, raw_len);

cstl_set_container_traverse(env->tunnel_set, &_do_find_upd_tunnel, query_data);
if (query_data->owner) {
struct buffer_t* out_ref;
ctx = query_data->owner;
ASSERT(ctx->udp_data_ctx);
udp_data_context_destroy(query_data);
ASSERT(ctx->stage > tunnel_stage_tls_connecting);
out_ref = (ctx && ctx->udp_data_ctx) ? ctx->udp_data_ctx->data : NULL;
buffer_store(out_ref, raw_p, raw_len);
tunnel = ctx->tunnel;
if (ssr_ok != tunnel_cipher_client_encrypt(ctx->cipher, out_ref)) {
tunnel->tunnel_shutdown(tunnel);
if (tunnel && tunnel->tunnel_is_in_streaming && tunnel->tunnel_is_in_streaming(tunnel)) {
if (ssr_ok != tunnel_cipher_client_encrypt(ctx->cipher, out_ref)) {
tunnel->tunnel_shutdown(tunnel);
} else {
size_t len = 0; const uint8_t* p = buffer_get_data(out_ref, &len);
tunnel_tls_send_websocket_data(tunnel, p, len);
}
buffer_release(out_ref);
} else if (ctx->udp_data_ctx) {
cstl_deque_push_back(ctx->udp_data_ctx->send_deque, &out_ref, sizeof(struct buffer_t*));
} else {
size_t len = 0; const uint8_t* p = buffer_get_data(out_ref, &len);
tunnel_tls_send_websocket_data(tunnel, p, len);
UNREACHABLE();
}
} else {
tunnel = tunnel_initialize(loop, NULL, config->idle_timeout, &init_done_cb, env);
Expand All @@ -1347,7 +1404,7 @@ void udp_on_recv_data(struct udp_listener_ctx_t *udp_ctx, const union sockaddr_u

client_tunnel_connecting_print_info(tunnel);

buffer_store(ctx->udp_data_ctx->data, raw_p, raw_len);
cstl_deque_push_back(ctx->udp_data_ctx->send_deque, &out_ref, sizeof(struct buffer_t*));
}
(void)p;
}
39 changes: 37 additions & 2 deletions src/server/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ip_addr_cache.h"
#include "s5.h"
#include "base64.h"
#include "cstl_lib.h"

#ifndef SSR_MAX_CONN
#define SSR_MAX_CONN 1024
Expand Down Expand Up @@ -84,6 +85,7 @@ struct server_ctx {
struct buffer_t *client_delivery_cache;
struct tunnel_ctx *tunnel; // __weak_ptr
struct udp_remote_ctx_t *udp_relay;
struct cstl_deque* udp_recv_deque;
};

static int ssr_server_run_loop(struct server_config *config, bool force_quit);
Expand Down Expand Up @@ -320,6 +322,19 @@ void ssr_server_shutdown(struct ssr_server_state *state) {
}
}

static int deque_compare_e_ptr(const void* left, const void* right) {
struct buffer_t* l = *((struct buffer_t**)left);
struct buffer_t* r = *((struct buffer_t**)right);
return (int)((ssize_t)l - (ssize_t)r);
}

static void deque_free_e(void* ptr) {
if (ptr) {
struct buffer_t* p = *((struct buffer_t**)ptr);
buffer_release(p);
}
}

bool _init_done_cb(struct tunnel_ctx *tunnel, void *p) {
struct server_env_t *env = (struct server_env_t *)p;

Expand Down Expand Up @@ -350,6 +365,8 @@ bool _init_done_cb(struct tunnel_ctx *tunnel, void *p) {
#define SOCKET_DATA_BUFFER_SIZE 0x8000
ctx->client_delivery_cache = buffer_create(SOCKET_DATA_BUFFER_SIZE);

ctx->udp_recv_deque = cstl_deque_new(10, deque_compare_e_ptr, deque_free_e);

return is_incoming_ip_legal(tunnel);
}

Expand Down Expand Up @@ -413,6 +430,7 @@ static void tunnel_dying(struct tunnel_ctx *tunnel) {
buffer_release(ctx->init_pkg);
if (ctx->sec_websocket_key) { free(ctx->sec_websocket_key); }
buffer_release(ctx->client_delivery_cache);
cstl_deque_delete(ctx->udp_recv_deque);
free(ctx);
}

Expand Down Expand Up @@ -1264,8 +1282,20 @@ static void tunnel_udp_streaming(struct tunnel_ctx *tunnel, struct socket_ctx *s

src = buffer_create_from((uint8_t *)socket->buf->base, (size_t)socket->result);
buf = extract_data_from_assembled_websocket_frame(ctx, src);
p = buffer_get_data(buf, &p_len);
udp_remote_send_data(ctx->udp_relay, p, p_len);

do {
const struct buffer_t* tmp;
const void* udp_pkg = cstl_deque_front(ctx->udp_recv_deque);
if (udp_pkg == NULL) {
break;
}
tmp = *((struct buffer_t**)udp_pkg);

p = buffer_get_data(tmp, &p_len);
udp_remote_send_data(ctx->udp_relay, p, p_len);

cstl_deque_pop_front(ctx->udp_recv_deque);
} while (true);

buffer_release(src);
buffer_release(buf);
Expand Down Expand Up @@ -1324,6 +1354,11 @@ static struct buffer_t * extract_data_from_assembled_websocket_frame(struct serv
ASSERT(obfs_receipt == NULL);
ASSERT(proto_confirm == NULL);

if (ctx->udp_relay) {
struct buffer_t* t2 = buffer_clone(tmp);
cstl_deque_push_back(ctx->udp_recv_deque, &t2, sizeof(struct buffer_t*));
}

buffer_concatenate2(buf, tmp);

buffer_release(tmp);
Expand Down

0 comments on commit 403b1eb

Please sign in to comment.