Skip to content

Commit

Permalink
feat: Add Collection.active_job foreign key to simplify logic.
Browse files Browse the repository at this point in the history
admin: Order jobs by descending ID on collection form. Prevent manual creation of jobs.
  • Loading branch information
jpmckinney committed Nov 5, 2024
1 parent dcf29d5 commit a96af3d
Show file tree
Hide file tree
Showing 13 changed files with 276 additions and 171 deletions.
53 changes: 15 additions & 38 deletions data_registry/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from django import forms
from django.conf import settings
from django.contrib import admin, messages
from django.db.models import BooleanField, Case, Exists, OuterRef, Q, When
from django.db.models import Exists, F, OuterRef, Q
from django.forms.widgets import TextInput
from django.urls import NoReverseMatch, reverse
from django.utils import timezone
Expand Down Expand Up @@ -57,12 +57,6 @@ class CollectionAdminForm(forms.ModelForm):
help_text="The name of the spider in Kingfisher Collect. If a new spider is not listed, Kingfisher Collect "
"needs to be re-deployed to the registry's server.",
)
active_job = forms.ModelChoiceField(
queryset=None,
required=False,
help_text="A job is a set of tasks to collect and process data from a publication. A job can be selected once "
"it is completed. If a new job completes, it becomes the active job.",
)
country_flag = forms.ChoiceField(choices=[(None, "---------")], required=False)

def __init__(self, *args, request=None, **kwargs):
Expand All @@ -89,24 +83,16 @@ def __init__(self, *args, request=None, **kwargs):

# https://docs.djangoproject.com/en/4.2/ref/forms/fields/#fields-which-handle-relationships
# `self.instance.job_set` doesn't work, because `self.instance` might be an unsaved publication.
self.fields["active_job"].queryset = Job.objects.filter(collection=self.instance).complete()
self.fields["active_job"].initial = Job.objects.filter(collection=self.instance).active().first()
#
# It's not obvious how to use limit_choices_to to filter jobs by collection.
# https://docs.djangoproject.com/en/4.2/ref/models/fields/#django.db.models.ForeignKey.limit_choices_to
self.fields["active_job"].queryset = (
Job.objects.filter(collection=self.instance).complete().order_by(F("id").desc())
)

# Populate choices in the form, not the model, for easier migration between icon sets.
self.fields["country_flag"].choices += sorted((f.name, f.name) for f in FLAGS_DIR.iterdir() if f.is_file())

def save(self, *args, **kwargs):
jobs = self.instance.job_set

active_job = self.cleaned_data["active_job"]
if active_job:
if not active_job.active:
jobs.update(active=Case(When(id=active_job.id, then=True), default=False, output_field=BooleanField()))
elif self.instance.pk:
jobs.update(active=False)

return super().save(*args, **kwargs)

class Meta:
widgets = {
"title": TextInput(attrs={"class": "vTextField"}),
Expand All @@ -117,21 +103,6 @@ class Meta:
}


class UnavailableFilter(admin.SimpleListFilter):
title = _("no active job")

parameter_name = "unavailable"

def lookups(self, request, model_admin):
return (("1", _("Yes")),)

def queryset(self, request, queryset):
if self.value() == "1":
active_jobs = Job.objects.active().filter(collection=OuterRef("pk"))
return queryset.exclude(Exists(active_jobs))
return None


class IncompleteFilter(admin.SimpleListFilter):
title = _("incomplete")

Expand Down Expand Up @@ -279,7 +250,7 @@ class CollectionAdmin(CascadeTaskMixin, TabbedDjangoJqueryTranslationAdmin):
"retrieval_frequency",
("license_custom", admin.EmptyFieldListFilter),
("summary_en", admin.EmptyFieldListFilter),
UnavailableFilter,
("active_job", admin.EmptyFieldListFilter),
IncompleteFilter,
UntranslatedFilter,
("last_reviewed", CustomDateFieldListFilter),
Expand Down Expand Up @@ -352,7 +323,9 @@ class CollectionAdmin(CascadeTaskMixin, TabbedDjangoJqueryTranslationAdmin):

def get_form(self, request, obj=None, **kwargs):
kwargs["form"] = partialclass(self.form, request=request)
return super().get_form(request, obj, **kwargs)
form = super().get_form(request, obj, **kwargs)
form.base_fields["active_job"].widget.can_add_related = False
return form


class LicenseAdminForm(forms.ModelForm):
Expand Down Expand Up @@ -516,6 +489,10 @@ class Media:
def country(self, obj):
return obj.collection.country

@admin.display(description="Active")
def active(self, obj):
    """Return whether ``obj`` is the job referenced by its collection's ``active_job_id``."""
    # Compare raw foreign-key IDs, rather than loading the related Job instance.
    return obj.collection.active_job_id == obj.id

@admin.display(description="Last completed task")
def last_task(self, obj):
last_completed_task = (
Expand Down
36 changes: 36 additions & 0 deletions data_registry/migrations/0050_collection_active_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Generated by Django 4.2.15 on 2024-11-04 22:47

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
    """
    Add the nullable ``Collection.active_job`` foreign key, and backfill it.

    The backfill sets each collection's ``active_job_id`` to the ID of a job that belongs to the collection
    and whose ``active`` column is true.
    """

    dependencies = [
        ("data_registry", "0049_alter_collection_license_custom_and_more"),
    ]

    operations = [
        migrations.AddField(
            model_name="collection",
            name="active_job",
            field=models.ForeignKey(
                blank=True,
                null=True,
                # Must agree with the model, which declares on_delete=models.RESTRICT. With CASCADE, deleting a
                # job would delete the collection that points to it, and makemigrations would report a
                # pending AlterField.
                on_delete=django.db.models.deletion.RESTRICT,
                related_name="+",
                to="data_registry.job",
                verbose_name="active job",
                help_text="A job is a set of tasks to collect and process data from a publication. A job can be selected once it is completed. If a new job completes, it becomes the active job.",
            ),
        ),
        migrations.RunSQL(
            # Qualify `active` with the job alias, so that the reference cannot be ambiguous or resolve to a
            # column on the collection table.
            """
            UPDATE data_registry_collection c
            SET active_job_id = j.id
            FROM data_registry_job j
            WHERE
                j.collection_id = c.id
                AND j.active = TRUE
            """,
            # Reversing AddField drops the column, so there is nothing to undo for the backfill. Without
            # reverse_sql, the migration is irreversible.
            reverse_sql=migrations.RunSQL.noop,
        ),
    ]
16 changes: 16 additions & 0 deletions data_registry/migrations/0051_remove_task_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Generated by Django 4.2.15 on 2024-11-04 23:13

from django.db import migrations


class Migration(migrations.Migration):
    """Remove the ``context`` field from the ``task`` model."""

    # Runs after the migration that adds Collection.active_job.
    dependencies = [
        ("data_registry", "0050_collection_active_job"),
    ]

    operations = [
        migrations.RemoveField(
            model_name="task",
            name="context",
        ),
    ]
26 changes: 13 additions & 13 deletions data_registry/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ def format_datetime(dt):


class JobQuerySet(models.QuerySet):
def active(self):
"""Return a query set of active jobs."""
return self.filter(active=True)

def complete(self):
"""Return a query set of complete jobs."""
return self.filter(status=Job.Status.COMPLETED)
Expand Down Expand Up @@ -134,9 +130,7 @@ def complete(self):
class CollectionQuerySet(models.QuerySet):
def visible(self):
"""Return a query set of public collections with active jobs."""
# https://docs.djangoproject.com/en/4.2/ref/models/expressions/#some-examples
active_jobs = Job.objects.active().filter(collection=models.OuterRef("pk"))
return self.filter(models.Exists(active_jobs), public=True)
return self.filter(public=True, active_job__isnull=False)


class Collection(models.Model):
Expand Down Expand Up @@ -279,9 +273,18 @@ class UpdateFrequency(models.TextChoices):
"the publication is updated.",
)
last_retrieved = models.DateField(
blank=True, null=True, help_text="The date on which the most recent 'collect' job task completed."
)
active_job = models.ForeignKey(
"Job",
on_delete=models.RESTRICT,
blank=True,
null=True,
help_text="The date on which the most recent 'collect' job task completed.",
db_index=True,
verbose_name="active job",
related_name="+",
help_text="A job is a set of tasks to collect and process data from a publication. "
"A job can be selected once it is completed. If a new job completes, it becomes the active job.",
)

# Visibility logic
Expand All @@ -307,10 +310,6 @@ def __str__(self):
def __repr__(self):
return f"{self.country}: {self}"

@property
def active_job(self):
return self.job_set.active().first()

def is_out_of_date(self):
"""
Return whether the publication is out-of-date.
Expand Down Expand Up @@ -417,7 +416,8 @@ class Type(models.TextChoices):
# Task result
result = models.TextField(choices=Result.choices, blank=True)
note = models.TextField(blank=True, help_text="Metadata about any failure.")
context = models.JSONField(blank=True, default=dict)

# Job logic (see `create_tasks`)
type = models.TextField(choices=Type.choices, blank=True)
order = models.IntegerField(blank=True, null=True)

Expand Down
6 changes: 1 addition & 5 deletions data_registry/process_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from django.conf import settings
from django.db import transaction
from django.db.models import BooleanField, Case, When

from data_registry import models
from data_registry.exceptions import IrrecoverableError, RecoverableError
Expand Down Expand Up @@ -116,10 +115,7 @@ def process(collection: models.Collection) -> None:
job.complete()

collection.last_retrieved = job.task_set.get(type=settings.JOB_TASKS_PLAN[0]).end
collection.active_job = job
collection.save()

collection.job_set.update(
active=Case(When(id=job.id, then=True), default=False, output_field=BooleanField())
)

logger.debug("Job %s has succeeded (%s: %s)", job, country, collection)
2 changes: 1 addition & 1 deletion data_registry/templates/search.html
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ <h2>{{ collection.country }}: {{ collection.title }}</h2>
<dl class="mb-0 mb-md-n2 list-desc-inline">
<dt>{% translate "Data date range:" %}</dt>
<dd>
{{ collection.date_from|date:"M Y" }} - {{ collection.date_to|date:"M Y" }}
{{ collection.active_job.date_from|date:"M Y" }} - {{ collection.active_job.date_to|date:"M Y" }}
{% if collection.retrieval_frequency == never %}
({{ collection.get_retrieval_frequency_display }})
{% endif %}
Expand Down
60 changes: 30 additions & 30 deletions data_registry/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.contrib.postgres.aggregates import ArrayAgg
from django.db.models import Count, OuterRef, Q, Subquery
from django.db.models import Count, F, Q
from django.db.models.functions import Substr
from django.http.response import FileResponse, HttpResponse, HttpResponseBadRequest, HttpResponseNotFound, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
Expand Down Expand Up @@ -80,16 +80,13 @@ def facet_counts(qs, key):
"amendments_count": _("Amendments data"),
}

# https://docs.djangoproject.com/en/4.2/ref/models/expressions/#subquery-expressions
active_job = Job.objects.active().filter(collection=OuterRef("pk"))[:1]
qs = collection_queryset(request).annotate(
job_id=Subquery(active_job.values("pk")),
# Display
date_from=Subquery(active_job.values("date_from")),
date_to=Subquery(active_job.values("date_to")),
# Filter
letter=Substr(f"country_{language_code}", 1, 1),
**{count: Subquery(active_job.values(count)) for count in counts},
qs = (
collection_queryset(request)
.select_related("active_job")
.annotate(
letter=Substr(f"country_{language_code}", 1, 1),
**{count: F(f"active_job__{count}") for count in counts},
)
)

filter_args = []
Expand All @@ -100,7 +97,7 @@ def facet_counts(qs, key):
if "date_range" in request.GET:
date_limit = date_limits.get(request.GET["date_range"])
if date_limit:
filter_args.append(Q(date_from__gte=date_limit) | Q(date_to__gte=date_limit))
filter_args.append(Q(active_job__date_from__gte=date_limit) | Q(active_job__date_to__gte=date_limit))
if "update_frequency" in request.GET:
filter_kwargs["update_frequency__in"] = request.GET.getlist("update_frequency")
if "region" in request.GET:
Expand All @@ -123,18 +120,23 @@ def facet_counts(qs, key):
facets["frequencies"][value] = n
for value, n in facet_counts(qs, "region"):
facets["regions"][value] = n
for row in without_filter(qs, args=False).values("date_from", "date_to"):
for row in without_filter(qs, args=False).values("active_job__date_from", "active_job__date_to"):
facets["date_ranges"][""] += 1
for value, limit in date_limits.items():
if row["date_from"] and row["date_from"] >= limit or row["date_to"] and row["date_to"] >= limit:
if (
row["active_job__date_from"]
and row["active_job__date_from"] >= limit
or row["active_job__date_to"]
and row["active_job__date_to"] >= limit
):
facets["date_ranges"][value] += 1

for lookup, value in exclude.items():
qs = qs.exclude(**{lookup: value})
qs = qs.filter(*filter_args, **filter_kwargs).order_by("country", "title")

for collection in qs:
collection.files = Export.get_files(collection.job_id)
collection.files = Export.get_files(collection.active_job_id)
for value in counts:
if getattr(collection, value):
facets["counts"][value] += 1
Expand All @@ -160,7 +162,7 @@ def detail(request, pk):
)

job = collection.active_job
files = Export.get_files(job and job.id)
files = Export.get_files(collection.active_job_id)

return render(
request,
Expand All @@ -177,9 +179,8 @@ def download_export(request, pk):
return HttpResponseBadRequest("The name query string parameter is invalid")

collection = get_object_or_404(collection_queryset(request), pk=pk)
active_job = get_object_or_404(collection.job_set.active())

export = Export(active_job.id, basename=name)
export = Export(collection.active_job_id, basename=name)
if export.status != TaskStatus.COMPLETED:
return HttpResponseNotFound("File not found")

Expand All @@ -194,30 +195,29 @@ def download_export(request, pk):


def publications_api(request):
active_job = Job.objects.active().filter(collection=OuterRef("pk"))[:1]
publications = (
collection_queryset(request)
.select_related("active_job")
.values(
# Identification
"id",
"title",
# Spatial coverage
"country",
"region",
# Language
"language",
# Accrual periodicity
"last_retrieved",
"retrieval_frequency",
"update_frequency",
"frozen",
# Provenance
"source_id",
"source_url",
# Other details
"region",
"language",
)
.annotate(
date_from=Subquery(active_job.values("date_from")),
date_to=Subquery(active_job.values("date_to")),
# Job logic
"frozen",
"source_id",
"retrieval_frequency",
"last_retrieved",
)
.annotate(date_from=F("active_job__date_from"), date_to=F("active_job__date_to"))
)
return JsonResponse(
list(publications),
Expand Down
12 changes: 12 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from data_registry.models import Task


class TestTask:
    """A stand-in task: ``run`` and ``wipe`` do nothing, and ``get_status`` always reports completion."""

    def run(self):
        """Do nothing."""
        return None

    def get_status(self):
        """Return ``Task.Status.COMPLETED`` unconditionally."""
        status = Task.Status.COMPLETED
        return status

    def wipe(self):
        """Do nothing."""
        return None
Loading

0 comments on commit a96af3d

Please sign in to comment.