From d491e993342b951f9b9052933722d406d8d3924c Mon Sep 17 00:00:00 2001 From: dbernstein Date: Fri, 19 Apr 2024 11:38:08 -0700 Subject: [PATCH] [PP-1157] convert inventory and hold reports to celery (#1794) --- api/admin/controller/report.py | 31 +- bin/delete_old_deferred_tasks | 12 - bin/generate_inventory_reports | 12 - core/celery/job.py | 2 +- .../generate_inventory_and_hold_reports.py | 288 +++++++++++++++ core/scripts.py | 348 +----------------- docker/services/cron/cron.d/circulation | 4 - tests/api/admin/controller/test_report.py | 48 +-- ...est_generate_inventory_and_hold_reports.py | 286 ++++++++++++++ tests/core/test_scripts.py | 309 +--------------- 10 files changed, 622 insertions(+), 718 deletions(-) delete mode 100755 bin/delete_old_deferred_tasks delete mode 100755 bin/generate_inventory_reports create mode 100644 core/celery/tasks/generate_inventory_and_hold_reports.py create mode 100644 tests/core/celery/tasks/test_generate_inventory_and_hold_reports.py diff --git a/api/admin/controller/report.py b/api/admin/controller/report.py index 8155216063..d90860f3c1 100644 --- a/api/admin/controller/report.py +++ b/api/admin/controller/report.py @@ -1,18 +1,15 @@ import json -from dataclasses import asdict from http import HTTPStatus import flask from flask import Response from sqlalchemy.orm import Session -from core.model import Library -from core.model.admin import Admin -from core.model.deferredtask import ( - DeferredTaskType, - InventoryReportTaskData, - queue_task, +from core.celery.tasks.generate_inventory_and_hold_reports import ( + generate_inventory_and_hold_reports, ) +from core.model import Library, MediaTypes +from core.model.admin import Admin from core.problem_details import INTERNAL_SERVER_ERROR from core.util.log import LoggerMixin from core.util.problem_detail import ProblemDetail @@ -31,21 +28,21 @@ def generate_inventory_report(self) -> Response | ProblemDetail: assert admin.id assert library.id - data: InventoryReportTaskData = InventoryReportTaskData( - admin_email=admin.email, admin_id=admin.id, library_id=library.id - ) - task, is_new = queue_task( - self._db, task_type=DeferredTaskType.INVENTORY_REPORT, data=asdict(data) + task = generate_inventory_and_hold_reports.delay( + email_address=admin.email, library_id=library.id ) msg = ( - f"An inventory report request was {'already' if not is_new else ''} received at {task.created}. " - f"When processing is complete, the report will be sent to {admin.email}." + f"An inventory and hold report request was received. Report processing can take a few minutes to " + f"finish depending on current server load. The completed reports will be sent to {admin.email}." ) - self.log.info(msg + f" {task}") - http_status = HTTPStatus.ACCEPTED if is_new else HTTPStatus.CONFLICT - return Response(json.dumps(dict(message=msg)), http_status) + self.log.info(msg + f"(Task Request Id: {task.id})") + return Response( + json.dumps(dict(message=msg)), + HTTPStatus.ACCEPTED, + mimetype=MediaTypes.APPLICATION_JSON_MEDIA_TYPE, + ) except Exception as e: msg = f"failed to generate inventory report request: {e}" self.log.error(msg=msg, exc_info=e) diff --git a/bin/delete_old_deferred_tasks b/bin/delete_old_deferred_tasks deleted file mode 100755 index e2a4d811e6..0000000000 --- a/bin/delete_old_deferred_tasks +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env python -"""Delete completed deferred tasks over 30 days old.""" -import os -import sys - -bin_dir = os.path.split(__file__)[0] -package_dir = os.path.join(bin_dir, "..") -sys.path.append(os.path.abspath(package_dir)) - -from core.scripts import DeleteOldDeferredTasks - -DeleteOldDeferredTasks().run() diff --git a/bin/generate_inventory_reports b/bin/generate_inventory_reports deleted file mode 100755 index 3fff5d7d07..0000000000 --- a/bin/generate_inventory_reports +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env python -"""Update the cached sizes of all custom lists.""" -import os -import sys - -bin_dir = os.path.split(__file__)[0] -package_dir = os.path.join(bin_dir, "..") -sys.path.append(os.path.abspath(package_dir)) - -from core.scripts import GenerateInventoryReports - -GenerateInventoryReports().run() diff --git a/core/celery/job.py b/core/celery/job.py index 9ba08ba6ba..e8ea23c3d4 100644 --- a/core/celery/job.py +++ b/core/celery/job.py @@ -14,7 +14,7 @@ class Job(LoggerMixin, SessionMixin, ABC): This class provides a few helper methods for our jobs to use, such as a logger and a session context manager. This class is and should remain - runable outside the context of a Celery worker. That way we are able to + runnable outside the context of a Celery worker. That way we are able to test our jobs fully outside the Celery worker. This class purposefully does not open a SqlAlchemy session for the job, diff --git a/core/celery/tasks/generate_inventory_and_hold_reports.py b/core/celery/tasks/generate_inventory_and_hold_reports.py new file mode 100644 index 0000000000..3d4fb25dd1 --- /dev/null +++ b/core/celery/tasks/generate_inventory_and_hold_reports.py @@ -0,0 +1,288 @@ +from __future__ import annotations + +import csv +import os +import tempfile +from datetime import datetime +from pathlib import Path +from typing import Any + +from celery import shared_task +from sqlalchemy import not_, select, text +from sqlalchemy.orm import Session, sessionmaker + +from api.integration.registry.license_providers import LicenseProvidersRegistry +from core.celery.job import Job +from core.celery.task import Task +from core.integration.goals import Goals +from core.model import ( + IntegrationConfiguration, + IntegrationLibraryConfiguration, + Library, + get_one, +) +from core.opds_import import OPDSImporterSettings +from core.service.celery.celery import QueueNames +from core.service.email.email import SendEmailCallable + + +class GenerateInventoryAndHoldsReportsJob(Job): + def __init__( + self, + session_maker: sessionmaker[Session], + library_id: int, + email_address: str, + send_email: SendEmailCallable, + delete_attachments: bool = True, + ): + super().__init__(session_maker) + self.library_id = library_id + self.email_address = email_address + self.delete_attachments = delete_attachments + self.send_email = send_email + + def run(self) -> None: + with self.transaction() as session: + library = get_one(session, Library, id=self.library_id) + + if not library: + self.log.error( + f"Cannot generate inventory and holds report for library (id={self.library_id}): " + f"library not found." + ) + return + + try: + current_time = datetime.now() + date_str = current_time.strftime("%Y-%m-%d_%H:%M:%s") + attachments: dict[str, Path] = {} + + file_name_modifier = f"{library.short_name}-{date_str}" + + # resolve integrations + integrations = session.scalars( + select(IntegrationConfiguration) + .join(IntegrationLibraryConfiguration) + .where( + IntegrationLibraryConfiguration.library_id == self.library_id, + IntegrationConfiguration.goal == Goals.LICENSE_GOAL, + not_( + IntegrationConfiguration.settings_dict.contains( + {"include_in_inventory_report": False} + ) + ), + ) + ).all() + registry = LicenseProvidersRegistry() + integration_ids: list[int] = [] + for integration in integrations: + settings = registry[integration.protocol].settings_load(integration) + if not isinstance(settings, OPDSImporterSettings): + continue + integration_ids.append(integration.id) + + # generate inventory report csv file + sql_params: dict[str, Any] = { + "library_id": library.id, + "integration_ids": tuple(integration_ids), + } + + inventory_report_file_path = self.generate_inventory_report( + session, sql_params=sql_params + ) + + # generate holds report csv file + holds_report_file_path = self.generate_holds_report( + session, sql_params=sql_params + ) + + attachments[f"palace-inventory-report-{file_name_modifier}.csv"] = Path( + inventory_report_file_path + ) + attachments[f"palace-holds-report-{file_name_modifier}.csv"] = Path( + holds_report_file_path + ) + + self.send_email( + subject=f"Inventory and Holds Reports {current_time}", + receivers=[self.email_address], + text="", + attachments=attachments, + ) + finally: + if self.delete_attachments: + for file_path in attachments.values(): + os.remove(file_path) + + def generate_inventory_report( + self, _db: Session, sql_params: dict[str, Any] + ) -> str: + """Generate an inventory csv file and return the file path""" + return self.generate_csv_report(_db, sql_params, self.inventory_report_query()) + + def generate_holds_report(self, _db: Session, sql_params: dict[str, Any]) -> str: + """Generate a holds report csv file and return the file path""" + return self.generate_csv_report(_db, sql_params, self.holds_report_query()) + + def generate_csv_report( + self, _db: Session, sql_params: dict[str, Any], query: str + ) -> str: + with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8") as temp: + writer = csv.writer(temp, delimiter=",") + rows = _db.execute( + text(query), + sql_params, + ) + writer.writerow(rows.keys()) + writer.writerows(rows) + return temp.name + + @staticmethod + def inventory_report_query() -> str: + return """ + select + e.title, + e.author, + i.identifier, + e.language, + e.publisher, + e.medium as format, + w.audience, + wg.genres, + d.name data_source, + ic.name collection_name, + l.expires license_expiration, + DATE_PART('day', l.expires - now()) days_remaining_on_license, + l.checkouts_left remaining_loans, + l.terms_concurrency allowed_concurrent_users, + coalesce(lib_loans.active_loan_count, 0) library_active_loan_count, + CASE WHEN collection_sharing.is_shared_collection THEN lp.licenses_reserved + ELSE -1 + END shared_active_loan_count + from datasources d, + collections c, + integration_configurations ic, + integration_library_configurations il, + libraries lib, + works w left outer join (select wg.work_id, string_agg(g.name, ',' order by g.name) as genres + from genres g, + workgenres wg + where g.id = wg.genre_id + group by wg.work_id) wg on w.id = wg.work_id, + editions e left outer join (select lp.presentation_edition_id, + p.library_id, + count(ln.id) active_loan_count + from loans ln, + licensepools lp, + patrons p, + libraries l + where p.id = ln.patron_id and + p.library_id = l.id and + ln.license_pool_id = lp.id and + l.id = :library_id + group by p.library_id, lp.presentation_edition_id) lib_loans + on e.id = lib_loans.presentation_edition_id, + identifiers i, + (select ilc.parent_id, + count(ilc.parent_id) > 1 is_shared_collection + from integration_library_configurations ilc, + integration_configurations i, + collections c + where c.integration_configuration_id = i.id and + i.id = ilc.parent_id group by ilc.parent_id) collection_sharing, + licensepools lp left outer join (select license_pool_id, + checkouts_left, + expires, + terms_concurrency + from licenses where status = 'available') l on lp.id = l.license_pool_id + where lp.identifier_id = i.id and + e.primary_identifier_id = i.id and + e.id = w.presentation_edition_id and + d.id = e.data_source_id and + c.id = lp.collection_id and + c.integration_configuration_id = ic.id and + ic.id = il.parent_id and + ic.id = collection_sharing.parent_id and + ic.id in :integration_ids and + il.library_id = lib.id and + lib.id = :library_id + order by title, author + """ + + @staticmethod + def holds_report_query() -> str: + return """ + select + e.title, + e.author, + i.identifier, + e.language, + e.publisher, + e.medium as format, + w.audience, + wg.genres, + d.name data_source, + ic.name collection_name, + coalesce(lib_holds.active_hold_count, 0) library_active_hold_count, + CASE WHEN collection_sharing.is_shared_collection THEN lp.patrons_in_hold_queue + ELSE -1 + END shared_active_hold_count + from datasources d, + collections c, + integration_configurations ic, + integration_library_configurations il, + libraries lib, + works w left outer join (select wg.work_id, string_agg(g.name, ',' order by g.name) as genres + from genres g, + workgenres wg + where g.id = wg.genre_id + group by wg.work_id) wg on w.id = wg.work_id, + editions e, + (select lp.presentation_edition_id, + p.library_id, + count(h.id) active_hold_count + from holds h, + licensepools lp, + patrons p + where p.id = h.patron_id and + h.license_pool_id = lp.id and + p.library_id = :library_id and + (h.end is null or + h.end > now() or + h.position > 0) + group by p.library_id, lp.presentation_edition_id) lib_holds, + identifiers i, + (select ilc.parent_id, + count(ilc.parent_id) > 1 is_shared_collection + from integration_library_configurations ilc, + integration_configurations i, + collections c + where c.integration_configuration_id = i.id and + i.id = ilc.parent_id group by ilc.parent_id) collection_sharing, + licensepools lp + where lp.identifier_id = i.id and + e.primary_identifier_id = i.id and + e.id = lib_holds.presentation_edition_id and + e.id = w.presentation_edition_id and + d.id = e.data_source_id and + c.id = lp.collection_id and + c.integration_configuration_id = ic.id and + ic.id = il.parent_id and + ic.id = collection_sharing.parent_id and + ic.id in :integration_ids and + il.library_id = lib.id and + lib.id = :library_id + order by title, author + """ + + +@shared_task(queue=QueueNames.high, bind=True) +def generate_inventory_and_hold_reports( + task: Task, library_id: int, email_address: str +) -> None: + GenerateInventoryAndHoldsReportsJob( + task.session_maker, + library_id=library_id, + email_address=email_address, + send_email=task.services.email.send_email, + ).run() diff --git a/core/scripts.py b/core/scripts.py index b968be4870..932909a736 100644 --- a/core/scripts.py +++ b/core/scripts.py @@ -1,27 +1,23 @@ import argparse -import csv import datetime import logging import os import random import sys -import tempfile import traceback import unicodedata import uuid from collections.abc import Generator, Sequence from enum import Enum -from pathlib import Path -from typing import Any, TextIO +from typing import TextIO -from sqlalchemy import and_, exists, not_, or_, select, text, tuple_ +from sqlalchemy import and_, exists, or_, select, tuple_ from sqlalchemy.orm import Query, Session, defer from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound from api.integration.registry.license_providers import LicenseProvidersRegistry from core.coverage import CollectionCoverageProviderJob, CoverageProviderProgress -from core.exceptions import BasePalaceException from core.external_search import ExternalSearchIndex, Filter from core.integration.goals import Goals from core.lane import Lane @@ -35,7 +31,6 @@ Edition, Identifier, IntegrationConfiguration, - IntegrationLibraryConfiguration, Library, LicensePool, LicensePoolDeliveryMechanism, @@ -51,22 +46,11 @@ production_session, ) from core.model.classification import Classification -from core.model.deferredtask import ( - DeferredTask, - DeferredTaskType, - InventoryReportTaskData, - start_next_task, -) from core.model.devicetokens import DeviceToken, DeviceTokenTypes from core.model.listeners import site_configuration_has_changed from core.model.patron import Loan from core.monitor import CollectionMonitor, ReaperMonitor -from core.opds_import import ( - OPDSAPI, - OPDSImporter, - OPDSImporterSettings, - OPDSImportMonitor, -) +from core.opds_import import OPDSAPI, OPDSImporter, OPDSImportMonitor from core.search.coverage_provider import SearchIndexCoverageProvider from core.search.coverage_remover import RemovesSearchCoverage from core.service.container import Services, container_instance @@ -2706,332 +2690,6 @@ def suppress_work(self, library: Library, identifier: Identifier) -> None: self._db.commit() -class GenerateInventoryReports(Script): - """Generate inventory reports from queued report tasks""" - - # set to False when testing attachments. - _delete_attachments = True - - @classmethod - def arg_parser(cls, _db: Session | None) -> argparse.ArgumentParser: # type: ignore[override] - parser = argparse.ArgumentParser() - if _db is None: - raise ValueError("No database session provided.") - - return parser - - @classmethod - def parse_command_line( - cls, _db: Session | None = None, cmd_args: list[str] | None = None - ): - parser = cls.arg_parser(_db) - return parser.parse_known_args(cmd_args)[0] - - def do_run(self, cmd_args: list[str] | None = None) -> None: - parsed = self.parse_command_line(self._db, cmd_args=cmd_args) - - while True: - task = start_next_task(self._db, DeferredTaskType.INVENTORY_REPORT) - if not task: - break - - self.process_task(task) - - self.remove_old_tasks() - - def process_task(self, task: DeferredTask): - data = InventoryReportTaskData(**task.data) - try: - current_time = datetime.datetime.now() - date_str = current_time.strftime("%Y-%m-%d_%H:%M:%s") - attachments: dict[str, Path] = {} - library = get_one(self._db, Library, id=data.library_id) - - if not library: - raise BasePalaceException( - message=f"Cannot process task: library {data.library_id} not found." - ) - - file_name_modifier = f"{library.short_name}-{date_str}" - - # resolve integrations - integrations = self._db.scalars( - select(IntegrationConfiguration) - .join(IntegrationLibraryConfiguration) - .where( - IntegrationLibraryConfiguration.library_id == data.library_id, - IntegrationConfiguration.goal == Goals.LICENSE_GOAL, - not_( - IntegrationConfiguration.settings_dict.contains( - {"include_in_inventory_report": False} - ) - ), - ) - ).all() - registry = LicenseProvidersRegistry() - integration_ids: list[int] = [] - for integration in integrations: - settings = registry[integration.protocol].settings_load(integration) - if not isinstance(settings, OPDSImporterSettings): - continue - integration_ids.append(integration.id) - - # generate inventory report csv file - sql_params: dict[str, Any] = { - "library_id": library.id, - "integration_ids": tuple(integration_ids), - } - - inventory_report_file_path = self.generate_inventory_report( - sql_params=sql_params - ) - - # generate holds report csv file - holds_report_file_path = self.generate_holds_report(sql_params=sql_params) - - attachments[f"palace-inventory-report-{file_name_modifier}.csv"] = Path( - inventory_report_file_path - ) - attachments[f"palace-holds-report-{file_name_modifier}.csv"] = Path( - holds_report_file_path - ) - - self.services.email.send_email( - subject=f"Inventory and Holds Reports {current_time}", - receivers=[data.admin_email], - text="", - attachments=attachments, - ) - task.complete() - except Exception as e: - # log error - self.log.error(f"Failed to process task: {task}", e) - task.fail(failure_details=f"{e}") - finally: - self._db.commit() - if self._delete_attachments: - for file_path in attachments.values(): - os.remove(file_path) - - def generate_inventory_report(self, sql_params: dict[str, Any]) -> str: - """Generate an inventory csv file and return the file path""" - return self.generate_csv_report(sql_params, self.inventory_report_query()) - - def generate_holds_report(self, sql_params: dict[str, Any]) -> str: - """Generate a holds report csv file and return the file path""" - return self.generate_csv_report(sql_params, self.holds_report_query()) - - def generate_csv_report(self, sql_params: dict[str, Any], query: str): - with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8") as temp: - writer = csv.writer(temp, delimiter=",") - rows = self._db.execute( - text(query), - sql_params, - ) - writer.writerow(rows.keys()) - writer.writerows(rows) - return temp.name - - def inventory_report_query(self) -> str: - return """ - select - e.title, - e.author, - i.identifier, - e.language, - e.publisher, - e.medium as format, - w.audience, - wg.genres, - d.name data_source, - ic.name collection_name, - l.expires license_expiration, - DATE_PART('day', l.expires - now()) days_remaining_on_license, - l.checkouts_left remaining_loans, - l.terms_concurrency allowed_concurrent_users, - coalesce(lib_loans.active_loan_count, 0) library_active_loan_count, - CASE WHEN collection_sharing.is_shared_collection THEN lp.licenses_reserved - ELSE -1 - END shared_active_loan_count - from datasources d, - collections c, - integration_configurations ic, - integration_library_configurations il, - libraries lib, - works w left outer join (select wg.work_id, string_agg(g.name, ',' order by g.name) as genres - from genres g, - workgenres wg - where g.id = wg.genre_id - group by wg.work_id) wg on w.id = wg.work_id, - editions e left outer join (select lp.presentation_edition_id, - p.library_id, - count(ln.id) active_loan_count - from loans ln, - licensepools lp, - patrons p, - libraries l - where p.id = ln.patron_id and - p.library_id = l.id and - ln.license_pool_id = lp.id and - l.id = :library_id - group by p.library_id, lp.presentation_edition_id) lib_loans - on e.id = lib_loans.presentation_edition_id, - identifiers i, - (select ilc.parent_id, - count(ilc.parent_id) > 1 is_shared_collection - from integration_library_configurations ilc, - integration_configurations i, - collections c - where c.integration_configuration_id = i.id and - i.id = ilc.parent_id group by ilc.parent_id) collection_sharing, - licensepools lp left outer join (select license_pool_id, - checkouts_left, - expires, - terms_concurrency - from licenses where status = 'available') l on lp.id = l.license_pool_id - where lp.identifier_id = i.id and - e.primary_identifier_id = i.id and - e.id = w.presentation_edition_id and - d.id = e.data_source_id and - c.id = lp.collection_id and - c.integration_configuration_id = ic.id and - ic.id = il.parent_id and - ic.id = collection_sharing.parent_id and - ic.id in :integration_ids and - il.library_id = lib.id and - lib.id = :library_id - order by title, author - """ - - def holds_report_query(self) -> str: - return """ - select - e.title, - e.author, - i.identifier, - e.language, - e.publisher, - e.medium as format, - w.audience, - wg.genres, - d.name data_source, - ic.name collection_name, - coalesce(lib_holds.active_hold_count, 0) library_active_hold_count, - CASE WHEN collection_sharing.is_shared_collection THEN lp.patrons_in_hold_queue - ELSE -1 - END shared_active_hold_count - from datasources d, - collections c, - integration_configurations ic, - integration_library_configurations il, - libraries lib, - works w left outer join (select wg.work_id, string_agg(g.name, ',' order by g.name) as genres - from genres g, - workgenres wg - where g.id = wg.genre_id - group by wg.work_id) wg on w.id = wg.work_id, - editions e, - (select lp.presentation_edition_id, - p.library_id, - count(h.id) active_hold_count - from holds h, - licensepools lp, - patrons p - where p.id = h.patron_id and - h.license_pool_id = lp.id and - p.library_id = :library_id and - (h.end is null or - h.end > now() or - h.position > 0) - group by p.library_id, lp.presentation_edition_id) lib_holds, - identifiers i, - (select ilc.parent_id, - count(ilc.parent_id) > 1 is_shared_collection - from integration_library_configurations ilc, - integration_configurations i, - collections c - where c.integration_configuration_id = i.id and - i.id = ilc.parent_id group by ilc.parent_id) collection_sharing, - licensepools lp - where lp.identifier_id = i.id and - e.primary_identifier_id = i.id and - e.id = lib_holds.presentation_edition_id and - e.id = w.presentation_edition_id and - d.id = e.data_source_id and - c.id = lp.collection_id and - c.integration_configuration_id = ic.id and - ic.id = il.parent_id and - ic.id = collection_sharing.parent_id and - ic.id in :integration_ids and - il.library_id = lib.id and - lib.id = :library_id - order by title, author - """ - - def remove_old_tasks(self): - """Remove inventory generation tasks older than 30 days""" - self._db.query(DeferredTask) - thirty_days_ago = utc_now() - datetime.timedelta(days=30) - tasks = ( - self._db.query(DeferredTask) - .filter(DeferredTask.task_type == DeferredTaskType.INVENTORY_REPORT) - .filter(DeferredTask.processing_end_time < thirty_days_ago) - ) - for task in tasks: - self._db.delete(task) - - -class DeleteOldDeferredTasks(Script): - """Delete old deferred tasks.""" - - @classmethod - def arg_parser(cls, _db: Session | None) -> argparse.ArgumentParser: # type: ignore[override] - parser = argparse.ArgumentParser() - if _db is None: - raise ValueError("No database session provided.") - - return parser - - @classmethod - def parse_command_line( - cls, _db: Session | None = None, cmd_args: list[str] | None = None - ): - parser = cls.arg_parser(_db) - return parser.parse_known_args(cmd_args)[0] - - def do_run(self, cmd_args: list[str] | None = None) -> None: - parsed = self.parse_command_line(self._db, cmd_args=cmd_args) - self.remove_old_tasks() - - def remove_old_tasks(self): - """Remove inventory generation tasks older than 30 days""" - self._db.query(DeferredTask) - days = 30 - thirty_days_ago = utc_now() - datetime.timedelta(days=days) - tasks = ( - self._db.query(DeferredTask) - .filter(DeferredTask.task_type == DeferredTaskType.INVENTORY_REPORT) - .filter(DeferredTask.processing_end_time < thirty_days_ago) - ) - - tasks_removed = 0 - - for task in tasks: - self._db.delete(task) - tasks_removed += 1 - - self._db.commit() - if tasks_removed > 0: - self.log.info( - f"Successfully removed {tasks_removed} task{ 's' if tasks_removed > 1 else ''} " - f"that were completed over {days} days ago." - ) - else: - self.log.info( - f"There were no deferred tasks that were completed over {days} ago to be removed." - ) - - class MockStdin: """Mock a list of identifiers passed in on standard input.""" diff --git a/docker/services/cron/cron.d/circulation b/docker/services/cron/cron.d/circulation index edb7155ec2..9e3eec7606 100644 --- a/docker/services/cron/cron.d/circulation +++ b/docker/services/cron/cron.d/circulation @@ -108,7 +108,3 @@ HOME=/var/www/circulation 0 8,20 * * * root core/bin/run playtime_summation >> /var/log/cron.log 2>&1 # On the 2nd of every month 0 4 2 * * root core/bin/run playtime_reporting >> /var/log/cron.log 2>&1 - -# check the inventory report task queue every 15 minutes. -*/15 * * * * root core/bin/run generate_inventory_reports >> /var/log/cron.log 2>&1 -0 0 * * * root core/bin/run delete_old_deferred_tasks >> /var/log/cron.log 2>&1 diff --git a/tests/api/admin/controller/test_report.py b/tests/api/admin/controller/test_report.py index 72ff963c9f..509280fa9f 100644 --- a/tests/api/admin/controller/test_report.py +++ b/tests/api/admin/controller/test_report.py @@ -1,7 +1,8 @@ -import json from http import HTTPStatus +from unittest.mock import patch import pytest +from flask import Response from core.model import create from core.model.admin import Admin, AdminRole @@ -22,29 +23,32 @@ def report_fixture( class TestReportController: - def test_generate_inventory_report(self, report_fixture: ReportControllerFixture): + def test_generate_inventory_and_hold_reports( + self, report_fixture: ReportControllerFixture + ): + email_address = "admin@email.com" ctrl = report_fixture.manager.admin_report_controller db = report_fixture.ctrl.db - report_fixture.ctrl.library = report_fixture.ctrl.db.default_library() - system_admin, _ = create(db.session, Admin, email="admin@email.com") + library = report_fixture.ctrl.db.default_library() + report_fixture.ctrl.library = library + library_id = library.id + system_admin, _ = create(db.session, Admin, email=email_address) system_admin.add_role(AdminRole.SYSTEM_ADMIN) - with report_fixture.request_context_with_library_and_admin( - f"/", - admin=system_admin, - ) as ctx: + + with ( + report_fixture.request_context_with_library_and_admin( + f"/", + admin=system_admin, + ), + patch( + "api.admin.controller.report.generate_inventory_and_hold_reports" + ) as mock_generate_reports, + ): response = ctrl.generate_inventory_report() assert response.status_code == HTTPStatus.ACCEPTED - body = json.loads(response.data) # type: ignore - assert body and body["message"].__contains__("admin@email.com") - assert not body.__contains__("already") - - # check that when generating a duplicate request a 409 is returned. - with report_fixture.request_context_with_library_and_admin( - f"/", - admin=system_admin, - ) as ctx: - response = ctrl.generate_inventory_report() - body = json.loads(response.data) # type: ignore - assert response.status_code == HTTPStatus.CONFLICT - assert body and body["message"].__contains__("admin@email.com") - assert body["message"].__contains__("already") + assert isinstance(response, Response) + assert response.json and email_address in response.json["message"] + + mock_generate_reports.delay.assert_called_once_with( + email_address=email_address, library_id=library_id + ) diff --git a/tests/core/celery/tasks/test_generate_inventory_and_hold_reports.py b/tests/core/celery/tasks/test_generate_inventory_and_hold_reports.py new file mode 100644 index 0000000000..0349f2a931 --- /dev/null +++ b/tests/core/celery/tasks/test_generate_inventory_and_hold_reports.py @@ -0,0 +1,286 @@ +import csv +import os +from datetime import timedelta +from unittest.mock import create_autospec + +from pytest import LogCaptureFixture +from sqlalchemy.orm import sessionmaker + +from api.overdrive import OverdriveSettings +from core.celery.tasks.generate_inventory_and_hold_reports import ( + GenerateInventoryAndHoldsReportsJob, + generate_inventory_and_hold_reports, +) +from core.model import Genre, Hold, Library, get_one_or_create +from core.model.licensing import LicenseStatus +from core.opds_import import OPDSImporterSettings +from core.service.logging.configuration import LogLevel +from core.util.datetime_helpers import utc_now +from tests.fixtures.celery import CeleryFixture +from tests.fixtures.database import DatabaseTransactionFixture +from tests.fixtures.services import ServicesFixture + + +def test_job_run( + db: DatabaseTransactionFixture, + mock_session_maker: sessionmaker, + services_fixture: ServicesFixture, + caplog: LogCaptureFixture, +): + email = "test@email.com" + + # A non-existent collection should log an error + caplog.set_level(LogLevel.info) + send_email_mock = create_autospec( + services_fixture.services.email.container.send_email + ) + GenerateInventoryAndHoldsReportsJob( + mock_session_maker, + library_id=1, + email_address=email, + send_email=send_email_mock, + ).run() + assert ( + f"Cannot generate inventory and holds report for library (id=1): library not found." + in caplog.text + ) + + # create some test data that we expect to be picked up in the inventory report + library = db.library(short_name="test_library") + data_source = "BiblioBoard" + collection_name = "BiblioBoard Test Collection" + collection = create_test_opds_collection(collection_name, data_source, db, library) + library2 = db.library(short_name="test_library2") + # add another library + collection.libraries.append(library2) + + # Configure test data we expect will not be picked up. + create_test_opds_collection( + "Another Test Collection", "AnotherOpdsDataSource", db, library, False + ) + + od_settings = OverdriveSettings( + overdrive_website_id="overdrive_id", + overdrive_client_key="client_key", + overdrive_client_secret="secret", + external_account_id="http://www.overdrive/id", + ) + od_collection_not_to_include = db.collection( + name="Overdrive Test Collection", + data_source_name="Overdrive", + settings=od_settings.dict(), + ) + + od_collection_not_to_include.libraries = [library] + + ds = collection.data_source + assert ds + title = "展翅高飞 : Whistling Wings" + author = "Laura Goering" + language = "eng" + publisher = "My Publisher" + checkouts_left = 10 + terms_concurrency = 5 + edition = db.edition(data_source_name=ds.name) + edition.language = language + edition.publisher = publisher + edition.title = title + edition.medium = edition.BOOK_MEDIUM + edition.author = author + work = db.work( + language="eng", + fiction=True, + with_license_pool=False, + data_source_name=ds.name, + presentation_edition=edition, + collection=collection, + genre="genre_z", + ) + + genre, ignore = Genre.lookup(db.session, "genre_a", autocreate=True) + work.genres.append(genre) + work.audience = "young adult" + + licensepool = db.licensepool( + edition=edition, + open_access=False, + data_source_name=ds.name, + set_edition_as_presentation=True, + collection=collection, + ) + + days_remaining = 10 + expiration = utc_now() + timedelta(days=days_remaining) + db.license( + pool=licensepool, + status=LicenseStatus.available, + checkouts_left=checkouts_left, + terms_concurrency=terms_concurrency, + expires=expiration, + ) + db.license( + pool=licensepool, + status=LicenseStatus.unavailable, + checkouts_left=1, + terms_concurrency=1, + expires=utc_now(), + ) + + patron1 = db.patron(library=library) + patron2 = db.patron(library=library) + patron3 = db.patron(library=library) + patron4 = db.patron(library=library) + + # this one should be counted because the end is in the future. + hold1, _ = get_one_or_create( + db.session, + Hold, + patron=patron1, + license_pool=licensepool, + position=1, + start=utc_now(), + end=utc_now() + timedelta(days=1), + ) + + # this one should be counted because the end is None + hold2, _ = get_one_or_create( + db.session, + Hold, + patron=patron2, + license_pool=licensepool, + start=utc_now(), + end=None, + ) + + # this hold should be counted b/c the position is > 0 + hold3, _ = get_one_or_create( + db.session, + Hold, + patron=patron3, + license_pool=licensepool, + start=utc_now() - timedelta(days=1), + end=utc_now() - timedelta(minutes=1), + position=1, + ) + + # this hold should not be counted because the end is neither in the future nor unset and the position is zero + hold4, _ = get_one_or_create( + db.session, + Hold, + patron=patron4, + license_pool=licensepool, + start=utc_now(), + end=utc_now() - timedelta(minutes=1), + position=0, + ) + + shared_patrons_in_hold_queue = 4 + licensepool.patrons_in_hold_queue = shared_patrons_in_hold_queue + + assert library.id + + # for testing, don't delete the files associated with the attachments so we can read them after the script + # runs + job = GenerateInventoryAndHoldsReportsJob( + mock_session_maker, + library.id, + email_address=email, + send_email=send_email_mock, + delete_attachments=False, + ) + + job.run() + send_email_mock.assert_called_once() + kwargs = send_email_mock.call_args.kwargs + assert kwargs["receivers"] == [email] + assert "Inventory and Holds Reports" in kwargs["subject"] + attachments: dict = kwargs["attachments"] + + assert len(attachments) == 2 + inventory_report_key = [x for x in attachments.keys() if "inventory" in x][0] + assert inventory_report_key + assert "test_library" in inventory_report_key + inventory_report_value = attachments[inventory_report_key] + assert inventory_report_value + inventory_report_csv = list(csv.DictReader(open(inventory_report_value))) + + assert len(inventory_report_csv) == 1 + for row in inventory_report_csv: + assert row["title"] == title + assert row["author"] == author + assert row["identifier"] + assert row["language"] == language + assert row["publisher"] == publisher + assert row["audience"] == "young adult" + assert row["genres"] == "genre_a,genre_z" + assert row["format"] == edition.BOOK_MEDIUM + assert row["data_source"] == data_source + assert row["collection_name"] == collection_name + assert float(row["days_remaining_on_license"]) == float(days_remaining) + assert row["shared_active_loan_count"] == "0" + assert row["library_active_loan_count"] == "0" + assert row["remaining_loans"] == str(checkouts_left) + assert row["allowed_concurrent_users"] == str(terms_concurrency) + assert expiration.strftime("%Y-%m-%d %H:%M:%S.%f") in row["license_expiration"] + + holds_report_key = [x for x in attachments.keys() if "holds" in x][0] + assert holds_report_key + assert "test_library" in holds_report_key + holds_report_value = attachments[holds_report_key] + assert holds_report_value + holds_report_csv = list(csv.DictReader(open(holds_report_value))) + assert len(holds_report_csv) == 1 + + for row in holds_report_csv: + assert row["title"] == title + assert row["author"] == author + assert row["identifier"] + assert row["language"] == language + assert row["publisher"] == publisher + assert row["audience"] == "young adult" + assert row["genres"] == "genre_a,genre_z" + assert row["format"] == edition.BOOK_MEDIUM + assert row["data_source"] == data_source + assert row["collection_name"] == collection_name + assert int(row["shared_active_hold_count"]) == shared_patrons_in_hold_queue + assert int(row["library_active_hold_count"]) == 3 + + # clean up files + for f in attachments.values(): + os.remove(f) + + +def create_test_opds_collection( + collection_name: str, + data_source: str, + db: DatabaseTransactionFixture, + library: Library, + include_in_inventory_report: bool = True, +): + settings = OPDSImporterSettings( + include_in_inventory_report=include_in_inventory_report, + external_account_id="http://opds.com", + data_source=data_source, + ) + collection = db.collection(name=collection_name, settings=settings.dict()) + collection.libraries = [library] + return collection + + +def test_generate_inventory_and_hold_reports_task( + db: DatabaseTransactionFixture, + services_fixture: ServicesFixture, + celery_fixture: CeleryFixture, +): + library = db.library(short_name="test_library") + # there must be at least one opds collection associated with the library for this to work + create_test_opds_collection("c1", "d1", db, library) + generate_inventory_and_hold_reports.delay(library.id, "test@email").wait() + services_fixture.email_fixture.mock_emailer.send.assert_called_once() + assert ( + "Inventory and Holds Reports" + in services_fixture.email_fixture.mock_emailer.send.call_args.kwargs["subject"] + ) + assert services_fixture.email_fixture.mock_emailer.send.call_args.kwargs[ + "receivers" + ] == ["test@email"] diff --git a/tests/core/test_scripts.py b/tests/core/test_scripts.py index 1c60159eb3..6d1770dbb0 100644 --- a/tests/core/test_scripts.py +++ b/tests/core/test_scripts.py @@ -1,11 +1,7 @@ from __future__ import annotations -import csv import datetime -import logging -import os import random -from dataclasses import asdict from io import StringIO from unittest.mock import MagicMock, call, create_autospec, patch @@ -16,7 +12,7 @@ from api.bibliotheca import BibliothecaAPI from api.enki import EnkiAPI from api.lanes import create_default_lanes -from api.overdrive import OverdriveAPI, OverdriveSettings +from api.overdrive import OverdriveAPI from core.classifier import Classifier from core.config import CannotLoadConfiguration from core.external_search import ExternalSearchIndex @@ -37,20 +33,11 @@ get_one, get_one_or_create, ) -from core.model.classification import Classification, Genre, Subject -from core.model.deferredtask import ( - DeferredTask, - DeferredTaskStatus, - DeferredTaskType, - InventoryReportTaskData, - queue_task, - start_next_task, -) +from core.model.classification import Classification, Subject from core.model.devicetokens import DeviceToken, DeviceTokenTypes -from core.model.licensing import LicenseStatus -from core.model.patron import Hold, Patron +from core.model.patron import Patron from core.monitor import CollectionMonitor, Monitor, ReaperMonitor -from core.opds_import import OPDSAPI, OPDSImporterSettings, OPDSImportMonitor +from core.opds_import import OPDSAPI, OPDSImportMonitor from core.scripts import ( AddClassificationScript, CheckContributorNamesInDB, @@ -60,9 +47,7 @@ ConfigureLaneScript, ConfigureLibraryScript, DeleteInvisibleLanesScript, - DeleteOldDeferredTasks, Explain, - GenerateInventoryReports, IdentifierInputScript, LaneSweeperScript, LibraryInputScript, @@ -2378,292 +2363,6 @@ def test_suppress_work(self, db: DatabaseTransactionFixture): assert work.suppressed_for == [test_library] -class TestGenerateInventoryReports: - def test_do_run(self, db: DatabaseTransactionFixture): - # create some test data that we expect to be picked up in the inventory report - library = db.library(short_name="test_library") - library2 = db.library(short_name="test_library2") - data_source = "BiblioBoard" - settings = OPDSImporterSettings( - include_in_inventory_report=True, - external_account_id="http://opds.com", - data_source=data_source, - ) - - collection_name = "BiblioBoard Test Collection" - collection = db.collection(name=collection_name, settings=settings.dict()) - collection.libraries = [library, library2] - - # Configure test data we expect will not be picked up. - no_inventory_report_settings = OPDSImporterSettings( - include_in_inventory_report=False, - external_account_id="http://opds.com", - data_source="AnotherOpdsDataSource", - ) - collection_not_to_include = db.collection( - name="Another Test Collection", settings=no_inventory_report_settings.dict() - ) - collection_not_to_include.libraries = [library] - - od_settings = OverdriveSettings( - overdrive_website_id="overdrive_id", - overdrive_client_key="client_key", - overdrive_client_secret="secret", - external_account_id="http://www.overdrive/id", - ) - od_collection_not_to_include = db.collection( - name="Overdrive Test Collection", - data_source_name="Overdrive", - settings=od_settings.dict(), - ) - - od_collection_not_to_include.libraries = [library] - - ds = collection.data_source - assert ds - title = "展翅高飞 : Whistling Wings" - author = "Laura Goering" - email = "test@email.com" - language = "eng" - publisher = "My Publisher" - checkouts_left = 10 - terms_concurrency = 5 - edition = db.edition(data_source_name=ds.name) - edition.language = language - edition.publisher = publisher - edition.title = title - edition.medium = edition.BOOK_MEDIUM - edition.author = author - work = db.work( - language="eng", - fiction=True, - with_license_pool=False, - data_source_name=ds.name, - presentation_edition=edition, - collection=collection, - genre="genre_z", - ) - - genre, ignore = Genre.lookup(db.session, "genre_a", autocreate=True) - work.genres.append(genre) - work.audience = "young adult" - - licensepool = db.licensepool( - edition=edition, - open_access=False, - data_source_name=ds.name, - set_edition_as_presentation=True, - collection=collection, - ) - - days_remaining = 10 - expiration = utc_now() + datetime.timedelta(days=days_remaining) - db.license( - pool=licensepool, - status=LicenseStatus.available, - checkouts_left=checkouts_left, - terms_concurrency=terms_concurrency, - expires=expiration, - ) - db.license( - pool=licensepool, - status=LicenseStatus.unavailable, - checkouts_left=1, - terms_concurrency=1, - expires=utc_now(), - ) - - patron1 = db.patron(library=library) - patron2 = db.patron(library=library) - patron3 = db.patron(library=library) - patron4 = db.patron(library=library) - - # this one should be counted because the end is in the future. - hold1, _ = get_one_or_create( - db.session, - Hold, - patron=patron1, - license_pool=licensepool, - position=1, - start=utc_now(), - end=utc_now() + datetime.timedelta(days=1), - ) - - # this one should be counted because the end is None - hold2, _ = get_one_or_create( - db.session, - Hold, - patron=patron2, - license_pool=licensepool, - start=utc_now(), - end=None, - ) - - # this hold should be counted b/c the position is > 0 - hold3, _ = get_one_or_create( - db.session, - Hold, - patron=patron3, - license_pool=licensepool, - start=utc_now() - datetime.timedelta(days=1), - end=utc_now() - datetime.timedelta(minutes=1), - position=1, - ) - - # this hold should not be counted because the end is neither in the future nor unset and the position is zero - hold4, _ = get_one_or_create( - db.session, - Hold, - patron=patron4, - license_pool=licensepool, - start=utc_now(), - end=utc_now() - datetime.timedelta(minutes=1), - position=0, - ) - - shared_patrons_in_hold_queue = 4 - licensepool.patrons_in_hold_queue = shared_patrons_in_hold_queue - - assert library.id - data = InventoryReportTaskData( - admin_id=1, library_id=library.id, admin_email=email - ) - task, is_new = queue_task( - db.session, task_type=DeferredTaskType.INVENTORY_REPORT, data=asdict(data) - ) - - assert task.status == DeferredTaskStatus.READY - - script = GenerateInventoryReports(db.session) - send_email_mock = create_autospec(script.services.email.container.send_email) - # for testing, don't delete the files associated with the attachments so we can read them after the script - # runs - script._delete_attachments = False - script.services.email.container.send_email = send_email_mock - script.do_run() - send_email_mock.assert_called_once() - args, kwargs = send_email_mock.call_args - assert task.status == DeferredTaskStatus.SUCCESS - assert kwargs["receivers"] == [email] # type:ignore[unreachable] - assert "Inventory and Holds Reports" in kwargs["subject"] - attachments: dict = kwargs["attachments"] - - assert len(attachments) == 2 - inventory_report_key = [x for x in attachments.keys() if "inventory" in x][0] - assert inventory_report_key - assert "test_library" in inventory_report_key - inventory_report_value = attachments[inventory_report_key] - assert inventory_report_value - inventory_report_reader = csv.DictReader(open(inventory_report_value)) - inventory_row_count = 0 - - for row in inventory_report_reader: - inventory_row_count += 1 - assert row["title"] == title - assert row["author"] == author - assert row["identifier"] - assert row["language"] == language - assert row["publisher"] == publisher - assert row["audience"] == "young adult" - assert row["genres"] == "genre_a,genre_z" - assert row["format"] == edition.BOOK_MEDIUM - assert row["data_source"] == data_source - assert row["collection_name"] == collection_name - assert float(row["days_remaining_on_license"]) == float(days_remaining) - assert row["shared_active_loan_count"] == "0" - assert row["library_active_loan_count"] == "0" - assert row["remaining_loans"] == str(checkouts_left) - assert row["allowed_concurrent_users"] == str(terms_concurrency) - assert ( - expiration.strftime("%Y-%m-%d %H:%M:%S.%f") in row["license_expiration"] - ) - - assert inventory_row_count == 1 - - holds_report_key = [x for x in attachments.keys() if "holds" in x][0] - assert holds_report_key - assert "test_library" in holds_report_key - holds_report_value = attachments[holds_report_key] - assert holds_report_value - holds_report_reader = csv.DictReader(open(holds_report_value)) - row_count = 0 - - for row in holds_report_reader: - row_count += 1 - assert row["title"] == title - assert row["author"] == author - assert row["identifier"] - assert row["language"] == language - assert row["publisher"] == publisher - assert row["audience"] == "young adult" - assert row["genres"] == "genre_a,genre_z" - assert row["format"] == edition.BOOK_MEDIUM - assert row["data_source"] == data_source - assert row["collection_name"] == collection_name - assert int(row["shared_active_hold_count"]) == shared_patrons_in_hold_queue - assert int(row["library_active_hold_count"]) == 3 - - assert row_count == 1 - - # clean up files - for f in attachments.values(): - os.remove(f) - - -class TestDeleteOldDeferredTasks: - def test_do_run( - self, db: DatabaseTransactionFixture, caplog: pytest.LogCaptureFixture - ): - caplog.set_level(logging.INFO) - # create some test data - _db = db.session - - # the deferred task table should be empty - task = _db.query(DeferredTask).first() - assert not task - - data = InventoryReportTaskData( - admin_id=1, library_id=1, admin_email="test@email.com" - ) - task, is_new = queue_task( - db.session, task_type=DeferredTaskType.INVENTORY_REPORT, data=asdict(data) - ) - - assert task - assert is_new - - task2 = start_next_task(_db, task_type=DeferredTaskType.INVENTORY_REPORT) - - assert task2 - # sanity check: make sure we got back the task we just created. - assert task2.id == task.id - assert task2.processing_start_time - # make sure it is processing - assert task2.status == DeferredTaskStatus.PROCESSING - task2.complete() - assert task2.processing_end_time - _db.commit() - - # run it with the expection of no tasks to be deleted. - DeleteOldDeferredTasks(_db).do_run() - assert caplog.messages[0].__contains__("There were no deferred tasks") - - # set the task's end processing time to 30 days ago. - task3 = _db.query(DeferredTask).filter(DeferredTask.id == task2.id).first() - assert task3 - task3.processing_end_time = task3.processing_end_time - datetime.timedelta( - days=30 - ) - _db.commit() - # run again with the expectation of 1 task removed. - DeleteOldDeferredTasks(_db).do_run() - task4 = _db.query(DeferredTask).first() - assert not task4 - assert caplog.messages[1].__contains__( - "Successfully removed 1 task that were completed over 30 days ago." - ) - - class TestWorkConsolidationScript: """TODO"""