From 8b4aedf085a9dffbd116ec45ba32218a7784a3ac Mon Sep 17 00:00:00 2001 From: Tor-Christian Eriksen Date: Thu, 2 Feb 2023 14:15:14 +0100 Subject: [PATCH] Add fetch interval to control database poll rate --- django_q/conf.py | 3 +++ django_q/tasks.py | 18 +++++++++--------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/django_q/conf.py b/django_q/conf.py index 3a8f3982..7f2bc2f5 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -179,6 +179,9 @@ class Conf: # Optional attempt count. set to 0 for infinite attempts MAX_ATTEMPTS = conf.get("max_attempts", 0) + # How often fetch polls the database + FETCH_INTERVAL = conf.get("fetch_interval", 0.01) + # OSX doesn't implement qsize because of missing sem_getvalue() try: QSIZE = Queue().qsize() == 0 diff --git a/django_q/tasks.py b/django_q/tasks.py index a6694e16..e4327ffd 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -228,7 +228,7 @@ def result_group_cached(group_id, failures=False, wait=0, count=None, broker=Non sleep(0.01) -def fetch(task_id, wait=0, cached=Conf.CACHED): +def fetch(task_id, wait=0, cached=Conf.CACHED, interval=Conf.FETCH_INTERVAL): """ Return the processed task. 
@@ -241,7 +241,7 @@ def fetch(task_id, wait=0, cached=Conf.CACHED): :rtype: Task """ if cached: - return fetch_cached(task_id, wait) + return fetch_cached(task_id, wait, interval=interval) start = time() while True: t = Task.get_task(task_id) @@ -249,10 +249,10 @@ def fetch(task_id, wait=0, cached=Conf.CACHED): return t if (time() - start) * 1000 >= wait >= 0: break - sleep(0.01) + sleep(interval) -def fetch_cached(task_id, wait=0, broker=None): +def fetch_cached(task_id, wait=0, broker=None, interval=Conf.FETCH_INTERVAL): """ Return the processed task from the cache backend """ @@ -277,7 +277,7 @@ def fetch_cached(task_id, wait=0, broker=None): ) if (time() - start) * 1000 >= wait >= 0: break - sleep(0.01) + sleep(interval) def fetch_group(group_id, failures=True, wait=0, count=None, cached=Conf.CACHED): @@ -300,14 +300,14 @@ def fetch_group(group_id, failures=True, wait=0, count=None, cached=Conf.CACHED) and (time() - start) * 1000 >= wait >= 0 ): break - sleep(0.01) + sleep(Conf.FETCH_INTERVAL) while True: r = Task.get_task_group(group_id, failures) if r: return r if (time() - start) * 1000 >= wait >= 0: break - sleep(0.01) + sleep(Conf.FETCH_INTERVAL) def fetch_group_cached(group_id, failures=True, wait=0, count=None, broker=None): @@ -325,7 +325,7 @@ def fetch_group_cached(group_id, failures=True, wait=0, count=None, broker=None) and (time() - start) * 1000 >= wait >= 0 ): break - sleep(0.01) + sleep(Conf.FETCH_INTERVAL) while True: group_list = broker.cache.get(f"{broker.list_key}:{group_id}:keys") if group_list: @@ -350,7 +350,7 @@ def fetch_group_cached(group_id, failures=True, wait=0, count=None, broker=None) return task_list if (time() - start) * 1000 >= wait >= 0: break - sleep(0.01) + sleep(Conf.FETCH_INTERVAL) def count_group(group_id, failures=False, cached=Conf.CACHED):