Skip to content

Commit

Permalink
better handle "queue full" states
Browse files Browse the repository at this point in the history
We already have a request state which indicates that the request
is owned by the queue. We add a new request state which tells the
queue code to free the request on dequeue.

Double-checked the rest of the code, and added more cleanups so
that requests handle the "queue full" case a bit better.
  • Loading branch information
alandekok committed Jan 4, 2025
1 parent e77325e commit 43e6c33
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 29 deletions.
2 changes: 2 additions & 0 deletions src/include/radiusd.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ typedef struct main_config {
typedef enum {
REQUEST_ACTIVE = 1,
REQUEST_STOP_PROCESSING,
REQUEST_TO_FREE, //!< in the queue, and the queue should free it
} rad_master_state_t;
#define REQUEST_MASTER_NUM_STATES (REQUEST_STOP_PROCESSING + 1)

Expand Down Expand Up @@ -548,6 +549,7 @@ int radius_copy_vp(TALLOC_CTX *ctx, VALUE_PAIR **out, REQUEST *request, char con
/* threads.c */
int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag);
void thread_pool_stop(void);
void thread_pool_free(void);
int thread_pool_addrequest(REQUEST *, RAD_REQUEST_FUNP);
pid_t rad_fork(void);
pid_t rad_waitpid(pid_t pid, int *status);
Expand Down
65 changes: 58 additions & 7 deletions src/main/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ time_t fr_start_time = (time_t)-1;
static rbtree_t *pl = NULL;
static fr_event_list_t *el = NULL;

/*
* These are shared with threads.c, and nothing else.
*/
void request_free(REQUEST *request) CC_HINT(nonnull);
void request_done(REQUEST *request, int original) CC_HINT(nonnull);

fr_event_list_t *radius_event_list_corral(UNUSED event_corral_t hint) {
/* Currently we do not run a second event loop for modules. */
return el;
Expand Down Expand Up @@ -100,7 +106,7 @@ static char const *master_state_names[REQUEST_MASTER_NUM_STATES] = {
"?",
"active",
"stop-processing",
"counted"
"in-queue-waiting-to-free",
};

static char const *child_state_names[REQUEST_CHILD_NUM_STATES] = {
Expand Down Expand Up @@ -396,7 +402,6 @@ STATE_MACHINE_DECL(request_ping) CC_HINT(nonnull);
STATE_MACHINE_DECL(request_response_delay) CC_HINT(nonnull);
STATE_MACHINE_DECL(request_cleanup_delay) CC_HINT(nonnull);
STATE_MACHINE_DECL(request_running) CC_HINT(nonnull);
STATE_MACHINE_DECL(request_done) CC_HINT(nonnull);

STATE_MACHINE_DECL(proxy_no_reply) CC_HINT(nonnull);
STATE_MACHINE_DECL(proxy_running) CC_HINT(nonnull);
Expand Down Expand Up @@ -590,14 +595,23 @@ static void request_timer(void *ctx)
* request. If there is a parent, free the parent INSTEAD of the
* request.
*/
static void request_free(REQUEST *request)
void request_free(REQUEST *request)
{
void *ptr;

rad_assert(request->ev == NULL);
rad_assert(!request->in_request_hash);
rad_assert(!request->in_proxy_hash);

/*
* Don't free requests which are in the queue. The code
* in threads.c will take care of doing that.
*/
if (request->child_state == REQUEST_QUEUED) {
request->master_state = REQUEST_TO_FREE;
return;
}

if ((request->options & RAD_REQUEST_OPTION_CTX) == 0) {
talloc_free(request);
return;
Expand Down Expand Up @@ -687,7 +701,7 @@ static void proxy_reply_too_late(REQUEST *request)
* }
* \enddot
*/
static void request_done(REQUEST *request, int original)
void request_done(REQUEST *request, int original)
{
struct timeval now, when;
int action = original;
Expand Down Expand Up @@ -802,7 +816,17 @@ static void request_done(REQUEST *request, int original)
case FR_ACTION_DONE:
#ifdef HAVE_PTHREAD_H
/*
* If the child is still running, leave it alone.
* If the child is still queued or running, don't
* mark it as DONE.
*
* For queued requests, the request_free()
* function will mark up the request so that the
* queue will free it.
*
* For running requests, the child thread will
* eventually call request_done(). A timer in
* the master thread will then take care of
* cleaning up the request.
*/
if (spawn_flag && (request->child_state <= REQUEST_RUNNING)) {
break;
Expand Down Expand Up @@ -2013,6 +2037,7 @@ int request_receive(TALLOC_CTX *ctx, rad_listen_t *listener, RADIUS_PACKET *pack
/*
* Don't do delayed reject. Oh well.
*/
request->child_state = REQUEST_DONE;
request_free(request);
return 1;
}
Expand Down Expand Up @@ -6527,9 +6552,25 @@ static int request_delete_cb(UNUSED void *ctx, void *data)
/*
* Not done, or the child thread is still processing it.
*/
if (request->child_state < REQUEST_RESPONSE_DELAY) return 0; /* continue */
switch (request->child_state) {
default:
case REQUEST_QUEUED:
case REQUEST_RESPONSE_DELAY:
case REQUEST_CLEANUP_DELAY:
case REQUEST_DONE:
break;

case REQUEST_RUNNING:
case REQUEST_PROXIED:
return 0;
}

#ifdef HAVE_PTHREAD_H
/*
* The request is being processed by a child thread.
* This should never happen, but perhaps race condition
* could cause this to be set?
*/
if (pthread_equal(request->child_pid, NO_SUCH_CHILD_PID) == 0) return 0;
#endif

Expand Down Expand Up @@ -6566,6 +6607,16 @@ void radius_event_free(void)
{
ASSERT_MASTER;

#ifdef HAVE_PTHREAD_H
/*
* Stop all threads from processing requests. Do this
* before trying to clean up or free outstanding requests.
*/
if (spawn_flag) {
thread_pool_stop();
}
#endif

#ifdef WITH_PROXY
/*
* There are requests in the proxy hash that aren't
Expand All @@ -6584,7 +6635,7 @@ void radius_event_free(void)
* ensure that all of the threads have exited.
*/
#ifdef HAVE_PTHREAD_H
thread_pool_stop();
thread_pool_free();
#endif

/*
Expand Down
139 changes: 118 additions & 21 deletions src/main/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ typedef struct fr_pps_t {
} fr_pps_t;
#endif

/*
* In process.c, but no one else should be calling these
* functions.
*/
extern void request_free(REQUEST *request);
extern void request_done(REQUEST *request, int original);

#ifndef HAVE_STDATOMIC_H
static int request_discard_lower_priority(int priority);
#endif

/*
* A data structure to manage the thread pool. There's no real
Expand Down Expand Up @@ -375,6 +385,13 @@ int request_enqueue(REQUEST *request)
managed = true;
}

/*
* Update the request state.
*/
request->component = "<core>";
request->module = "<queue>";
request->child_state = REQUEST_QUEUED;

#ifdef HAVE_STDATOMIC_H
if (!managed) {
uint32_t num;
Expand All @@ -393,18 +410,12 @@ int request_enqueue(REQUEST *request)
}
}

/*
* Use atomic queues where possible. They're substantially faster than mutexes.
*/
request->component = "<core>";
request->module = "<queue>";
request->child_state = REQUEST_QUEUED;

/*
* Push the request onto the appropriate fifo for that
*/
if (!fr_atomic_queue_push(thread_pool.queue[request->priority], request)) {
ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
RATE_LIMIT(ERROR("Something is blocking the server. There are too many packets in the queue, "
"waiting to be processed. Ignoring the new request."));
return 0;
}

Expand Down Expand Up @@ -499,27 +510,24 @@ int request_enqueue(REQUEST *request)

thread_pool.request_count++;

if (thread_pool.num_queued >= thread_pool.max_queue_size) {
/*
* If there are too many packets _overall_, then try to delete a lower priority one.
*/
if ((thread_pool.num_queued >= thread_pool.max_queue_size) &&
(request_discard_lower_priority(request->priority) == 0)) {
pthread_mutex_unlock(&thread_pool.queue_mutex);

/*
* Mark the request as done.
*/
RATE_LIMIT(ERROR("Something is blocking the server. There are %d packets in the queue, "
"waiting to be processed. Ignoring the new request.", thread_pool.num_queued));
return 0;
}

request->component = "<core>";
request->module = "<queue>";
request->child_state = REQUEST_QUEUED;

/*
* Push the request onto the appropriate fifo for that
* Push the request onto the appropriate fifo for that priority.
*/
if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
RATE_LIMIT(ERROR("Something is blocking the server. There are too many packets in the queue, "
"waiting to be processed. Ignoring the new request."));
return 0;
}

Expand All @@ -542,6 +550,31 @@ int request_enqueue(REQUEST *request)
return 1;
}

#ifndef HAVE_STDATOMIC_H
/*
 *	Try to free up queue space by discarding one queued request of
 *	lower priority than the given one.
 *
 *	The fifos are indexed by priority, with fifo[0] holding the
 *	lowest priority requests (hence the early return when
 *	priority == 0: there is nothing lower to discard).
 *	NOTE(review): assumes lower index == lower priority, which is
 *	what the priority == 0 guard implies — confirm against the
 *	dequeue order in request_dequeue().
 *
 *	Caller must hold thread_pool.queue_mutex.
 *
 *	Returns 1 if a request was discarded, 0 otherwise.
 */
static int request_discard_lower_priority(int priority)
{
	int i;
	REQUEST *request;

	if (priority == 0) return 0;

	/*
	 *	Scan from the lowest priority fifo upwards, stopping
	 *	before the caller's own priority.  The original loop
	 *	(i = NUM_FIFOS - 1; i < priority; i--) either never ran,
	 *	or decremented i past zero into out-of-bounds indices.
	 */
	for (i = 0; i < priority; i++) {
		request = fr_fifo_pop(thread_pool.fifo[i]);
		if (!request) continue;

		rad_assert(request->child_state == REQUEST_QUEUED);
		request->child_state = REQUEST_DONE;

		/*
		 *	Run the "done" callback now so that timers are
		 *	stopped and the request is removed from the
		 *	request hash immediately.
		 */
		request_done(request, FR_ACTION_DONE);
		return 1;
	}

	return 0;
}
#endif

/*
* Remove a request from the queue.
*/
Expand Down Expand Up @@ -575,8 +608,32 @@ static int request_dequeue(REQUEST **prequest)

/*
* This entry was marked to be stopped. Acknowledge it.
*
* If we own the request, we delete it. Otherwise
* we run the "done" callback now, which will
* stop timers, remove it from the request hash,
* update listener counts, etc.
*
* Running request_done() here means that old
* requests are cleaned up immediately, which
* frees up more resources for new requests. It
* also means that we don't need to rely on
* timers to free up the old requests, as those
* timers will run much much later.
*
* Catching this corner case doesn't change the
* normal operation of the server. Most requests
* should NOT be marked "stop processing" when
* they're in the queue. This situation
* generally happens when the server is blocked,
* due to a slow back-end database.
*/
request->child_state = REQUEST_DONE;
if (request->master_state == REQUEST_TO_FREE) {
request_free(request);
} else {
request_done(request, REQUEST_DONE);
}
}

/*
Expand Down Expand Up @@ -631,7 +688,13 @@ static int request_dequeue(REQUEST **prequest)
request = fr_fifo_pop(thread_pool.fifo[i]);
rad_assert(request != NULL);
VERIFY_REQUEST(request);

request->child_state = REQUEST_DONE;
if (request->master_state == REQUEST_TO_FREE) {
request_free(request);
} else {
request_done(request, REQUEST_DONE);
}
thread_pool.num_queued--;
}

Expand Down Expand Up @@ -663,6 +726,7 @@ static int request_dequeue(REQUEST **prequest)

rad_assert(*prequest != NULL);
rad_assert(request->magic == REQUEST_MAGIC);
rad_assert(request->child_state == REQUEST_QUEUED);

request->component = "<core>";
request->module = "";
Expand Down Expand Up @@ -1196,10 +1260,30 @@ int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag)
}
DIAG_ON(deprecated-declarations)

void thread_pool_stop(void)
{
	int remaining;

	if (!pool_initialized) return;

	/*
	 *	Signal every worker thread that it should exit.
	 */
	thread_pool.stop_flag = true;

	/*
	 *	Post the semaphore once per thread.  Each worker is
	 *	woken up, sees the stop flag, and exits.  Snapshot the
	 *	thread count once, before waking anyone.
	 */
	remaining = thread_pool.total_threads;
	while (remaining > 0) {
		sem_post(&thread_pool.semaphore);
		remaining--;
	}
}

/*
* Stop all threads in the pool.
* Free all thread-related information
*/
void thread_pool_stop(void)
void thread_pool_free(void)
{
#ifndef WITH_GCD
int i;
Expand Down Expand Up @@ -1231,6 +1315,19 @@ void thread_pool_stop(void)
delete_thread(handle);
}

/*
* Free any requests which were blocked in the queue, but
* only if we're checking that no memory leaked.
*/
if (main_config.memory_report) {
REQUEST *request;

while (request_dequeue(&request) == 1) {
request->child_state = REQUEST_DONE;
request_free(request);
}
}

for (i = 0; i < NUM_FIFOS; i++) {
#ifdef HAVE_STDATOMIC_H
fr_atomic_queue_free(&thread_pool.queue[i]);
Expand Down
Loading

0 comments on commit 43e6c33

Please sign in to comment.