Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle property migration write conflicts #412 #433

Merged
merged 12 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion inventory_management_system_api/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
Module for connecting to a MongoDB database.
"""

from typing import Annotated
from contextlib import contextmanager
from typing import Annotated, Generator

from fastapi import Depends
from pymongo import MongoClient
from pymongo.client_session import ClientSession
from pymongo.database import Database
from pymongo.errors import OperationFailure

from inventory_management_system_api.core.config import config
from inventory_management_system_api.core.exceptions import WriteConflictError

db_config = config.database
mongodb_client = MongoClient(
Expand All @@ -28,4 +32,28 @@ def get_database() -> Database:
return mongodb_client[db_config.name.get_secret_value()]


@contextmanager
def start_session_transaction(action_description: str) -> Generator[ClientSession, None, None]:
"""
Starts a MongoDB session followed by a transaction and returns the session to use.

Also handles write conflicts.

:param action_description: Description of what the transaction is doing so it can be used in any raised errors.
:raises WriteConflictError: If there a write conflict during the transaction.
:returns: MongoDB session that has a transaction started on it.
"""

with mongodb_client.start_session() as session:
with session.start_transaction():
try:
yield session
except OperationFailure as exc:
if "write conflict" in str(exc).lower():
raise WriteConflictError(
f"Write conflict while {action_description}. Please try again later."
) from exc
raise exc


DatabaseDep = Annotated[Database, Depends(get_database)]
6 changes: 6 additions & 0 deletions inventory_management_system_api/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,9 @@ class InvalidActionError(DatabaseError):
"""
Exception raised when trying to update an item's catalogue item ID
"""


class WriteConflictError(DatabaseError):
"""
Exception raised when a transaction has a write conflict.
"""
11 changes: 10 additions & 1 deletion inventory_management_system_api/routers/v1/catalogue_category.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
InvalidObjectIdError,
LeafCatalogueCategoryError,
MissingRecordError,
WriteConflictError,
)
from inventory_management_system_api.schemas.breadcrumbs import BreadcrumbsGetSchema
from inventory_management_system_api.schemas.catalogue_category import (
CATALOGUE_CATEGORY_WITH_CHILD_NON_EDITABLE_FIELDS,
CatalogueCategoryPatchSchema,
CatalogueCategoryPostSchema,
CatalogueCategoryPropertyPatchSchema,
CatalogueCategoryPropertyPostSchema,
CatalogueCategoryPropertySchema,
CatalogueCategorySchema,
CATALOGUE_CATEGORY_WITH_CHILD_NON_EDITABLE_FIELDS,
)
from inventory_management_system_api.services.catalogue_category import CatalogueCategoryService
from inventory_management_system_api.services.catalogue_category_property import CatalogueCategoryPropertyService
Expand Down Expand Up @@ -272,6 +273,10 @@ def create_property(
message = str(exc)
logger.exception(message)
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message) from exc
except WriteConflictError as exc:
message = str(exc)
logger.exception(message)
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message) from exc


@router.patch(
Expand Down Expand Up @@ -320,3 +325,7 @@ def partial_update_property(
message = str(exc)
logger.exception(message)
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message) from exc
except WriteConflictError as exc:
message = str(exc)
logger.exception(message)
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message) from exc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from fastapi import Depends

from inventory_management_system_api.core.database import mongodb_client
from inventory_management_system_api.core.database import start_session_transaction
from inventory_management_system_api.core.exceptions import InvalidActionError, MissingRecordError
from inventory_management_system_api.models.catalogue_category import (
AllowedValues,
Expand Down Expand Up @@ -105,32 +105,31 @@ def create(
)

# Run all subsequent edits within a transaction to ensure they will all succeed or fail together
with mongodb_client.start_session() as session:
with session.start_transaction():
# Firstly update the catalogue category
catalogue_category_property_out = self._catalogue_category_repository.create_property(
catalogue_category_id, catalogue_category_property_in, session=session
)
with start_session_transaction("adding property") as session:
# Firstly update the catalogue category
catalogue_category_property_out = self._catalogue_category_repository.create_property(
catalogue_category_id, catalogue_category_property_in, session=session
)

property_in = PropertyIn(
id=str(catalogue_category_property_in.id),
name=catalogue_category_property_in.name,
value=catalogue_category_property.default_value,
unit=unit_value,
unit_id=catalogue_category_property.unit_id,
)
property_in = PropertyIn(
id=str(catalogue_category_property_in.id),
name=catalogue_category_property_in.name,
value=catalogue_category_property.default_value,
unit=unit_value,
unit_id=catalogue_category_property.unit_id,
)

# Add property to all catalogue items of the catalogue category
self._catalogue_item_repository.insert_property_to_all_matching(
catalogue_category_id, property_in, session=session
)
# Add property to all catalogue items of the catalogue category
self._catalogue_item_repository.insert_property_to_all_matching(
catalogue_category_id, property_in, session=session
)

# Add property to all items of the catalogue items
# Obtain a list of ids to do this rather than iterate one by one as its faster. Limiting factor
# would be memory to store these ids and the network bandwidth it takes to send the request to the
# database but for 10000 items being updated this only takes 4.92 KB
catalogue_item_ids = self._catalogue_item_repository.list_ids(catalogue_category_id, session=session)
self._item_repository.insert_property_to_all_in(catalogue_item_ids, property_in, session=session)
# Add property to all items of the catalogue items
# Obtain a list of ids to do this rather than iterate one by one as its faster. Limiting factor
# would be memory to store these ids and the network bandwidth it takes to send the request to the
# database but for 10000 items being updated this only takes 4.92 KB
catalogue_item_ids = self._catalogue_item_repository.list_ids(catalogue_category_id, session=session)
self._item_repository.insert_property_to_all_in(catalogue_item_ids, property_in, session=session)

return catalogue_category_property_out

Expand Down Expand Up @@ -228,20 +227,19 @@ def update(
property_in = CatalogueCategoryPropertyIn(**{**existing_property_out.model_dump(), **update_data})

# Run all subsequent edits within a transaction to ensure they will all succeed or fail together
with mongodb_client.start_session() as session:
with session.start_transaction():
# Firstly update the catalogue category
property_out = self._catalogue_category_repository.update_property(
catalogue_category_id, catalogue_category_property_id, property_in, session=session
)
with start_session_transaction("updating property") as session:
# Firstly update the catalogue category
property_out = self._catalogue_category_repository.update_property(
catalogue_category_id, catalogue_category_property_id, property_in, session=session
)

# Avoid propagating changes unless absolutely necessary
if updating_name:
self._catalogue_item_repository.update_names_of_all_properties_with_id(
catalogue_category_property_id, catalogue_category_property.name, session=session
)
self._item_repository.update_names_of_all_properties_with_id(
catalogue_category_property_id, catalogue_category_property.name, session=session
)
# Avoid propagating changes unless absolutely necessary
if updating_name:
self._catalogue_item_repository.update_names_of_all_properties_with_id(
catalogue_category_property_id, catalogue_category_property.name, session=session
)
self._item_repository.update_names_of_all_properties_with_id(
catalogue_category_property_id, catalogue_category_property.name, session=session
)

return property_out
55 changes: 55 additions & 0 deletions test/unit/core/test_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
Unit tests for functions inside the `database` module.
"""

from unittest.mock import patch

import pytest
from pymongo.errors import OperationFailure

from inventory_management_system_api.core.database import start_session_transaction
from inventory_management_system_api.core.exceptions import WriteConflictError


@patch("inventory_management_system_api.core.database.mongodb_client")
def test_start_session_transaction(mock_mongodb_client):
"""Test `start_session_transaction`."""

expected_session = mock_mongodb_client.start_session.return_value.__enter__.return_value

with start_session_transaction("testing") as session:
pass

assert expected_session == session
expected_session.start_transaction.assert_called_once()


@patch("inventory_management_system_api.core.database.mongodb_client")
def test_start_session_transaction_with_operation_failure(mock_mongodb_client):
"""Test `start_session_transaction` when there is an operation failure inside the transaction."""

expected_session = mock_mongodb_client.start_session.return_value.__enter__.return_value

with pytest.raises(OperationFailure) as exc:
with start_session_transaction("testing") as session:
raise OperationFailure("Some operation error.")

assert expected_session == session
expected_session.start_transaction.assert_called_once()
assert str(exc.value) == "Some operation error."


@patch("inventory_management_system_api.core.database.mongodb_client")
def test_start_session_transaction_with_operation_failure_write_conflict(mock_mongodb_client):
"""Test `start_session_transaction` when there is an operation failure due to a write conflict inside the
transaction."""

expected_session = mock_mongodb_client.start_session.return_value.__enter__.return_value

with pytest.raises(WriteConflictError) as exc:
with start_session_transaction("testing") as session:
raise OperationFailure("Write conflict during plan execution and yielding is disabled.")

assert expected_session == session
expected_session.start_transaction.assert_called_once()
assert str(exc.value) == "Write conflict while testing. Please try again later."
16 changes: 8 additions & 8 deletions test/unit/services/test_catalogue_category_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class CatalogueCategoryPropertyServiceDSL(BaseCatalogueServiceDSL):
"""Base class for `CatalogueCategoryPropertyService` unit tests."""

wrapped_utils: Mock
mock_mongodb_client: Mock
mock_start_session_transaction: Mock
mock_catalogue_category_repository: Mock
mock_catalogue_item_repository: Mock
mock_item_repository: Mock
Expand Down Expand Up @@ -74,9 +74,9 @@ def setup(
self.catalogue_category_property_service = catalogue_category_property_service

with patch(
"inventory_management_system_api.services.catalogue_category_property.mongodb_client"
) as mocked_mongo_db_client:
self.mock_mongodb_client = mocked_mongo_db_client
"inventory_management_system_api.services.catalogue_category_property.start_session_transaction"
) as mocked_start_session_transaction:
self.mock_start_session_transaction = mocked_start_session_transaction

with patch(
"inventory_management_system_api.services.catalogue_category_property.utils", wraps=utils
Expand Down Expand Up @@ -203,8 +203,8 @@ def check_create_success(self) -> None:
)

# Session/Transaction
expected_session = self.mock_mongodb_client.start_session.return_value.__enter__.return_value
expected_session.start_transaction.assert_called_once()
self.mock_start_session_transaction.assert_called_once_with("adding property")
expected_session = self.mock_start_session_transaction.return_value.__enter__.return_value

# Catalogue category

Expand Down Expand Up @@ -481,8 +481,8 @@ def check_update_success(self) -> None:
self.wrapped_utils.check_duplicate_property_names.assert_called_once_with([modified_catalogue_category_out])

# Session/Transaction
expected_session = self.mock_mongodb_client.start_session.return_value.__enter__.return_value
expected_session.start_transaction.assert_called_once()
self.mock_start_session_transaction.assert_called_once_with("updating property")
expected_session = self.mock_start_session_transaction.return_value.__enter__.return_value

# Catalogue category
self.mock_catalogue_category_repository.update_property.assert_called_once_with(
Expand Down
Loading