Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert MARC Export to use Celery (PP-1472) #2017

Merged
merged 7 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,6 @@ docs/source/*
.DS_Store

src/palace/manager/core/_version.py

# Celery beat schedule file
celerybeat-schedule.db
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ grant all privileges on database circ to palace;
Redis is used as the broker for Celery and the caching layer. You can run Redis with docker using the following command:

```sh
docker run -d --name redis -p 6379:6379 redis
docker run -d --name redis -p 6379:6379 redis/redis-stack-server
jonathangreen marked this conversation as resolved.
Show resolved Hide resolved
```

### Environment variables
Expand Down
6 changes: 0 additions & 6 deletions bin/cache_marc_files

This file was deleted.

2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ services:
retries: 5

redis:
image: "redis:7"
image: "redis/redis-stack-server:7.4.0-v0"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
Expand Down
3 changes: 0 additions & 3 deletions docker/services/cron/cron.d/circulation
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ HOME=/var/www/circulation
# Sync a library's collection with NoveList
0 0 * * 0 root bin/run -d 60 novelist_update >> /var/log/cron.log 2>&1

# Generate MARC files for libraries that have a MARC exporter configured.
0 3,11 * * * root bin/run cache_marc_files >> /var/log/cron.log 2>&1

# The remaining scripts keep the circulation manager in sync with
# specific types of collections.

Expand Down
4 changes: 2 additions & 2 deletions src/palace/manager/api/admin/controller/catalog_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
)
from palace.manager.api.admin.form_data import ProcessFormData
from palace.manager.api.admin.problem_details import MULTIPLE_SERVICES_FOR_LIBRARY
from palace.manager.core.marc import MARCExporter
from palace.manager.integration.goals import Goals
from palace.manager.integration.settings import BaseSettings
from palace.manager.marc.exporter import MarcExporter
from palace.manager.sqlalchemy.listeners import site_configuration_has_changed
from palace.manager.sqlalchemy.model.integration import (
IntegrationConfiguration,
Expand All @@ -21,7 +21,7 @@


class CatalogServicesController(
IntegrationSettingsController[MARCExporter],
IntegrationSettingsController[MarcExporter],
jonathangreen marked this conversation as resolved.
Show resolved Hide resolved
AdminPermissionsControllerMixin,
):
def process_catalog_services(self) -> Response | ProblemDetail:
Expand Down
5 changes: 4 additions & 1 deletion src/palace/manager/api/circulation_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ def setup_one_time_controllers(self):
"""
self.index_controller = IndexController(self)
self.opds_feeds = OPDSFeedController(self)
self.marc_records = MARCRecordController(self.services.storage.public())
self.marc_records = MARCRecordController(
self.services.storage.public(),
self.services.integration_registry.catalog_services(),
)
self.loans = LoanController(self)
self.annotations = AnnotationController(self)
self.urn_lookup = URNLookupController(self)
Expand Down
16 changes: 11 additions & 5 deletions src/palace/manager/api/controller/marc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
from sqlalchemy import select
from sqlalchemy.orm import Session

from palace.manager.core.marc import MARCExporter
from palace.manager.integration.goals import Goals
from palace.manager.marc.exporter import MarcExporter
from palace.manager.service.integration_registry.catalog_services import (
CatalogServicesRegistry,
)
from palace.manager.service.storage.s3 import S3Service
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.integration import (
Expand Down Expand Up @@ -49,21 +52,24 @@ class MARCRecordController:
</body>
</html>"""

def __init__(self, storage_service: S3Service | None) -> None:
def __init__(
self, storage_service: S3Service | None, registry: CatalogServicesRegistry
) -> None:
self.storage_service = storage_service
self.registry = registry

@staticmethod
def library() -> Library:
return flask.request.library # type: ignore[no-any-return,attr-defined]

@staticmethod
def has_integration(session: Session, library: Library) -> bool:
def has_integration(self, session: Session, library: Library) -> bool:
protocols = self.registry.get_protocols(MarcExporter)
integration_query = (
select(IntegrationLibraryConfiguration)
.join(IntegrationConfiguration)
.where(
IntegrationConfiguration.goal == Goals.CATALOG_GOAL,
IntegrationConfiguration.protocol == MARCExporter.__name__,
IntegrationConfiguration.protocol.in_(protocols),
IntegrationLibraryConfiguration.library == library,
)
)
Expand Down
161 changes: 161 additions & 0 deletions src/palace/manager/celery/tasks/marc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import datetime
from typing import Any

from celery import shared_task

from palace.manager.celery.task import Task
from palace.manager.marc.exporter import LibraryInfo, MarcExporter
from palace.manager.marc.uploader import MarcUploadManager
from palace.manager.service.celery.celery import QueueNames
from palace.manager.service.redis.models.marc import (
MarcFileUploadSession,
MarcFileUploadState,
)
from palace.manager.util.datetime_helpers import utc_now


@shared_task(queue=QueueNames.default, bind=True)
def marc_export(task: Task, force: bool = False) -> None:
"""
Export MARC records for all collections with the `export_marc_records` flag set to True, whose libraries
have a MARC exporter integration enabled.
"""

with task.session() as session:
registry = task.services.integration_registry.catalog_services()
start_time = utc_now()
collections = MarcExporter.enabled_collections(session, registry)
for collection in collections:
# Collection.id should never be able to be None here, but mypy doesn't know that.
# So we assert it for mypy's benefit.
assert collection.id is not None
upload_session = MarcFileUploadSession(
task.services.redis.client(), collection.id
)
with upload_session.lock() as acquired:
if not acquired:
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because another task holds its lock."
)
continue

if (
upload_state := upload_session.state()
) != MarcFileUploadState.INITIAL:
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because it is already being "
f"processed (state: {upload_state})."
)
continue

libraries_info = MarcExporter.enabled_libraries(
session, registry, collection.id
)
needs_update = (
any(info.needs_update for info in libraries_info) or force
)

if not needs_update:
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because it has been updated recently."
)
continue

works = MarcExporter.query_works(
session,
collection.id,
work_id_offset=0,
batch_size=1,
)
if not works:
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because it has no works."
)
continue

task.log.info(
f"Generating MARC records for collection {collection.name} ({collection.id})."
)
upload_session.set_state(MarcFileUploadState.QUEUED)
marc_export_collection.delay(
collection_id=collection.id,
start_time=start_time,
libraries=[l.dict() for l in libraries_info],
)


@shared_task(queue=QueueNames.default, bind=True)
def marc_export_collection(
task: Task,
collection_id: int,
start_time: datetime.datetime,
libraries: list[dict[str, Any]],
batch_size: int = 500,
last_work_id: int | None = None,
update_number: int = 0,
) -> None:
"""
Export MARC records for a single collection.

This task is designed to be re-queued until all works in the collection have been processed,
this can take some time, however each individual task should complete quickly, so that it
doesn't block other tasks from running.
"""

base_url = task.services.config.sitewide.base_url()
storage_service = task.services.storage.public()
libraries_info = [LibraryInfo.parse_obj(l) for l in libraries]
upload_manager = MarcUploadManager(
storage_service,
MarcFileUploadSession(
task.services.redis.client(), collection_id, update_number
),
)
with upload_manager.begin():
if not upload_manager.locked:
task.log.info(
f"Skipping collection {collection_id} because another task is already processing it."
)
return

with task.session() as session:
works = MarcExporter.query_works(
session,
collection_id,
work_id_offset=last_work_id,
batch_size=batch_size,
)
for work in works:
MarcExporter.process_work(
work, libraries_info, base_url, upload_manager=upload_manager
)

# Sync the upload_manager to ensure that all the data is written to storage.
upload_manager.sync()

if len(works) == batch_size:
# This task is complete, but there are more works waiting to be exported. So we requeue ourselves
# to process the next batch.
raise task.replace(
marc_export_collection.s(
collection_id=collection_id,
start_time=start_time,
libraries=[l.dict() for l in libraries_info],
batch_size=batch_size,
last_work_id=works[-1].id,
update_number=upload_manager.update_number,
)
)

# If we got here, we have finished generating MARC records. Cleanup and exit.
with task.transaction() as session:
collection = MarcExporter.collection(session, collection_id)
collection_name = collection.name if collection else "unknown"
completed_uploads = upload_manager.complete()
MarcExporter.create_marc_upload_records(
session, start_time, collection_id, libraries_info, completed_uploads
)
upload_manager.remove_session()
task.log.info(
f"Finished generating MARC records for collection '{collection_name}' ({collection_id})."
)
Empty file.
Loading