Skip to content

Commit

Permalink
[PP-1491] Introduce retries to prevent ObjectDeletedError (#1948)
Browse files Browse the repository at this point in the history
* Introduce retries on SweepMonitor and Axis360CirculationMonitor
* Use the tenacity retry library
* Add test coverage for axis
* Ensure that sessions containing stale objects are rolled back before attempting to retry.
* Align OverdriveCirculationMonitor retry approach with that of Axis360CirculationMonitor and SweepMonitor.
* Fix test, reduce test execution time, add test for StaleDataError and ObjectDeletedError
  • Loading branch information
dbernstein authored Jul 29, 2024
1 parent d2e0f66 commit 2963329
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 62 deletions.
17 changes: 16 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ redis = "^5.0.5"
redmail = "^0.6.0"
requests = "^2.29"
sqlalchemy = {version = "^1.4", extras = ["mypy"]}
tenacity = "^8.5.0"
textblob = "0.18.0.post0"
types-pyopenssl = "^24.0.0.20240130"
types-pyyaml = "^6.0.12.9"
Expand Down
53 changes: 36 additions & 17 deletions src/palace/manager/api/axis.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
from pydantic import validator
from requests import Response as RequestsResponse
from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import ObjectDeletedError, StaleDataError
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from palace.manager.api.admin.validator import Validator
from palace.manager.api.circulation import (
Expand Down Expand Up @@ -734,8 +741,7 @@ class Axis360CirculationMonitor(CollectionMonitor, TimelineMonitor):

SERVICE_NAME = "Axis 360 Circulation Monitor"
INTERVAL_SECONDS = 60
DEFAULT_BATCH_SIZE = 50

MAX_RETRIES = 10
PROTOCOL = Axis360API.label()

DEFAULT_START_TIME = datetime_utc(1970, 1, 1)
Expand All @@ -756,7 +762,6 @@ def __init__(
else:
self.api = api_class(_db, collection)

self.batch_size = self.DEFAULT_BATCH_SIZE
self.bibliographic_coverage_provider = Axis360BibliographicCoverageProvider(
collection, api_class=self.api
)
Expand All @@ -775,26 +780,40 @@ def catch_up_from(
count = 0
for bibliographic, circulation in self.api.recent_activity(start):
self.process_book(bibliographic, circulation)
self._db.commit()
count += 1
if count % self.batch_size == 0:
self._db.commit()
progress.achievements = "Modified titles: %d." % count

@retry(
    retry=(
        retry_if_exception_type(StaleDataError)
        | retry_if_exception_type(ObjectDeletedError)
    ),
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    reraise=True,
)
def process_book(
    self, bibliographic: Metadata, circulation: CirculationData
) -> tuple[Edition, LicensePool]:
    """Create or update a single title inside a savepoint.

    The update runs in a nested transaction so that a failed attempt is
    rolled back before the ``@retry`` decorator calls this method again.
    Only ``StaleDataError`` and ``ObjectDeletedError`` trigger a retry; the
    decorator stops after ``MAX_RETRIES`` attempts, waits with exponential
    backoff (bounded to 1-60 seconds) between attempts, and re-raises the
    last exception once the attempts are exhausted.

    :param bibliographic: Metadata passed through to ``api.update_book``.
    :param circulation: CirculationData passed through to
        ``api.update_book``.
    :return: The ``(edition, license_pool)`` pair produced by
        ``api.update_book``.
    :raises Exception: Any exception from the update is re-raised after the
        savepoint has been rolled back.
    """
    tx = self._db.begin_nested()
    try:
        edition, new_edition, license_pool, new_license_pool = self.api.update_book(
            bibliographic, circulation
        )
        if new_license_pool or new_edition:
            # At this point we have done work equivalent to that done by
            # the Axis360BibliographicCoverageProvider. Register that the
            # work has been done so we don't have to do it again.
            identifier = edition.primary_identifier
            self.bibliographic_coverage_provider.handle_success(identifier)
            self.bibliographic_coverage_provider.add_coverage_record_for(identifier)

        tx.commit()
    except Exception:
        # Discard the failed attempt's changes, then let the original
        # exception (with its original traceback) reach the retry logic.
        tx.rollback()
        raise
    return edition, license_pool


class Axis360BibliographicCoverageProvider(BibliographicCoverageProvider):
Expand Down
60 changes: 36 additions & 24 deletions src/palace/manager/api/overdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import json
import logging
import re
import time
import urllib.parse
from collections.abc import Iterable
from threading import RLock
Expand All @@ -22,7 +21,13 @@
from requests.structures import CaseInsensitiveDict
from sqlalchemy import select
from sqlalchemy.orm import Query, Session
from sqlalchemy.orm.exc import StaleDataError
from sqlalchemy.orm.exc import ObjectDeletedError, StaleDataError
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from palace.manager.api.circulation import (
BaseCirculationAPI,
Expand Down Expand Up @@ -1956,7 +1961,7 @@ class OverdriveCirculationMonitor(CollectionMonitor, TimelineMonitor):
basic Editions for any new LicensePools that show up.
"""

MAXIMUM_BOOK_RETRIES = 3
MAXIMUM_BOOK_RETRIES = 5
SERVICE_NAME = "Overdrive Circulation Monitor"
PROTOCOL = OverdriveAPI.label()
OVERLAP = datetime.timedelta(minutes=1)
Expand Down Expand Up @@ -1995,34 +2000,41 @@ def catch_up_from(self, start, cutoff, progress: TimestampData):
if not book:
continue

# Attempt to create/update the book up to MAXIMUM_BOOK_RETRIES times.
book_changed = False
book_succeeded = False
for attempt in range(OverdriveCirculationMonitor.MAXIMUM_BOOK_RETRIES):
if book_succeeded:
break

try:
_, _, is_changed = self.api.update_licensepool(book)
self._db.commit()
book_succeeded = True
book_changed = is_changed
except StaleDataError as e:
self.log.exception("encountered stale data exception: ", exc_info=e)
self._db.rollback()
if attempt + 1 == OverdriveCirculationMonitor.MAXIMUM_BOOK_RETRIES:
progress.exception = e
else:
time.sleep(1)
self.log.warning(
f"retrying book {book} (attempt {attempt} of {OverdriveCirculationMonitor.MAXIMUM_BOOK_RETRIES})"
)
try:
book_changed = self.process_book(book, progress)
self._db.commit()
except Exception as e:
progress.exception = e

if self.should_stop(start, book, book_changed):
break

progress.achievements = "Books processed: %d." % total_books

@retry(
    retry=(
        retry_if_exception_type(StaleDataError)
        | retry_if_exception_type(ObjectDeletedError)
    ),
    stop=stop_after_attempt(MAXIMUM_BOOK_RETRIES),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    reraise=True,
)
def process_book(self, book, progress):
    """Create or update the license pool for one book inside a savepoint.

    Retrying is handled by the ``@retry`` decorator, not by this body: on
    ``StaleDataError`` or ``ObjectDeletedError`` the method is invoked again,
    up to ``MAXIMUM_BOOK_RETRIES`` attempts with exponential backoff bounded
    to 1-60 seconds, and the last exception is re-raised when the attempts
    run out.

    :param book: Passed unchanged to ``api.update_licensepool``.
    :param progress: Not used by this method; kept so existing callers that
        pass it continue to work.
    :return: The third element of the tuple returned by
        ``api.update_licensepool`` (the "is changed" flag).
    :raises Exception: Any exception from the update is logged and re-raised
        after the savepoint has been rolled back.
    """
    tx = self._db.begin_nested()
    try:
        _, _, is_changed = self.api.update_licensepool(book)
        tx.commit()
    except Exception as e:
        self.log.exception("exception on update_licensepool: ", exc_info=e)
        # Roll back the savepoint so a retry does not start from a session
        # that still holds the failed attempt's state.
        tx.rollback()
        raise
    return is_changed

def should_stop(self, start, api_description, is_changed):
pass

Expand Down
51 changes: 39 additions & 12 deletions src/palace/manager/core/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
from typing import TYPE_CHECKING

from sqlalchemy.orm import defer
from sqlalchemy.orm.exc import ObjectDeletedError, StaleDataError
from sqlalchemy.sql.expression import and_, or_
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from palace.manager.core.exceptions import BasePalaceException
from palace.manager.core.metadata_layer import TimestampData
Expand Down Expand Up @@ -441,6 +448,8 @@ class SweepMonitor(CollectionMonitor):
# Items will be processed in batches of this size.
DEFAULT_BATCH_SIZE = 100

MAXIMUM_BATCH_RETRIES = 10

DEFAULT_COUNTER = 0

# The model class corresponding to the database table that this
Expand Down Expand Up @@ -471,7 +480,6 @@ def run_once(self, *ignore):
# last _successful_ batch.
run_started_at = utc_now()
timestamp.start = run_started_at

total_processed = 0
while True:
old_offset = offset
Expand Down Expand Up @@ -505,19 +513,38 @@ def run_once(self, *ignore):
# update.
return TimestampData(counter=offset, achievements=achievements)

@retry(
    retry=(
        retry_if_exception_type(StaleDataError)
        | retry_if_exception_type(ObjectDeletedError)
    ),
    stop=stop_after_attempt(MAXIMUM_BATCH_RETRIES),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    reraise=True,
)
def process_batch(self, offset):
    """Process one batch of work.

    The batch is fetched and processed inside a nested transaction so that a
    failed attempt is rolled back before the ``@retry`` decorator re-runs
    the method. Only ``StaleDataError`` and ``ObjectDeletedError`` are
    retried, up to ``MAXIMUM_BATCH_RETRIES`` attempts with exponential
    backoff bounded to 1-60 seconds; the last exception is re-raised once
    the attempts are exhausted.

    :param offset: Value handed to ``fetch_batch``; a falsy value is
        treated as 0.
    :return: ``(id of the last item in the batch, number of items)`` when
        the batch is non-empty, otherwise ``(0, 0)``.
    :raises Exception: Any exception raised while fetching or processing is
        re-raised after the savepoint has been rolled back.
    """
    tx = self._db.begin_nested()
    try:
        offset = offset or 0
        items = self.fetch_batch(offset).all()
        if items:
            self.process_items(items)
            # We've completed a batch. Return the ID of the last item
            # in the batch so we don't do this work again.
            result = (items[-1].id, len(items))
        else:
            # There are no more items in this database table, so we
            # are done with the sweep. Reset the counter.
            result = (0, 0)

        tx.commit()
    except Exception:
        # Undo the partial batch, then propagate the original exception so
        # the retry policy can decide whether to try again.
        tx.rollback()
        raise
    return result

def process_items(self, items):
"""Process a list of items."""
Expand Down
50 changes: 48 additions & 2 deletions tests/manager/api/test_axis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import urllib
from functools import partial
from typing import TYPE_CHECKING, cast
from unittest.mock import MagicMock, Mock, PropertyMock
from unittest.mock import MagicMock, Mock, PropertyMock, patch

import pytest
from sqlalchemy.orm.exc import ObjectDeletedError, StaleDataError

from palace.manager.api.axis import (
AudiobookMetadataParser,
Expand Down Expand Up @@ -63,8 +64,9 @@
from palace.manager.sqlalchemy.model.datasource import DataSource
from palace.manager.sqlalchemy.model.edition import Edition
from palace.manager.sqlalchemy.model.identifier import Identifier
from palace.manager.sqlalchemy.model.licensing import DeliveryMechanism
from palace.manager.sqlalchemy.model.licensing import DeliveryMechanism, LicensePool
from palace.manager.sqlalchemy.model.resource import Hyperlink, Representation
from palace.manager.sqlalchemy.util import get_one_or_create
from palace.manager.util.datetime_helpers import datetime_utc, utc_now
from palace.manager.util.flask_util import Response
from palace.manager.util.http import RemoteIntegrationException
Expand Down Expand Up @@ -1006,6 +1008,9 @@ def test_process_book_updates_old_licensepool(self, axis360: Axis360Fixture):
identifier_type=Identifier.AXIS_360_ID,
identifier_id="0003642860",
)
licensepool, _ = get_one_or_create(
axis360.db.session, LicensePool, id=licensepool.id
)
# We start off with availability information based on the
# default for test data.
assert 1 == licensepool.licenses_owned
Expand All @@ -1025,6 +1030,47 @@ def test_process_book_updates_old_licensepool(self, axis360: Axis360Fixture):
# Now we have information based on the CirculationData.
assert 9 == licensepool.licenses_owned

# def test_retry_failure

def test_retry(self, axis360: Axis360Fixture):
    """process_book is retried after ObjectDeletedError and StaleDataError.

    ``update_book`` is patched to fail twice and then succeed, so a single
    monitor run should call it exactly three times.
    """
    edition, licensepool = axis360.db.edition(
        with_license_pool=True,
        identifier_type=Identifier.AXIS_360_ID,
        identifier_id="012345678",
    )
    identifier = IdentifierData(
        type=licensepool.identifier.type,
        identifier=licensepool.identifier.identifier,
    )

    metadata = Metadata(DataSource.AXIS_360, primary_identifier=identifier)

    with patch("tests.mocks.axis.MockAxis360API.update_book") as update_book, patch(
        "tests.mocks.axis.MockAxis360API.recent_activity"
    ) as recent_activity:
        # A list-valued side_effect is consumed one entry per call and takes
        # precedence over return_value, so the successful result has to be
        # the third entry; otherwise the third call raises StopIteration.
        update_book.side_effect = [
            ObjectDeletedError({}, "object deleted"),
            StaleDataError("stale data"),
            (edition, False, licensepool, False),
        ]
        recent_activity.return_value = [(metadata, metadata.circulation)]

        monitor = Axis360CirculationMonitor(
            axis360.db.session,
            axis360.collection,
            api_class=MockAxis360API,
        )

        monitor.run()

        assert update_book.call_count == 3


class TestReaper:
def test_instantiate(self, axis360: Axis360Fixture):
Expand Down
Loading

0 comments on commit 2963329

Please sign in to comment.