Skip to content

Commit

Permalink
Add celery job to clean up old marc file exports (#2019)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathangreen authored Sep 4, 2024
1 parent 3f5d5ae commit 6d93cda
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 21 deletions.
24 changes: 24 additions & 0 deletions src/palace/manager/celery/tasks/marc.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,27 @@ def marc_export_collection(
task.log.info(
f"Finished generating MARC records for collection '{collection_name}' ({collection_id})."
)


@shared_task(queue=QueueNames.default, bind=True)
def marc_export_cleanup(
    task: Task,
    batch_size: int = 20,
) -> None:
    """
    Cleanup old MARC exports that are outdated or no longer needed.

    Deletes at most ``batch_size`` files per invocation, then requeues
    itself, so a single run never blocks the worker for too long.
    """
    storage_service = task.services.storage.public()
    registry = task.services.integration_registry.catalog_services()
    with task.session() as session:
        for count, file_record in enumerate(
            MarcExporter.files_for_cleanup(session, registry)
        ):
            if count >= batch_size:
                # Commit the deletions made so far before requeueing. The
                # storage objects are already gone, so losing the pending DB
                # deletes here would leave MarcFile rows pointing at files
                # that no longer exist.
                session.commit()
                # Requeue ourselves after deleting `batch_size` files to avoid blocking the worker for too long.
                raise task.replace(marc_export_cleanup.s())

            task.log.info(f"Deleting MARC export {file_record.key} ({file_record.id}).")
            # Delete from storage first; if this raises, the DB row survives
            # and the file will be picked up again on the next run.
            storage_service.delete(file_record.key)
            session.delete(file_record)
        session.commit()
54 changes: 53 additions & 1 deletion src/palace/manager/marc/exporter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import datetime
from collections.abc import Iterable, Sequence
from collections.abc import Generator, Iterable, Sequence
from uuid import UUID, uuid4

import pytz
Expand Down Expand Up @@ -371,3 +371,55 @@ def create_marc_upload_records(
since=library_info.last_updated,
key=library_info.s3_key_delta,
)

@staticmethod
def files_for_cleanup(
    session: Session,
    registry: CatalogServicesRegistry,
    *,
    keep_full: int = 1,
    keep_delta: int = 12,
) -> Generator[MarcFile, None, None]:
    """
    Yield MarcFile records that are outdated or no longer needed.

    :param session: Database session used for the queries.
    :param registry: Registry used to determine which collection/library
        pairs still have MARC export enabled.
    :param keep_full: Number of most-recent full exports to keep for each
        enabled collection/library pair.
    :param keep_delta: Number of most-recent delta exports to keep for each
        enabled collection/library pair.
    """
    # Every collection/library pair that has files on record. Deleted
    # collections/libraries appear here with a None id.
    existing = {
        (row.collection_id, row.library_id)
        for row in session.execute(
            select(MarcFile.collection_id, MarcFile.library_id).distinct()
        ).all()
    }
    # Pairs that still have MARC export enabled.
    enabled = {
        (collection.id, integration.library_id)
        for collection, integration in MarcExporter._enabled_collections_and_libraries(
            session, registry
        )
    }

    # Files for collections or libraries that have had exports disabled
    # (or were deleted entirely): every file for these pairs is obsolete.
    for collection_id, library_id in existing - enabled:
        yield from session.execute(
            select(MarcFile).where(
                MarcFile.library_id == library_id,
                MarcFile.collection_id == collection_id,
            )
        ).scalars()

    # Outdated exports for pairs that are still enabled. Iterating only
    # `existing & enabled` (rather than all of `existing`) avoids yielding
    # files for disabled pairs a second time — they were already yielded
    # in full above.
    for collection_id, library_id in existing & enabled:
        # Only keep the most recent full export(s) for each pair.
        # (`== None` is the SQLAlchemy NULL comparison, not a bug.)
        yield from session.execute(
            select(MarcFile)
            .where(
                MarcFile.library_id == library_id,
                MarcFile.collection_id == collection_id,
                MarcFile.since == None,
            )
            .order_by(MarcFile.created.desc())
            .offset(keep_full)
        ).scalars()

        # Keep only the most recent delta exports for each pair.
        yield from session.execute(
            select(MarcFile)
            .where(
                MarcFile.library_id == library_id,
                MarcFile.collection_id == collection_id,
                MarcFile.since != None,
            )
            .order_by(MarcFile.created.desc())
            .offset(keep_delta)
        ).scalars()
7 changes: 7 additions & 0 deletions src/palace/manager/service/celery/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ def beat_schedule() -> dict[str, Any]:
hour="3,11", minute="0"
), # Run twice a day at 3:00 AM and 11:00 AM
},
"marc_export_cleanup": {
"task": "marc.marc_export_cleanup",
"schedule": crontab(
hour="1",
minute="0",
), # Run every day at 1:00 AM
},
}


Expand Down
2 changes: 0 additions & 2 deletions src/palace/manager/sqlalchemy/model/marcfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class MarcFile(Base):

# The library should never be null in normal operation, but if a library is deleted, we don't want to lose the
# record of the MARC file, so we set the library to null.
# TODO: We need a job to clean up these records.
library_id = Column(
Integer,
ForeignKey("libraries.id", ondelete="SET NULL"),
Expand All @@ -35,7 +34,6 @@ class MarcFile(Base):

# The collection should never be null in normal operation, but similar to the library, if a collection is deleted,
# we don't want to lose the record of the MARC file, so we set the collection to null.
# TODO: We need a job to clean up these records.
collection_id = Column(
Integer,
ForeignKey("collections.id", ondelete="SET NULL"),
Expand Down
45 changes: 35 additions & 10 deletions tests/fixtures/marc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
from collections.abc import Sequence
from typing import Literal

import pytest

Expand All @@ -8,6 +9,7 @@
from palace.manager.marc.settings import MarcExporterLibrarySettings
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration
from palace.manager.sqlalchemy.model.library import Library
from palace.manager.sqlalchemy.model.marcfile import MarcFile
from palace.manager.sqlalchemy.model.work import Work
from palace.manager.sqlalchemy.util import create
Expand Down Expand Up @@ -43,7 +45,9 @@ def __init__(
self.test_marc_file_key = "test-file-1.mrc"

def integration(self) -> IntegrationConfiguration:
return self._db.integration_configuration(MarcExporter, Goals.CATALOG_GOAL)
return self._db.integration_configuration(
MarcExporter, Goals.CATALOG_GOAL, name="MARC Exporter"
)

def work(self, collection: Collection | None = None) -> Work:
collection = collection or self.collection1
Expand All @@ -56,7 +60,7 @@ def work(self, collection: Collection | None = None) -> Work:
def works(self, collection: Collection | None = None) -> list[Work]:
return [self.work(collection) for _ in range(5)]

def configure_export(self) -> None:
def configure_export(self, *, marc_file: bool = True) -> None:
marc_integration = self.integration()
self._db.integration_library_configuration(
marc_integration,
Expand All @@ -73,14 +77,11 @@ def configure_export(self) -> None:
self.collection2.export_marc_records = True
self.collection3.export_marc_records = True

create(
self.session,
MarcFile,
library=self.library1,
collection=self.collection1,
key=self.test_marc_file_key,
created=utc_now() - datetime.timedelta(days=7),
)
if marc_file:
self.marc_file(
key=self.test_marc_file_key,
created=utc_now() - datetime.timedelta(days=7),
)

def enabled_libraries(
self, collection: Collection | None = None
Expand All @@ -91,6 +92,30 @@ def enabled_libraries(
self.session, self.registry, collection_id=collection.id
)

def marc_file(
    self,
    *,
    key: str | None = None,
    collection: Collection | None | Literal[False] = False,
    library: Library | None | Literal[False] = False,
    created: datetime.datetime | None = None,
    since: datetime.datetime | None = None
) -> MarcFile:
    """Create and return a MarcFile row for tests.

    The `False` sentinel on `collection`/`library` means "use the fixture
    default"; an explicit None models a deleted collection/library.
    """
    if collection is False:
        collection = self.collection1
    if library is False:
        library = self.library1
    new_file, _ = create(
        self.session,
        MarcFile,
        library=library,
        collection=collection,
        key=key or self._db.fresh_str(),
        created=created or utc_now(),
        since=since,
    )
    return new_file


@pytest.fixture
def marc_exporter_fixture(
Expand Down
4 changes: 4 additions & 0 deletions tests/fixtures/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def __init__(

self.upload_in_progress: dict[str, MockMultipartUpload] = {}
self.aborted: list[str] = []
self.deleted: list[str] = []

def delete(self, key: str) -> None:
    """Pretend to delete `key`, recording it so tests can assert on it."""
    self.deleted += [key]

def store_stream(
self,
Expand Down
37 changes: 29 additions & 8 deletions tests/manager/celery/tasks/test_marc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from typing import Any
from unittest.mock import ANY, call, patch

Expand All @@ -17,7 +18,6 @@
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.marcfile import MarcFile
from palace.manager.sqlalchemy.model.work import Work
from palace.manager.sqlalchemy.util import create
from palace.manager.util.datetime_helpers import utc_now
from tests.fixtures.celery import CeleryFixture
from tests.fixtures.database import DatabaseTransactionFixture
Expand Down Expand Up @@ -90,13 +90,8 @@ def test_skip_collections(
).acquire()

# Collection 2 should be skipped because it was updated recently
create(
db.session,
MarcFile,
library=marc_exporter_fixture.library1,
collection=marc_exporter_fixture.collection2,
created=utc_now(),
key="test-file-2.mrc",
marc_exporter_fixture.marc_file(
collection=marc_exporter_fixture.collection2
)

# Collection 3 should be skipped because its state is not INITIAL
Expand Down Expand Up @@ -319,3 +314,29 @@ def test_outdated_task_run(

assert marc_export_collection_fixture.marc_files() == []
assert marc_export_collection_fixture.redis_data(collection) is None


def test_marc_export_cleanup(
    db: DatabaseTransactionFixture,
    celery_fixture: CeleryFixture,
    s3_service_fixture: S3ServiceFixture,
    marc_exporter_fixture: MarcExporterFixture,
    services_fixture: ServicesFixture,
):
    """The cleanup task removes outdated files from both storage and the
    database (requeueing itself between batches), keeping only the newest
    full export."""
    marc_exporter_fixture.configure_export(marc_file=False)
    mock_storage = s3_service_fixture.mock_service()
    services_fixture.services.storage.public.override(mock_storage)

    # One fresh file that must survive the cleanup.
    survivor_id = marc_exporter_fixture.marc_file(created=utc_now()).id
    # Twenty progressively older files that should all be removed.
    expected_deleted = []
    for age in range(1, 21):
        stale = marc_exporter_fixture.marc_file(
            created=utc_now() - datetime.timedelta(days=age)
        )
        expected_deleted.append(stale.key)

    marc.marc_export_cleanup.delay(batch_size=5).wait()

    remaining = db.session.execute(select(MarcFile)).scalars().all()
    assert [f.id for f in remaining] == [survivor_id]
    assert mock_storage.deleted == expected_deleted
112 changes: 112 additions & 0 deletions tests/manager/marc/test_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,3 +423,115 @@ def test_create_marc_upload_records(
key=enabled_libraries[0].s3_key_delta,
since=enabled_libraries[0].last_updated,
)

def test_files_for_cleanup_deleted_disabled(
    self, marc_exporter_fixture: MarcExporterFixture
) -> None:
    """Files belonging to deleted or export-disabled libraries/collections
    are selected for cleanup."""
    marc_exporter_fixture.configure_export(marc_file=False)
    cleanup_candidates = partial(
        MarcExporter.files_for_cleanup,
        marc_exporter_fixture.session,
        marc_exporter_fixture.registry,
    )

    # With no files at all, nothing is selected.
    assert set(cleanup_candidates()) == set()

    # Files spread across the enabled collection/library pairs, plus two
    # files whose collection or library has been deleted (set to None).
    c1_l1 = marc_exporter_fixture.marc_file(
        collection=marc_exporter_fixture.collection1,
        library=marc_exporter_fixture.library1,
    )
    c1_l2 = marc_exporter_fixture.marc_file(
        collection=marc_exporter_fixture.collection1,
        library=marc_exporter_fixture.library2,
    )
    c2_l1 = marc_exporter_fixture.marc_file(
        collection=marc_exporter_fixture.collection2,
        library=marc_exporter_fixture.library1,
    )
    c3_l2 = marc_exporter_fixture.marc_file(
        collection=marc_exporter_fixture.collection3,
        library=marc_exporter_fixture.library2,
    )
    no_collection = marc_exporter_fixture.marc_file(collection=None)
    no_library = marc_exporter_fixture.marc_file(library=None)

    # Only the files pointing at deleted rows are candidates; files for
    # still-enabled pairs (including c2_l1) are untouched.
    assert set(cleanup_candidates()) == {no_collection, no_library}

    # Disabling MARC export on a collection makes its files candidates.
    marc_exporter_fixture.collection1.export_marc_records = False
    assert set(cleanup_candidates()) == {
        no_collection,
        no_library,
        c1_l1,
        c1_l2,
    }

    # Removing a library's exporter integration makes its files candidates.
    library2_integration = marc_exporter_fixture.integration().for_library(
        marc_exporter_fixture.library2
    )
    assert library2_integration is not None
    marc_exporter_fixture.session.delete(library2_integration)
    assert set(cleanup_candidates()) == {
        no_collection,
        no_library,
        c1_l1,
        c1_l2,
        c3_l2,
    }

def test_files_for_cleanup_outdated_full(
    self, marc_exporter_fixture: MarcExporterFixture
) -> None:
    """Only the single most recent full export per pair is retained."""
    marc_exporter_fixture.configure_export(marc_file=False)
    cleanup_candidates = partial(
        MarcExporter.files_for_cleanup,
        marc_exporter_fixture.session,
        marc_exporter_fixture.registry,
    )

    # A lone full export in another collection must not be swept up even
    # though it is old.
    decoy = marc_exporter_fixture.marc_file(
        collection=marc_exporter_fixture.collection2,
        created=utc_now() - datetime.timedelta(days=15),
    )
    # The newest full export is kept; every older one is a candidate.
    newest = marc_exporter_fixture.marc_file(created=utc_now())
    stale_files = {
        marc_exporter_fixture.marc_file(
            created=utc_now() - datetime.timedelta(days=age)
        )
        for age in range(1, 6)
    }
    assert set(cleanup_candidates()) == stale_files

def test_files_for_cleanup_outdated_delta(
    self, marc_exporter_fixture: MarcExporterFixture
) -> None:
    """Only the twelve most recent delta exports per pair are retained."""
    marc_exporter_fixture.configure_export(marc_file=False)
    cleanup_candidates = partial(
        MarcExporter.files_for_cleanup,
        marc_exporter_fixture.session,
        marc_exporter_fixture.registry,
    )

    one_week_ago = utc_now() - datetime.timedelta(days=7)
    # An old delta in another collection must not be swept up.
    decoy = marc_exporter_fixture.marc_file(
        collection=marc_exporter_fixture.collection2,
        created=utc_now() - datetime.timedelta(days=15),
        since=one_week_ago - datetime.timedelta(days=15),
    )
    # Twelve fresh deltas are retained ...
    retained = {
        marc_exporter_fixture.marc_file(created=utc_now(), since=one_week_ago)
        for _ in range(12)
    }
    # ... while every older delta is a cleanup candidate.
    stale_deltas = {
        marc_exporter_fixture.marc_file(
            created=one_week_ago - datetime.timedelta(days=age),
            since=one_week_ago - datetime.timedelta(days=age + 1),
        )
        for age in range(20)
    }
    assert set(cleanup_candidates()) == stale_deltas
Loading

0 comments on commit 6d93cda

Please sign in to comment.