From 43e6c3343ce0b370fec70997e242d80b876f28d5 Mon Sep 17 00:00:00 2001 From: "Alan T. DeKok" Date: Sat, 4 Jan 2025 13:27:11 -0500 Subject: [PATCH] better handle "queue full" states we already have a request state which indicates that the request is owned by the queue. We add a new request state which says that the queue code should free the request on dequeue. Double-checked the rest of the code, and added more cleanups for requests which should handle the "queue full" case a bit better. --- src/include/radiusd.h | 2 + src/main/process.c | 65 ++++++++++++-- src/main/threads.c | 139 ++++++++++++++++++++++++----- src/main/unittest.c | 17 ++++ src/modules/rlm_eap/radeapclient.c | 17 +++- 5 files changed, 211 insertions(+), 29 deletions(-) diff --git a/src/include/radiusd.h b/src/include/radiusd.h index ff1aaa30e5f73..da7a6b8421fba 100644 --- a/src/include/radiusd.h +++ b/src/include/radiusd.h @@ -202,6 +202,7 @@ typedef struct main_config { typedef enum { REQUEST_ACTIVE = 1, REQUEST_STOP_PROCESSING, + REQUEST_TO_FREE, //!< in the queue, and the queue should free it } rad_master_state_t; #define REQUEST_MASTER_NUM_STATES (REQUEST_STOP_PROCESSING + 1) @@ -548,6 +549,7 @@ int radius_copy_vp(TALLOC_CTX *ctx, VALUE_PAIR **out, REQUEST *request, char con /* threads.c */ int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag); void thread_pool_stop(void); +void thread_pool_free(void); int thread_pool_addrequest(REQUEST *, RAD_REQUEST_FUNP); pid_t rad_fork(void); pid_t rad_waitpid(pid_t pid, int *status); diff --git a/src/main/process.c b/src/main/process.c index 478fa7ac6b08f..d30a95e3cade6 100644 --- a/src/main/process.c +++ b/src/main/process.c @@ -62,6 +62,12 @@ time_t fr_start_time = (time_t)-1; static rbtree_t *pl = NULL; static fr_event_list_t *el = NULL; +/* + * These are shared with threads.c, and nothing else. + */ +void request_free(REQUEST *request) CC_HINT(nonnull); +void request_done(REQUEST *request, int original) CC_HINT(nonnull); + fr_event_list_t *radius_event_list_corral(UNUSED event_corral_t hint) { /* Currently we do not run a second event loop for modules. */ return el; @@ -100,7 +106,7 @@ static char const *master_state_names[REQUEST_MASTER_NUM_STATES] = { "?", "active", "stop-processing", - "counted" + "in-queue-waiting-to-free", }; static char const *child_state_names[REQUEST_CHILD_NUM_STATES] = { @@ -396,7 +402,6 @@ STATE_MACHINE_DECL(request_ping) CC_HINT(nonnull); STATE_MACHINE_DECL(request_response_delay) CC_HINT(nonnull); STATE_MACHINE_DECL(request_cleanup_delay) CC_HINT(nonnull); STATE_MACHINE_DECL(request_running) CC_HINT(nonnull); -STATE_MACHINE_DECL(request_done) CC_HINT(nonnull); STATE_MACHINE_DECL(proxy_no_reply) CC_HINT(nonnull); STATE_MACHINE_DECL(proxy_running) CC_HINT(nonnull); @@ -590,7 +595,7 @@ static void request_timer(void *ctx) * request. If there is a parent, free the parent INSTEAD of the * request. */ -static void request_free(REQUEST *request) +void request_free(REQUEST *request) { void *ptr; @@ -598,6 +603,15 @@ static void request_free(REQUEST *request) rad_assert(!request->in_request_hash); rad_assert(!request->in_proxy_hash); + /* + * Don't free requests which are in the queue. The code + * in threads.c will take care of doing that. + */ + if (request->child_state == REQUEST_QUEUED) { + request->master_state = REQUEST_TO_FREE; + return; + } + if ((request->options & RAD_REQUEST_OPTION_CTX) == 0) { talloc_free(request); return; @@ -687,7 +701,7 @@ static void proxy_reply_too_late(REQUEST *request) * } * \enddot */ -static void request_done(REQUEST *request, int original) +void request_done(REQUEST *request, int original) { struct timeval now, when; int action = original; @@ -802,7 +816,17 @@ static void request_done(REQUEST *request, int original) case FR_ACTION_DONE: #ifdef HAVE_PTHREAD_H /* - * If the child is still running, leave it alone. + * If the child is still queued or running, don't + * mark it as DONE. + * + * For queued requests, the request_free() + * function will mark up the request so that the + * queue will free it. + * + * For running requests, the child thread will + * eventually call request_done(). A timer in + * the master thread will then take care of + * cleaning up the request. */ if (spawn_flag && (request->child_state <= REQUEST_RUNNING)) { break; @@ -2013,6 +2037,7 @@ int request_receive(TALLOC_CTX *ctx, rad_listen_t *listener, RADIUS_PACKET *pack /* * Don't do delayed reject. Oh well. */ + request->child_state = REQUEST_DONE; request_free(request); return 1; } @@ -6527,9 +6552,25 @@ static int request_delete_cb(UNUSED void *ctx, void *data) /* * Not done, or the child thread is still processing it. */ - if (request->child_state < REQUEST_RESPONSE_DELAY) return 0; /* continue */ + switch (request->child_state) { + default: + case REQUEST_QUEUED: + case REQUEST_RESPONSE_DELAY: + case REQUEST_CLEANUP_DELAY: + case REQUEST_DONE: + break; + + case REQUEST_RUNNING: + case REQUEST_PROXIED: + return 0; + } #ifdef HAVE_PTHREAD_H + /* + * The request is being processed by a child thread. + * This should never happen, but perhaps race condition + * could cause this to be set? + */ if (pthread_equal(request->child_pid, NO_SUCH_CHILD_PID) == 0) return 0; #endif @@ -6566,6 +6607,16 @@ void radius_event_free(void) { ASSERT_MASTER; +#ifdef HAVE_PTHREAD_H + /* + * Stop all threads from processing requests. Do this + * before trying to clean up or free outstanding requests. + */ + if (spawn_flag) { + thread_pool_stop(); + } +#endif + #ifdef WITH_PROXY /* * There are requests in the proxy hash that aren't @@ -6584,7 +6635,7 @@ void radius_event_free(void) * ensure that all of the threads have exited. */ #ifdef HAVE_PTHREAD_H - thread_pool_stop(); + thread_pool_free(); #endif /* diff --git a/src/main/threads.c b/src/main/threads.c index 5730b5e8448e7..6d5baf829bc41 100644 --- a/src/main/threads.c +++ b/src/main/threads.c @@ -136,6 +136,16 @@ typedef struct fr_pps_t { } fr_pps_t; #endif +/* + * In process.c, but no one else should be calling these + * functions. + */ +extern void request_free(REQUEST *request); +extern void request_done(REQUEST *request, int original); + +#ifndef HAVE_STDATOMIC_H +static int request_discard_lower_priority(int priority); +#endif /* * A data structure to manage the thread pool. There's no real @@ -375,6 +385,13 @@ int request_enqueue(REQUEST *request) managed = true; } + /* + * Update the request state. + */ + request->component = ""; + request->module = ""; + request->child_state = REQUEST_QUEUED; + #ifdef HAVE_STDATOMIC_H if (!managed) { uint32_t num; @@ -393,18 +410,12 @@ int request_enqueue(REQUEST *request) } } - /* - * Use atomic queues where possible. They're substantially faster than mutexes. - */ - request->component = ""; - request->module = ""; - request->child_state = REQUEST_QUEUED; - /* * Push the request onto the appropriate fifo for that */ if (!fr_atomic_queue_push(thread_pool.queue[request->priority], request)) { - ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number); + RATE_LIMIT(ERROR("Something is blocking the server. There are too many packets in the queue, " + "waiting to be processed. Ignoring the new request.")); return 0; } @@ -499,27 +510,24 @@ int request_enqueue(REQUEST *request) thread_pool.request_count++; - if (thread_pool.num_queued >= thread_pool.max_queue_size) { + /* + * If there are too many packets _overall_, then try to delete a lower priority one. + */ + if ((thread_pool.num_queued >= thread_pool.max_queue_size) && + (request_discard_lower_priority(request->priority) == 0)) { pthread_mutex_unlock(&thread_pool.queue_mutex); - /* - * Mark the request as done. - */ RATE_LIMIT(ERROR("Something is blocking the server. There are %d packets in the queue, " "waiting to be processed. Ignoring the new request.", thread_pool.num_queued)); return 0; } - request->component = ""; - request->module = ""; - request->child_state = REQUEST_QUEUED; - /* - * Push the request onto the appropriate fifo for that + * Push the request onto the appropriate fifo for that priority. */ if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) { - pthread_mutex_unlock(&thread_pool.queue_mutex); - ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number); + RATE_LIMIT(ERROR("Something is blocking the server. There are too many packets in the queue, " + "waiting to be processed. Ignoring the new request.")); return 0; } @@ -542,6 +550,31 @@ int request_enqueue(REQUEST *request) return 1; } +#ifndef HAVE_STDATOMIC_H +/* + * Try to free up requests by discarding requests of lower priority. + */ +static int request_discard_lower_priority(int priority) +{ + int i; + REQUEST *request; + + if (priority == 0) return 0; + + for (i = NUM_FIFOS - 1; i < priority; i--) { + request = fr_fifo_pop(thread_pool.fifo[i]); + if (!request) continue; + + fr_assert(request->child_state == REQUEST_QUEUED); + request->child_state = REQUEST_DONE; + request_done(request, FR_ACTION_DONE); + return 1; + } + + return 0; +} +#endif + /* * Remove a request from the queue. */ @@ -575,8 +608,32 @@ static int request_dequeue(REQUEST **prequest) /* * This entry was marked to be stopped. Acknowledge it. + * + * If we own the request, we delete it. Otherwise + * we run the "done" callback now, which will + * stop timers, remove it from the request hash, + * update listener counts, etc. + * + * Running request_done() here means that old + * requests are cleaned up immediately, which + * frees up more resources for new requests. It + * also means that we don't need to rely on + * timers to free up the old requests, as those + * timers will run much much later. + * + * Catching this corner case doesn't change the + * normal operation of the server. Most requests + * should NOT be marked "stop processing" when + * they're in the queue. This situation + * generally happens when the server is blocked, + * due to a slow back-end database. */ request->child_state = REQUEST_DONE; + if (request->master_state == REQUEST_TO_FREE) { + request_free(request); + } else { + request_done(request, REQUEST_DONE); + } } /* @@ -631,7 +688,13 @@ static int request_dequeue(REQUEST **prequest) request = fr_fifo_pop(thread_pool.fifo[i]); rad_assert(request != NULL); VERIFY_REQUEST(request); + request->child_state = REQUEST_DONE; + if (request->master_state == REQUEST_TO_FREE) { + request_free(request); + } else { + request_done(request, REQUEST_DONE); + } thread_pool.num_queued--; } @@ -663,6 +726,7 @@ static int request_dequeue(REQUEST **prequest) rad_assert(*prequest != NULL); rad_assert(request->magic == REQUEST_MAGIC); + rad_assert(request->child_state == REQUEST_QUEUED); request->component = ""; request->module = ""; @@ -1196,10 +1260,30 @@ int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag) } DIAG_ON(deprecated-declarations) +void thread_pool_stop(void) +{ + int i, total_threads; + + if (!pool_initialized) return; + + /* + * Set pool stop flag. + */ + thread_pool.stop_flag = true; + + /* + * Wakeup all threads to make them see stop flag. + */ + total_threads = thread_pool.total_threads; + for (i = 0; i != total_threads; i++) { + sem_post(&thread_pool.semaphore); + } +} + /* - * Stop all threads in the pool. + * Free all thread-related information */ -void thread_pool_stop(void) +void thread_pool_free(void) { #ifndef WITH_GCD int i; @@ -1231,6 +1315,19 @@ void thread_pool_stop(void) delete_thread(handle); } + /* + * Free any requests which were blocked in the queue, but + * only if we're checking that no memory leaked. + */ + if (main_config.memory_report) { + REQUEST *request; + + while (request_dequeue(&request) == 1) { + request->child_state = REQUEST_DONE; + request_free(request); + } + } + for (i = 0; i < NUM_FIFOS; i++) { #ifdef HAVE_STDATOMIC_H fr_atomic_queue_free(&thread_pool.queue[i]); diff --git a/src/main/unittest.c b/src/main/unittest.c index c82d31dcfa223..feb72d6598abb 100644 --- a/src/main/unittest.c +++ b/src/main/unittest.c @@ -72,6 +72,23 @@ void request_inject(UNUSED REQUEST *request) /* do nothing */ } +/* + * These are shared with threads.c, and nothing else. + */ +void request_free(REQUEST *request) CC_HINT(nonnull); +void request_done(REQUEST *request, int original) CC_HINT(nonnull); + +void request_free(UNUSED REQUEST *request) +{ + /* do nothing */ +} + +void request_done(UNUSED REQUEST *request, UNUSED int original) +{ + /* do nothing */ +} + + #ifdef WITH_RADIUSV11 int fr_radiusv11_client_init(UNUSED fr_tls_server_conf_t *tls); diff --git a/src/modules/rlm_eap/radeapclient.c b/src/modules/rlm_eap/radeapclient.c index cdf8fa47bd0d3..66d9f040f924a 100644 --- a/src/modules/rlm_eap/radeapclient.c +++ b/src/modules/rlm_eap/radeapclient.c @@ -190,12 +190,27 @@ rlm_rcode_t process_post_auth(UNUSED int postauth_type, UNUSED REQUEST *request) return RLM_MODULE_FAIL; } - fr_event_list_t *radius_event_list_corral(UNUSED event_corral_t hint) { return NULL; } +/* + * These are shared with threads.c, and nothing else. + */ +void request_free(REQUEST *request) CC_HINT(nonnull); +void request_done(REQUEST *request, int original) CC_HINT(nonnull); + +void request_free(UNUSED REQUEST *request) +{ + /* do nothing */ +} + +void request_done(UNUSED REQUEST *request, UNUSED int original) +{ + /* do nothing */ +} + static void NEVER_RETURNS usage(void) { fprintf(stdout, "Usage: radeapclient [options] server[:port] []\n");