Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fetch interval to control database poll rate #717

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions django_q/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ class Conf:
# Optional attempt count. set to 0 for infinite attempts
MAX_ATTEMPTS = conf.get("max_attempts", 0)

# How often fetch polls the database, in seconds between polls
FETCH_INTERVAL = conf.get("fetch_interval", 0.01)

# OSX doesn't implement qsize because of missing sem_getvalue()
try:
QSIZE = Queue().qsize() == 0
Expand Down
18 changes: 9 additions & 9 deletions django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def result_group_cached(group_id, failures=False, wait=0, count=None, broker=Non
sleep(0.01)


def fetch(task_id, wait=0, cached=Conf.CACHED):
def fetch(task_id, wait=0, cached=Conf.CACHED, interval=Conf.FETCH_INTERVAL):
"""
Return the processed task.

Expand All @@ -241,18 +241,18 @@ def fetch(task_id, wait=0, cached=Conf.CACHED):
:rtype: Task
"""
if cached:
return fetch_cached(task_id, wait)
return fetch_cached(task_id, wait, interval=interval)
start = time()
while True:
t = Task.get_task(task_id)
if t:
return t
if (time() - start) * 1000 >= wait >= 0:
break
sleep(0.01)
sleep(interval)


def fetch_cached(task_id, wait=0, broker=None):
def fetch_cached(task_id, wait=0, broker=None, interval=Conf.FETCH_INTERVAL):
"""
Return the processed task from the cache backend
"""
Expand All @@ -277,7 +277,7 @@ def fetch_cached(task_id, wait=0, broker=None):
)
if (time() - start) * 1000 >= wait >= 0:
break
sleep(0.01)
sleep(interval)


def fetch_group(group_id, failures=True, wait=0, count=None, cached=Conf.CACHED):
Expand All @@ -300,14 +300,14 @@ def fetch_group(group_id, failures=True, wait=0, count=None, cached=Conf.CACHED)
and (time() - start) * 1000 >= wait >= 0
):
break
sleep(0.01)
sleep(Conf.FETCH_INTERVAL)
while True:
r = Task.get_task_group(group_id, failures)
if r:
return r
if (time() - start) * 1000 >= wait >= 0:
break
sleep(0.01)
sleep(Conf.FETCH_INTERVAL)


def fetch_group_cached(group_id, failures=True, wait=0, count=None, broker=None):
Expand All @@ -325,7 +325,7 @@ def fetch_group_cached(group_id, failures=True, wait=0, count=None, broker=None)
and (time() - start) * 1000 >= wait >= 0
):
break
sleep(0.01)
sleep(Conf.FETCH_INTERVAL)
while True:
group_list = broker.cache.get(f"{broker.list_key}:{group_id}:keys")
if group_list:
Expand All @@ -350,7 +350,7 @@ def fetch_group_cached(group_id, failures=True, wait=0, count=None, broker=None)
return task_list
if (time() - start) * 1000 >= wait >= 0:
break
sleep(0.01)
sleep(Conf.FETCH_INTERVAL)


def count_group(group_id, failures=False, cached=Conf.CACHED):
Expand Down