diff --git a/.github/workflows/.ci.yml b/.github/workflows/.ci.yml index 88722026..17591f91 100644 --- a/.github/workflows/.ci.yml +++ b/.github/workflows/.ci.yml @@ -16,10 +16,10 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3 + uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4.1.0 - name: Set up Python - uses: actions/setup-python@bd6b4b6205c4dbad673328db7b31b7fab9e241c0 # v4.6.1 + uses: actions/setup-python@65d7f2d534ac1bc67fcd62888c5f4f3d2cb2b236 # v4.7.1 with: python-version: "3.10" cache: "pip" @@ -36,10 +36,10 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3 + uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4.1.0 - name: Set up Python - uses: actions/setup-python@bd6b4b6205c4dbad673328db7b31b7fab9e241c0 # v4.6.1 + uses: actions/setup-python@65d7f2d534ac1bc67fcd62888c5f4f3d2cb2b236 # v4.7.1 with: python-version: "3.10" cache: "pip" @@ -63,10 +63,10 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3 + uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4.1.0 - name: Start MongoDB - uses: supercharge/mongodb-github-action@d26215f71b2ce60420a2a3776a25893d11a65f85 #v1.9.0 + uses: supercharge/mongodb-github-action@b0a1493307c4e9b82ed61f3858d606c5ff190c64 #v1.10.0 with: mongodb-version: "6.0" mongodb-username: root @@ -74,7 +74,7 @@ jobs: mongodb-db: test-ims - name: Set up Python - uses: actions/setup-python@bd6b4b6205c4dbad673328db7b31b7fab9e241c0 # v4.6.1 + uses: actions/setup-python@65d7f2d534ac1bc67fcd62888c5f4f3d2cb2b236 # v4.7.1 with: python-version: "3.10" cache: "pip" @@ -91,3 +91,36 @@ jobs: - name: Run e2e tests run: DATABASE__NAME="test-ims" pytest test/e2e/ --cov + + docker: + # This job triggers only if all the other jobs succeed and does different things depending on the context. + # The job builds the Docker image in all cases and also pushes the image to Harbor only if something is + # pushed to the develop branch. + needs: [linting, unit-tests, e2e-tests] + name: Docker + runs-on: ubuntu-latest + steps: + - name: Check out repo + uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4.1.0 + + - name: Login to Harbor + uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0 + with: + registry: harbor.stfc.ac.uk/inventory-management-system + username: ${{ secrets.HARBOR_USERNAME }} + password: ${{ secrets.HARBOR_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0 + with: + images: harbor.stfc.ac.uk/inventory-management-system/ims-api + + - name: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' && 'Build and push Docker image to Harbor' || 'Build Docker image' }} + uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0 + with: + context: . + file: ./Dockerfile.prod + push: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} diff --git a/.gitignore b/.gitignore index 6cd4b56b..bdd80b03 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,8 @@ .vscode/ mongodb/data/* !mongodb/data/.keep -logging.ini +inventory_management_system_api/logging.ini + # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/Dockerfile.prod b/Dockerfile.prod new file mode 100644 index 00000000..e0a32c81 --- /dev/null +++ b/Dockerfile.prod @@ -0,0 +1,38 @@ +FROM python:3.10-alpine3.18 + +WORKDIR /inventory-management-system-api-run + +COPY README.md pyproject.toml ./ +# Copy inventory_management_system_api source files +COPY inventory_management_system_api/ inventory_management_system_api/ + +RUN set -eux; \ + \ + # Install pip dependencies \ + python -m pip install --no-cache-dir .; \ + \ + # Create loging.ini from its .example file \ + cp inventory_management_system_api/logging.example.ini inventory_management_system_api/logging.ini; \ + \ + # Create a non-root user to run as \ + addgroup -S inventory-management-system-api; \ + adduser -S -D -G inventory-management-system-api -H -h /inventory-management-system-api-run inventory-management-system-api; \ + \ + # Create a log file \ + touch inventory-management-system-api.log; \ + # Change ownership of log file - app will need to write to it + chown -R inventory-management-system-api:inventory-management-system-api inventory-management-system-api.log; + +USER inventory-management-system-api + +ENV API__TITLE="Inventory Management System API" +ENV API__DESCRIPTION="This is the API for the Inventory Management System" +ENV DATABASE__PROTOCOL=mongodb +ENV DATABASE__USERNAME=root +ENV DATABASE__PASSWORD=example +ENV DATABASE__HOSTNAME=localhost +ENV DATABASE__PORT=27017 +ENV DATABASE__NAME=ims + +CMD ["uvicorn", "inventory_management_system_api.main:app", "--host", "0.0.0.0", "--port", "8000"] +EXPOSE 8000 diff --git a/README.md b/README.md index 7d131de7..716f6a77 100644 --- a/README.md +++ b/README.md @@ -2,65 +2,80 @@ ## How to Run -This is a Python microservice created using FastAPI and requires a MongoDB instance to run against. If you are using -Docker to run the application, the `docker-compose.yml file` has already been configured to start a MongoDB instance -that can be accessed at `localhost:27017` using `root` as the username and `example` as the password. +This is a Python microservice created using FastAPI and requires a MongoDB instance to run against. ### Prerequisites - Docker installed (if you want to run the microservice inside Docker) - Python 3.10 (or above) and MongoDB 6.0 installed on your machine if you are not using Docker - [MongoDB Compass](https://www.mongodb.com/products/compass) installed (if you want to interact with the database using a GUI) +- This repository cloned ### Docker Setup +The easiest way to run the application with Docker for local development is using the `docker-compose.yml` file. It is +configured to spin up a MongoDB instance that can be accessed at `localhost:27017` using `root` as the username and +`example` as the password. It also starts the application in a reload mode using the `Dockerfile`. Use the `Dockerfile` +or `Dockerfile.prod` to run just the application itself in a container. The former is for local development and must +not be used in production. -1. Ensure that Docker is installed and running on your machine. -2. Clone the repository and navigate to the project directory: - ```bash - git clone git@github.com:ral-facilities/inventory-management-system-api.git - cd inventory-management-system-api -3. Create a `logging.ini` file. +Ensure that Docker is installed and running on your machine before proceeding. + +#### Using Docker Compose File +1. Create a `logging.ini` file from the example in the root of the project directory: ```bash cp logging.example.ini logging.ini ``` -4. Build and start the Docker containers: +2. Build and start the Docker containers: ```bash docker-compose up ``` - The microservice should now be running inside Docker at http://localhost:8000. The Swagger UI can be accessed - at http://localhost:8000/docs. + The microservice should now be running inside Docker at http://localhost:8000 and its Swagger UI could be accessed + at http://localhost:8000/docs. A MongoDB instance should also be running at http://localhost:27017. -### Local Setup +#### Using Dockerfiles + +1. Build an image using either the `Dockerfile` or `Dockerfile.prod` from the root of the project directory: + ```bash + docker build -f Dockerfile.prod -t ims_api_image . + ``` -1. Clone the repository and navigate to the project directory: +2. Start the container using the image built and map it to port `8000` locally: ```bash - git clone git@github.com:ral-facilities/inventory-management-system-api.git - cd inventory-management-system-api + docker run -p 8000:8000 --name ims_api_container ims_api_image ``` -2. Create a Python virtual environment and activate it: + or with values for the environment variables: + ```bash + docker run -p 8000:8000 --name ims_api_container --env DATABASE__NAME=test-ims ims_api_image + ``` + The microservice should now be running inside Docker at http://localhost:8000 and its Swagger UI could be accessed + at http://localhost:8000/docs. + +### Local Setup + +Ensure that MongoDB is installed and running on your machine before proceeding. + +1. Create a Python virtual environment and activate it in the root of the project directory: ```bash python -m venv venv source venv/bin/activate ``` -3. Install the required dependencies using pip: +2. Install the required dependencies using pip: ```bash pip install .[dev] ``` -4. Create a `.env` file using the `.env.example` file and modify the values inside accordingly. -5. Create a `logging.ini` file using the `logging.example.ini` file and modify it accordingly. -6. Ensure that MongoDB is running locally. If it's not installed, you can follow the official MongoDB installation guide - for your operating system. -7. Start the microservice using Uvicorn from the project directory: +3. Create a `.env` file using the `.env.example` file and modify the values inside accordingly. +4. Create a `logging.ini` file using the `logging.example.ini` file and modify it accordingly. +5. Start the microservice using Uvicorn: ```bash uvicorn inventory_management_system_api.main:app --log-config inventory_management_system_api/logging.ini --reload ``` The microservice should now be running locally at http://localhost:8000. The Swagger UI can be accessed at http://localhost:8000/docs. -8. To run the unit tests, run : +6. To run the unit tests, run : ```bash pytest test/unit/ ``` -9. To run the e2e tests, ensure that MongoDB is running locally and run: +7. To run the e2e tests, run: ```bash DATABASE__NAME="test-ims" pytest test/e2e/ ``` diff --git a/inventory_management_system_api/main.py b/inventory_management_system_api/main.py index ee546b1c..1ff75298 100644 --- a/inventory_management_system_api/main.py +++ b/inventory_management_system_api/main.py @@ -11,8 +11,7 @@ from inventory_management_system_api.core.config import config from inventory_management_system_api.core.logger_setup import setup_logger -from inventory_management_system_api.routers.v1 import catalogue_category, manufacturer -from inventory_management_system_api.routers.v1 import catalogue_item +from inventory_management_system_api.routers.v1 import catalogue_category, catalogue_item, system, manufacturer app = FastAPI(title=config.api.title, description=config.api.description) @@ -66,6 +65,7 @@ async def custom_validation_exception_handler(request: Request, exc: RequestVali app.include_router(catalogue_category.router) app.include_router(catalogue_item.router) app.include_router(manufacturer.router) +app.include_router(system.router) @app.get("/") diff --git a/inventory_management_system_api/models/system.py b/inventory_management_system_api/models/system.py new file mode 100644 index 00000000..8fba4eda --- /dev/null +++ b/inventory_management_system_api/models/system.py @@ -0,0 +1,43 @@ +""" +Module for defining the database models for representing a System +""" + +from typing import Optional + +from pydantic import BaseModel, Field + +from inventory_management_system_api.models.custom_object_id_data_types import CustomObjectIdField, StringObjectIdField + + +class SystemIn(BaseModel): + """ + Input database model for a System + """ + + name: str + location: str + owner: str + importance: str + description: str + + # Used for uniqueness checks (sanitised name) + code: str + # These two are purely for front end navigation + path: str + parent_path: str + + parent_id: Optional[CustomObjectIdField] = None + + +class SystemOut(SystemIn): + """ + Output database model for a System + """ + + id: StringObjectIdField = Field(alias="_id") + parent_id: Optional[StringObjectIdField] = None + + # Required just for unit tests + class Config: + # pylint: disable=C0115 + allow_population_by_field_name = True diff --git a/inventory_management_system_api/repositories/catalogue_category.py b/inventory_management_system_api/repositories/catalogue_category.py index 6ec5dfa2..e73d874f 100644 --- a/inventory_management_system_api/repositories/catalogue_category.py +++ b/inventory_management_system_api/repositories/catalogue_category.py @@ -2,7 +2,7 @@ Module for providing a repository for managing catalogue categories in a MongoDB database. """ import logging -from typing import Optional, List +from typing import List, Optional from fastapi import Depends from pymongo.collection import Collection @@ -11,11 +11,12 @@ from inventory_management_system_api.core.custom_object_id import CustomObjectId from inventory_management_system_api.core.database import get_database from inventory_management_system_api.core.exceptions import ( - MissingRecordError, - DuplicateRecordError, ChildrenElementsExistError, + DuplicateRecordError, + MissingRecordError, ) from inventory_management_system_api.models.catalogue_category import CatalogueCategoryIn, CatalogueCategoryOut +from inventory_management_system_api.repositories import utils logger = logging.getLogger() @@ -72,7 +73,7 @@ def delete(self, catalogue_category_id: str) -> None: :raises MissingRecordError: If the catalogue category doesn't exist. """ catalogue_category_id = CustomObjectId(catalogue_category_id) - if self._has_children_elements(str(catalogue_category_id)): + if self._has_child_elements(catalogue_category_id): raise ChildrenElementsExistError( f"Catalogue category with ID {str(catalogue_category_id)} has children elements and cannot be deleted" ) @@ -96,7 +97,7 @@ def get(self, catalogue_category_id: str) -> Optional[CatalogueCategoryOut]: return CatalogueCategoryOut(**catalogue_category) return None - def update(self, catalogue_category_id: str, catalogue_category: CatalogueCategoryIn): + def update(self, catalogue_category_id: str, catalogue_category: CatalogueCategoryIn) -> CatalogueCategoryOut: """ Update a catalogue category by its ID in a MongoDB database. @@ -113,7 +114,7 @@ def update(self, catalogue_category_id: str, catalogue_category: CatalogueCatego :raises DuplicateRecordError: If a duplicate catalogue category is found within the parent catalogue category. """ catalogue_category_id = CustomObjectId(catalogue_category_id) - if self._has_children_elements(str(catalogue_category_id)): + if self._has_child_elements(catalogue_category_id): raise ChildrenElementsExistError( f"Catalogue category with ID {str(catalogue_category_id)} has children elements and cannot be updated" ) @@ -145,18 +146,7 @@ def list(self, path: Optional[str], parent_path: Optional[str]) -> List[Catalogu :return: A list of catalogue categories, or an empty list if no catalogue categories are returned by the database. """ - query = {} - if path: - query["path"] = path - if parent_path: - query["parent_path"] = parent_path - - message = "Retrieving all catalogue categories from the database" - if not query: - logger.info(message) - else: - logger.info("%s matching the provided filter(s)", message) - logger.debug("Provided filter(s): %s", query) + query = utils.path_query(path, parent_path, "catalogue categories") catalogue_categories = self._catalogue_categories_collection.find(query) return [CatalogueCategoryOut(**catalogue_category) for catalogue_category in catalogue_categories] @@ -176,7 +166,7 @@ def _is_duplicate_catalogue_category(self, parent_id: Optional[str], code: str) count = self._catalogue_categories_collection.count_documents({"parent_id": parent_id, "code": code}) return count > 0 - def _has_children_elements(self, catalogue_category_id: str) -> bool: + def _has_child_elements(self, catalogue_category_id: CustomObjectId) -> bool: """ Check if a catalogue category has children elements based on its ID. @@ -186,7 +176,6 @@ def _has_children_elements(self, catalogue_category_id: str) -> bool: :param catalogue_category_id: The ID of the catalogue category to check. :return: True if the catalogue category has children elements, False otherwise. """ - catalogue_category_id = CustomObjectId(catalogue_category_id) logger.info("Checking if catalogue category with ID '%s' has children elements", catalogue_category_id) # Check if it has catalogue categories query = {"parent_id": catalogue_category_id} diff --git a/inventory_management_system_api/repositories/catalogue_item.py b/inventory_management_system_api/repositories/catalogue_item.py index e5b07ba9..8fe8ad4b 100644 --- a/inventory_management_system_api/repositories/catalogue_item.py +++ b/inventory_management_system_api/repositories/catalogue_item.py @@ -10,7 +10,7 @@ from inventory_management_system_api.core.custom_object_id import CustomObjectId from inventory_management_system_api.core.database import get_database -from inventory_management_system_api.core.exceptions import DuplicateRecordError +from inventory_management_system_api.core.exceptions import MissingRecordError from inventory_management_system_api.models.catalogue_item import CatalogueItemOut, CatalogueItemIn logger = logging.getLogger() @@ -34,20 +34,29 @@ def create(self, catalogue_item: CatalogueItemIn) -> CatalogueItemOut: """ Create a new catalogue item in a MongoDB database. - The method checks if a duplicate catalogue item is found within the catalogue category. - :param catalogue_item: The catalogue item to be created. :return: The created catalogue item. - :raises DuplicateRecordError: If a duplicate catalogue item is found within the catalogue category. """ - if self._is_duplicate_catalogue_item(str(catalogue_item.catalogue_category_id), catalogue_item.name): - raise DuplicateRecordError("Duplicate catalogue item found within the catalogue category") - logger.info("Inserting the new catalogue item into the database") result = self._collection.insert_one(catalogue_item.dict()) catalogue_item = self.get(str(result.inserted_id)) return catalogue_item + def delete(self, catalogue_item_id: str) -> None: + """ + Delete a catalogue item by its ID from a MongoDB database. + + :param catalogue_item_id: The ID of the catalogue item to delete. + :raises MissingRecordError: If the catalogue item doesn't exist. + """ + catalogue_item_id = CustomObjectId(catalogue_item_id) + # pylint: disable=fixme + # TODO - (when the relevant item logic is implemented) check if catalogue item has children elements + logger.info("Deleting catalogue item with ID: %s from the database", catalogue_item_id) + result = self._collection.delete_one({"_id": catalogue_item_id}) + if result.deleted_count == 0: + raise MissingRecordError(f"No catalogue item found with ID: {str(catalogue_item_id)}") + def get(self, catalogue_item_id: str) -> Optional[CatalogueItemOut]: """ Retrieve a catalogue item by its ID from a MongoDB database. @@ -62,6 +71,23 @@ def get(self, catalogue_item_id: str) -> Optional[CatalogueItemOut]: return CatalogueItemOut(**catalogue_item) return None + def update(self, catalogue_item_id: str, catalogue_item: CatalogueItemIn) -> CatalogueItemOut: + """ + Update a catalogue item by its ID in a MongoDB database. + + :param catalogue_item_id: The ID of the catalogue item to update. + :param catalogue_item: The catalogue item containing the update data. + :return: The updated catalogue item. + """ + catalogue_item_id = CustomObjectId(catalogue_item_id) + # pylint: disable=fixme + # TODO - (when the relevant item logic is implemented) check if catalogue item has children elements if the + # `catalogue_category_id` is being updated. + logger.info("Updating catalogue item with ID: %s in the database", catalogue_item_id) + self._collection.update_one({"_id": catalogue_item_id}, {"$set": catalogue_item.dict()}) + catalogue_item = self.get(str(catalogue_item_id)) + return catalogue_item + def list(self, catalogue_category_id: Optional[str]) -> List[CatalogueItemOut]: """ Retrieve all catalogue items from a MongoDB. @@ -83,15 +109,3 @@ def list(self, catalogue_category_id: Optional[str]) -> List[CatalogueItemOut]: catalogue_items = self._collection.find(query) return [CatalogueItemOut(**catalogue_item) for catalogue_item in catalogue_items] - - def _is_duplicate_catalogue_item(self, catalogue_category_id: str, name: str) -> bool: - """ - Check if a catalogue item with the same name already exists within the catalogue category. - - :param catalogue_category_id: The ID of the catalogue category to check for duplicates in. - :return: `True` if a duplicate catalogue item is found, `False` otherwise. - """ - logger.info("Checking if catalogue item with name '%s' already exists within the category", name) - catalogue_category_id = CustomObjectId(catalogue_category_id) - count = self._collection.count_documents({"catalogue_category_id": catalogue_category_id, "name": name}) - return count > 0 diff --git a/inventory_management_system_api/repositories/system.py b/inventory_management_system_api/repositories/system.py new file mode 100644 index 00000000..8919f526 --- /dev/null +++ b/inventory_management_system_api/repositories/system.py @@ -0,0 +1,137 @@ +""" +Module for providing a repository for managing System's in a MongoDB database +""" +import logging +from typing import Optional + +from fastapi import Depends +from pymongo.collection import Collection +from pymongo.database import Database + +from inventory_management_system_api.core.custom_object_id import CustomObjectId +from inventory_management_system_api.core.database import get_database +from inventory_management_system_api.core.exceptions import ( + ChildrenElementsExistError, + DuplicateRecordError, + MissingRecordError, +) +from inventory_management_system_api.models.system import SystemIn, SystemOut +from inventory_management_system_api.repositories import utils + +logger = logging.getLogger() + + +class SystemRepo: + """ + Repository for managing System's in a MongoDB database + """ + + def __init__(self, database: Database = Depends(get_database)) -> None: + """ + Initialise the `SystemRepo` with a MongoDB database instance + + :param database: Database to use + """ + self._database = database + self._systems_collection: Collection = self._database.systems + + def create(self, system: SystemIn) -> SystemOut: + """ + Create a new System in a MongoDB database + + If a parent system is specified by `parent_id`, then checks if that exists in the database and raises a + `MissingRecordError` if it doesn't exist. It also checks if a duplicate System is found within the parent + System and raises a `DuplicateRecordError` if it is. + + :param system: System to be created + :return: Created System + :raises MissingRecordError: If the parent System specified by `parent_id` doesn't exist + :raises DuplicateRecordError: If a duplicate System is found within the parent System + """ + parent_id = str(system.parent_id) if system.parent_id else None + if parent_id and not self.get(parent_id): + raise MissingRecordError(f"No parent System found with ID: {parent_id}") + + if self._is_duplicate_system(parent_id, system.code): + raise DuplicateRecordError("Duplicate System found within the parent System") + + logger.info("Inserting the new System into the database") + result = self._systems_collection.insert_one(system.dict()) + system = self.get(str(result.inserted_id)) + return system + + def delete(self, system_id: str) -> None: + """ + Delete a System by its ID from a MongoDB database + + The method checks if the system has any children and raises a `ChildrenElementsExistError` if it does + + :param system_id: ID of the System to delete + :raises ChildrenElementsExistError: If the System has child elements + :raises MissingRecordError: If the System doesn't exist + """ + system_id = CustomObjectId(system_id) + if self._has_child_elements(system_id): + raise ChildrenElementsExistError( + f"System with ID {str(system_id)} has child elements and cannot be deleted" + ) + + logger.info("Deleting system with ID: %s from the database", system_id) + result = self._systems_collection.delete_one({"_id": system_id}) + if result.deleted_count == 0: + raise MissingRecordError(f"No System found with ID: {str(system_id)}") + + def get(self, system_id: str) -> Optional[SystemOut]: + """ + Retrieve a System by its ID from a MongoDB database + + :param system_id: ID of the System to retrieve + :return: Retrieved System or `None` if not found + """ + system_id = CustomObjectId(system_id) + logger.info("Retrieving system with ID: %s from the database", system_id) + system = self._systems_collection.find_one({"_id": system_id}) + if system: + return SystemOut(**system) + return None + + def list(self, path: Optional[str], parent_path: Optional[str]) -> list[SystemOut]: + """ + Retrieve Systems from a MongoDB database based on the provided filters + + :param path: Path to filter Systems by + :param parent_path: Parent path to filter Systems by + :return: List of System's or an empty list if no Systems are retrieved + """ + query = utils.path_query(path, parent_path, "systems") + + systems = self._systems_collection.find(query) + return [SystemOut(**system) for system in systems] + + def _is_duplicate_system(self, parent_id: Optional[str], code: str) -> bool: + """ + Check if a System with the same code already exists within the parent System + + :param parent_id: ID of the parent System which can also be `None` + :param code: Code of the System to check for duplicates + :return: `True` if a duplicate System code is found under the given parent, `False` otherwise + """ + logger.info("Checking if System with code '%s' already exists within the parent System", code) + if parent_id: + parent_id = CustomObjectId(parent_id) + + count = self._systems_collection.count_documents({"parent_id": parent_id, "code": code}) + return count > 0 + + def _has_child_elements(self, system_id: CustomObjectId) -> bool: + """ + Check if a System has any child System's based on its ID + + :param system_id: ID of the System to check + :return: True if the System has child elements, False otherwise + """ + logger.info("Checking if system with ID '%s' has child elements", str(system_id)) + # Check if it has System's + query = {"parent_id": system_id} + count = self._systems_collection.count_documents(query) + return count > 0 diff --git a/inventory_management_system_api/repositories/utils.py b/inventory_management_system_api/repositories/utils.py new file mode 100644 index 00000000..c7c54ff1 --- /dev/null +++ b/inventory_management_system_api/repositories/utils.py @@ -0,0 +1,33 @@ +""" +Utility methods used in the repositories +""" + +import logging +from typing import Optional + +logger = logging.getLogger() + + +def path_query(path: Optional[str], parent_path: Optional[str], entity_type: str) -> dict: + """ + Constructs filters for a pymongo collection based on a given path and parent path while + also logging the action + + :param path: Path to filter the `entity_type` by + :param parent_path: Parent path to filter `entity_type` by + :param entity_type: Name of the entity type e.g. catalogue categories/systems (Used for logging) + :return: Dictionary representing the query to pass to a pymongo's Collection `find` function + """ + query = {} + if path: + query["path"] = path + if parent_path: + query["parent_path"] = parent_path + + message = f"Retrieving all {entity_type} from the database" + if not query: + logger.info(message) + else: + logger.info("%s matching the provided filter(s)", message) + logger.debug("Provided filter(s): %s", query) + return query diff --git a/inventory_management_system_api/routers/v1/catalogue_item.py b/inventory_management_system_api/routers/v1/catalogue_item.py index 5b1e93e1..b387408a 100644 --- a/inventory_management_system_api/routers/v1/catalogue_item.py +++ b/inventory_management_system_api/routers/v1/catalogue_item.py @@ -10,12 +10,15 @@ from inventory_management_system_api.core.exceptions import ( MissingRecordError, InvalidObjectIdError, - DuplicateRecordError, NonLeafCategoryError, InvalidCatalogueItemPropertyTypeError, MissingMandatoryCatalogueItemProperty, ) -from inventory_management_system_api.schemas.catalogue_item import CatalogueItemSchema, CatalogueItemPostRequestSchema +from inventory_management_system_api.schemas.catalogue_item import ( + CatalogueItemSchema, + CatalogueItemPostRequestSchema, + CatalogueItemPatchRequestSchema, +) from inventory_management_system_api.services.catalogue_item import CatalogueItemService logger = logging.getLogger() @@ -52,18 +55,15 @@ def get_catalogue_item( ) -> CatalogueItemSchema: # pylint: disable=missing-function-docstring logger.info("Getting catalogue item with ID: %s", catalogue_item_id) + message = "A catalogue item with such ID was not found" try: catalogue_item = catalogue_item_service.get(catalogue_item_id) if not catalogue_item: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="The requested catalogue item was not found" - ) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) return CatalogueItemSchema(**catalogue_item.dict()) except InvalidObjectIdError as exc: logger.exception("The ID is not a valid ObjectId value") - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="The requested catalogue item was not found" - ) from exc + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) from exc @router.post( @@ -85,14 +85,68 @@ def create_catalogue_item( logger.exception(str(exc)) raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc except (MissingRecordError, InvalidObjectIdError) as exc: - message = "The specified catalogue category ID does not exist in the database" + message = "The specified catalogue category ID does not exist" logger.exception(message) raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message) from exc - except DuplicateRecordError as exc: - message = "A catalogue item with the same name already exists within the catalogue category" + except NonLeafCategoryError as exc: + message = "Adding a catalogue item to a non-leaf catalogue category is not allowed" logger.exception(message) raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message) from exc + + +@router.patch( + path="/{catalogue_item_id}", + summary="Update a catalogue item partially by ID", + response_description="Catalogue item updated successfully", +) +def partial_update_catalogue_item( + catalogue_item: CatalogueItemPatchRequestSchema, + catalogue_item_id: str = Path(description="The ID of the catalogue item to update"), + catalogue_item_service: CatalogueItemService = Depends(), +) -> CatalogueItemSchema: + # pylint: disable=missing-function-docstring + logger.info("Partially updating catalogue item with ID: %s", catalogue_item_id) + logger.debug("Catalogue item data: %s", catalogue_item) + try: + updated_catalogue_item = catalogue_item_service.update(catalogue_item_id, catalogue_item) + return CatalogueItemSchema(**updated_catalogue_item.dict()) + except (InvalidCatalogueItemPropertyTypeError, MissingMandatoryCatalogueItemProperty) as exc: + logger.exception(str(exc)) + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc + except (MissingRecordError, InvalidObjectIdError) as exc: + if ( + catalogue_item.catalogue_category_id + and catalogue_item.catalogue_category_id in str(exc) + or "catalogue category" in str(exc).lower() + ): + message = "The specified catalogue category ID does not exist" + logger.exception(message) + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message) from exc + + message = "A catalogue item with such ID was not found" + logger.exception(message) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) from exc except NonLeafCategoryError as exc: message = "Adding a catalogue item to a non-leaf catalogue category is not allowed" logger.exception(message) raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message) from exc + + +@router.delete( + path="/{catalogue_item_id}", + summary="Delete a catalogue item by ID", + response_description="Catalogue item deleted successfully", + status_code=status.HTTP_204_NO_CONTENT, +) +def delete_catalogue_item( + catalogue_item_id: str = Path(description="The ID of the catalogue item to delete"), + catalogue_item_service: CatalogueItemService = Depends(), +) -> None: + # pylint: disable=missing-function-docstring + logger.info("Deleting catalogue item with ID: %s", catalogue_item_id) + try: + catalogue_item_service.delete(catalogue_item_id) + except (MissingRecordError, InvalidObjectIdError) as exc: + message = "A catalogue item with such ID was not found" + logger.exception(message) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) from exc diff --git a/inventory_management_system_api/routers/v1/system.py b/inventory_management_system_api/routers/v1/system.py new file mode 100644 index 00000000..ebc80128 --- /dev/null +++ b/inventory_management_system_api/routers/v1/system.py @@ -0,0 +1,103 @@ +""" +Module for providing an API router which defines routes for managing Systems using the `SystemService` +service. +""" + +import logging +from typing import Annotated, Optional + +from fastapi import APIRouter, Depends, HTTPException, Path, Query, status + +from inventory_management_system_api.core.exceptions import ( + ChildrenElementsExistError, + DuplicateRecordError, + InvalidObjectIdError, + MissingRecordError, +) +from inventory_management_system_api.schemas.system import SystemRequestSchema, SystemPostRequestSchema +from inventory_management_system_api.services.system import SystemService + +logger = logging.getLogger() + +router = APIRouter(prefix="/v1/systems", tags=["systems"]) + + +@router.get(path="/", summary="Get Systems", response_description="List of Systems") +def get_systems( + path: Annotated[Optional[str], Query(description="Filter Systems by path")] = None, + parent_path: Annotated[Optional[str], Query(description="Filter Systems by parent path")] = None, + system_service: SystemService = Depends(), +) -> list[SystemRequestSchema]: + # pylint: disable=missing-function-docstring + logger.info("Getting Systems") + if path: + logger.debug("Path filter: '%s'", path) + if parent_path: + logger.debug("Parent path filter: '%s'", parent_path) + + systems = system_service.list(path, parent_path) + return [SystemRequestSchema(**system.dict()) for system in systems] + + +@router.get(path="/{system_id}", summary="Get a System by ID", response_description="Single System") +def get_system( + system_id: Annotated[str, Path(description="ID of the System to get")], system_service: SystemService = Depends() +) -> SystemRequestSchema: + # pylint: disable=missing-function-docstring + logger.info("Getting System with ID: %s", system_service) + try: + system = system_service.get(system_id) + if not system: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="A System with such ID was not found") + return SystemRequestSchema(**system.dict()) + except InvalidObjectIdError as exc: + logger.exception("The ID is not a valid ObjectId value") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="A System with such ID was not found" + ) from exc + + +@router.post( + path="/", + summary="Create a new System", + response_description="The created System", + status_code=status.HTTP_201_CREATED, +) +def create_system(system: SystemPostRequestSchema, system_service: SystemService = Depends()) -> SystemRequestSchema: + # pylint: disable=missing-function-docstring + logger.info("Creating a new System") + logger.debug("System data : %s", system) + try: + system = system_service.create(system) + return SystemRequestSchema(**system.dict()) + except (MissingRecordError, InvalidObjectIdError) as exc: + message = "The specified parent System ID does not exist" + logger.exception(message) + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message) from exc + except DuplicateRecordError as exc: + message = "A System with the same name already exists within the same parent System" + logger.exception(message) + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message) from exc + + +@router.delete( + path="/{system_id}", + summary="Delete a system by ID", + response_description="System deleted successfully", + status_code=status.HTTP_204_NO_CONTENT, +) +def delete_system( + system_id: str = Path(description="ID of the system to delete"), system_service: SystemService = Depends() +) -> None: + # pylint: disable=missing-function-docstring + logger.info("Deleting system with ID: %s", system_id) + try: + system_service.delete(system_id) + except (MissingRecordError, InvalidObjectIdError) as exc: + message = "System with such ID was not found" + logger.exception(message) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) from exc + except ChildrenElementsExistError as exc: + message = "System has child elements and cannot be deleted" + logger.exception(message) + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message) from exc diff --git a/inventory_management_system_api/schemas/catalogue_item.py b/inventory_management_system_api/schemas/catalogue_item.py index 0d4e4c10..4e6d2b48 100644 --- a/inventory_management_system_api/schemas/catalogue_item.py +++ b/inventory_management_system_api/schemas/catalogue_item.py @@ -47,6 +47,19 @@ class CatalogueItemPostRequestSchema(BaseModel): manufacturer: ManufacturerSchema = Field(description="The details of the manufacturer") +class CatalogueItemPatchRequestSchema(CatalogueItemPostRequestSchema): + """ + Schema model for a catalogue item update request. + """ + + catalogue_category_id: Optional[str] = Field( + description="The ID of the catalogue category that the catalogue item belongs to" + ) + name: Optional[str] = Field(description="The name of the catalogue item") + description: Optional[str] = Field(description="The catalogue item description") + manufacturer: Optional[ManufacturerSchema] = Field(description="The details of the manufacturer") + + class CatalogueItemSchema(CatalogueItemPostRequestSchema): """ Schema model for a catalogue item response. diff --git a/inventory_management_system_api/schemas/system.py b/inventory_management_system_api/schemas/system.py new file mode 100644 index 00000000..b1d3c75c --- /dev/null +++ b/inventory_management_system_api/schemas/system.py @@ -0,0 +1,41 @@ +""" +Module for defining the API schema models for representing Systems +""" +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class SystemImportanceType(str, Enum): + """ + Enumeration for System importance types + """ + + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +class SystemPostRequestSchema(BaseModel): + """ + Schema model for a System creation request + """ + + name: str = Field(description="Name of the system") + location: str = Field(description="Location of the system") + owner: str = Field(description="Owner of the systems") + importance: SystemImportanceType = Field(description="Importance of the system") + description: str = Field(description="Description of the system") + parent_id: Optional[str] = Field(default=None, description="ID of the parent System (if applicable)") + + +class SystemRequestSchema(SystemPostRequestSchema): + """ + Schema models for System get request response + """ + + id: str = Field(description="ID of the System") + code: str = Field(description="Code of the System") + path: str = Field(description="Path to the System") + parent_path: str = Field(description="Path to the parent System") diff --git a/inventory_management_system_api/services/catalogue_category.py b/inventory_management_system_api/services/catalogue_category.py index 06979d39..d10a7c2d 100644 --- a/inventory_management_system_api/services/catalogue_category.py +++ b/inventory_management_system_api/services/catalogue_category.py @@ -2,8 +2,7 @@ Module for providing a service for managing catalogue categories using the `CatalogueCategoryRepo` repository. """ import logging -import re -from typing import Optional, List +from typing import List, Optional from fastapi import Depends @@ -11,9 +10,10 @@ from inventory_management_system_api.models.catalogue_category import CatalogueCategoryIn, CatalogueCategoryOut from inventory_management_system_api.repositories.catalogue_category import CatalogueCategoryRepo from inventory_management_system_api.schemas.catalogue_category import ( - CatalogueCategoryPostRequestSchema, CatalogueCategoryPatchRequestSchema, + CatalogueCategoryPostRequestSchema, ) +from inventory_management_system_api.services import utils logger = logging.getLogger() @@ -50,8 +50,8 @@ def create(self, catalogue_category: CatalogueCategoryPostRequestSchema) -> Cata if parent_catalogue_category and parent_catalogue_category.is_leaf: raise LeafCategoryError("Cannot add catalogue category to a leaf parent catalogue category") - code = self._generate_code(catalogue_category.name) - path = self._generate_path(parent_path, code) + code = utils.generate_code(catalogue_category.name, "catalogue category") + path = utils.generate_path(parent_path, code, "catalogue category") return self._catalogue_category_repository.create( CatalogueCategoryIn( name=catalogue_category.name, @@ -114,58 +114,22 @@ def update( if not stored_catalogue_category: raise MissingRecordError(f"No catalogue category found with ID: {catalogue_category_id}") - if "name" in update_data and update_data["name"] != stored_catalogue_category.name: - stored_catalogue_category.name = update_data["name"] - stored_catalogue_category.code = self._generate_code(stored_catalogue_category.name) - stored_catalogue_category.path = self._generate_path( - stored_catalogue_category.parent_path, stored_catalogue_category.code + if "name" in update_data and catalogue_category.name != stored_catalogue_category.name: + update_data["code"] = utils.generate_code(catalogue_category.name, "catalogue category") + update_data["path"] = utils.generate_path( + stored_catalogue_category.parent_path, update_data["code"], "catalogue category" ) - if "parent_id" in update_data and update_data["parent_id"] != stored_catalogue_category.parent_id: - stored_catalogue_category.parent_id = update_data["parent_id"] - parent_catalogue_category = ( - self.get(stored_catalogue_category.parent_id) if stored_catalogue_category.parent_id else None - ) - stored_catalogue_category.parent_path = parent_catalogue_category.path if parent_catalogue_category else "/" - stored_catalogue_category.path = self._generate_path( - stored_catalogue_category.parent_path, stored_catalogue_category.code - ) + if "parent_id" in update_data and catalogue_category.parent_id != stored_catalogue_category.parent_id: + parent_catalogue_category = self.get(catalogue_category.parent_id) if catalogue_category.parent_id else None + update_data["parent_path"] = parent_catalogue_category.path if parent_catalogue_category else "/" + code = update_data["code"] if "code" in update_data else stored_catalogue_category.code + update_data["path"] = utils.generate_path(update_data["parent_path"], code, "catalogue category") if parent_catalogue_category and parent_catalogue_category.is_leaf: raise LeafCategoryError("Cannot add catalogue category to a leaf parent catalogue category") - if "is_leaf" in update_data: - stored_catalogue_category.is_leaf = update_data["is_leaf"] - - if "catalogue_item_properties" in update_data: - stored_catalogue_category.catalogue_item_properties = update_data["catalogue_item_properties"] - + stored_catalogue_category = stored_catalogue_category.copy(update=update_data) return self._catalogue_category_repository.update( catalogue_category_id, CatalogueCategoryIn(**stored_catalogue_category.dict()) ) - - def _generate_code(self, name: str) -> str: - """ - Generate a code for a catalogue category based on its name. This is used to maintain uniqueness and prevent - duplicate subcategories within a category. - - The code is generated by converting the name to lowercase and replacing spaces with hyphens. Leading and - trailing spaces are removed, and consecutive spaces are replaced with a single hyphen. - - :param name: The name of the catalogue category. - :return: The generated code for the catalogue category. - """ - logger.info("Generating code for the catalogue category based on its name") - name = name.lower().strip() - return re.sub(r"\s+", "-", name) - - def _generate_path(self, parent_path: str, code: str) -> str: - """ - Generate a path for a catalogue category based on its code and the path from its parent. - - :param parent_path: The path of the parent catalogue category. - :param code: The code of the catalogue category. - :return: The generated path for the catalogue category. - """ - logger.info("Generating path for the catalogue category") - return f"{parent_path}{code}" if parent_path.endswith("/") else f"{parent_path}/{code}" diff --git a/inventory_management_system_api/services/catalogue_item.py b/inventory_management_system_api/services/catalogue_item.py index f7645726..e2d3692f 100644 --- a/inventory_management_system_api/services/catalogue_item.py +++ b/inventory_management_system_api/services/catalogue_item.py @@ -4,7 +4,7 @@ """ import logging from numbers import Number -from typing import Optional, List, Dict +from typing import Optional, List, Dict, Union from fastapi import Depends @@ -14,11 +14,16 @@ InvalidCatalogueItemPropertyTypeError, MissingMandatoryCatalogueItemProperty, ) +from inventory_management_system_api.models.catalogue_category import CatalogueItemProperty from inventory_management_system_api.models.catalogue_item import CatalogueItemOut, CatalogueItemIn from inventory_management_system_api.repositories.catalogue_category import CatalogueCategoryRepo from inventory_management_system_api.repositories.catalogue_item import CatalogueItemRepo from inventory_management_system_api.schemas.catalogue_category import CatalogueItemPropertyType -from inventory_management_system_api.schemas.catalogue_item import CatalogueItemPostRequestSchema +from inventory_management_system_api.schemas.catalogue_item import ( + CatalogueItemPostRequestSchema, + CatalogueItemPatchRequestSchema, + PropertyPostRequestSchema, +) logger = logging.getLogger() @@ -47,9 +52,8 @@ def create(self, catalogue_item: CatalogueItemPostRequestSchema) -> CatalogueIte Create a new catalogue item. The method checks if the catalogue category exists in the database and raises a `MissingRecordError` if it does - not. It also checks if the category is not a leaf category and raises a `NonLeafCategoryError` if it is. It - then proceeds to check for missing mandatory catalogue item properties, adds the property units, and finally - validates the property values. + not. It also checks if the category is not a leaf category and raises a `NonLeafCategoryError` if it is. It then + processes the catalogue item properties. :param catalogue_item: The catalogue item to be created. :return: The created catalogue item. @@ -64,30 +68,152 @@ def create(self, catalogue_item: CatalogueItemPostRequestSchema) -> CatalogueIte if catalogue_category.is_leaf is False: raise NonLeafCategoryError("Cannot add catalogue item to a non-leaf catalogue category") - defined_properties = { - defined_property.name: defined_property.dict() - for defined_property in catalogue_category.catalogue_item_properties - } - supplied_properties = { - supplied_property.name: supplied_property.dict() - for supplied_property in (catalogue_item.properties if catalogue_item.properties else []) - } - - self._check_missing_mandatory_catalogue_item_properties(defined_properties, supplied_properties) - supplied_properties = self._filter_matching_catalogue_item_properties(defined_properties, supplied_properties) - self._add_catalogue_item_property_units(defined_properties, supplied_properties) - self._validate_catalogue_item_property_values(defined_properties, supplied_properties) + defined_properties = catalogue_category.catalogue_item_properties + supplied_properties = catalogue_item.properties if catalogue_item.properties else [] + supplied_properties = self._process_catalogue_item_properties(defined_properties, supplied_properties) return self._catalogue_item_repository.create( CatalogueItemIn( catalogue_category_id=catalogue_item.catalogue_category_id, name=catalogue_item.name, description=catalogue_item.description, - properties=list(supplied_properties.values()), + properties=supplied_properties, manufacturer=catalogue_item.manufacturer, ) ) + def delete(self, catalogue_item_id: str) -> None: + """ + Delete a catalogue item by its ID. + + :param catalogue_item_id: The ID of the catalogue item to delete. + """ + return self._catalogue_item_repository.delete(catalogue_item_id) + + def get(self, catalogue_item_id: str) -> Optional[CatalogueItemOut]: + """ + Retrieve a catalogue item by its ID. + + :param catalogue_item_id: The ID of the catalogue item to retrieve. + :return: The retrieved catalogue item, or `None` if not found. + """ + return self._catalogue_item_repository.get(catalogue_item_id) + + def list(self, catalogue_category_id: Optional[str]) -> List[CatalogueItemOut]: + """ + Retrieve all catalogue items. + + :param catalogue_category_id: The ID of the catalogue category to filter catalogue items by. + :return: A list of catalogue items, or an empty list if no catalogue items are retrieved. + """ + return self._catalogue_item_repository.list(catalogue_category_id) + + def update(self, catalogue_item_id: str, catalogue_item: CatalogueItemPatchRequestSchema) -> CatalogueItemOut: + """ + Update a catalogue item by its ID. + + The method checks if the catalogue item exists in the database and raises a `MissingRecordError` if it does + not. If the catalogue category ID is being updated, it checks if catalogue category with such ID exists and + raises a MissingRecordError` if it does not. It also checks if the category is not a leaf category and raises a + `NonLeafCategoryError` if it is. If the catalogue item properties are being updated, it also processes them. + + :param catalogue_item_id: The ID of the catalogue item to update. + :param catalogue_item: The catalogue item containing the fields that need to be updated. + :return: The updated catalogue item. + """ + update_data = catalogue_item.dict(exclude_unset=True) + + stored_catalogue_item = self.get(catalogue_item_id) + if not stored_catalogue_item: + raise MissingRecordError(f"No catalogue item found with ID: {catalogue_item_id}") + + catalogue_category = None + if ( + "catalogue_category_id" in update_data + and catalogue_item.catalogue_category_id != stored_catalogue_item.catalogue_category_id + ): + catalogue_category = self._catalogue_category_repository.get(catalogue_item.catalogue_category_id) + if not catalogue_category: + raise MissingRecordError(f"No catalogue category found with ID: {catalogue_item.catalogue_category_id}") + + if catalogue_category.is_leaf is False: + raise NonLeafCategoryError("Cannot add catalogue item to a non-leaf catalogue category") + + # If the catalogue category ID is updated but no catalogue item properties are supplied then the stored + # catalogue item properties should be used. They need to be processed and validated against the defined + # properties of the new catalogue category. + if "properties" not in update_data: + # Create `PropertyPostRequestSchema` objects from the stored catalogue item properties and assign them + # to the `properties` field of `catalogue_item` before proceeding with processing and validation. + catalogue_item.properties = [ + PropertyPostRequestSchema(**prop.dict()) for prop in stored_catalogue_item.properties + ] + # Get the new `catalogue_item` state + update_data = catalogue_item.dict(exclude_unset=True) + + if "properties" in update_data: + if not catalogue_category: + catalogue_category = self._catalogue_category_repository.get( + stored_catalogue_item.catalogue_category_id + ) + + defined_properties = catalogue_category.catalogue_item_properties + supplied_properties = catalogue_item.properties + update_data["properties"] = self._process_catalogue_item_properties(defined_properties, supplied_properties) + + # Create a copy of `stored_catalogue_item`, updating its field values with the received partial updates + stored_catalogue_item = stored_catalogue_item.copy(update=update_data) + return self._catalogue_item_repository.update( + catalogue_item_id, CatalogueItemIn(**stored_catalogue_item.dict()) + ) + + def _process_catalogue_item_properties( + self, defined_properties: List[CatalogueItemProperty], supplied_properties: List[PropertyPostRequestSchema] + ) -> List[Dict]: + """ + Process and validate supplied catalogue item properties based on defined catalogue item properties. It checks + for missing mandatory catalogue item properties, files the matching catalogue item properties, adds the property + units, and finally validates the property values. + + The `supplied_properties_dict` dictionary may get modified as part of the processing and validation. + + :param defined_properties: The list of defined catalogue item property objects. + :param supplied_properties: The list of supplied catalogue item property objects. + :return: A list of processed and validated supplied catalogue item properties. + """ + # Convert properties to dictionaries for easier lookups + defined_properties_dict = self._create_catalogue_item_properties_dict(defined_properties) + supplied_properties_dict = self._create_catalogue_item_properties_dict(supplied_properties) + + # Some mandatory catalogue item properties may not have been supplied + self._check_missing_mandatory_catalogue_item_properties(defined_properties_dict, supplied_properties_dict) + # Catalogue item properties that have not been defined may have been supplied + supplied_properties_dict = self._filter_matching_catalogue_item_properties( + defined_properties_dict, supplied_properties_dict + ) + # Supplied catalogue item properties do not have units as we can't trust they would be correct + self._add_catalogue_item_property_units(defined_properties_dict, supplied_properties_dict) + # The values of the supplied catalogue item properties may not be of the expected types + self._validate_catalogue_item_property_values(defined_properties_dict, supplied_properties_dict) + + return list(supplied_properties_dict.values()) + + def _create_catalogue_item_properties_dict( + self, catalogue_item_properties: Union[List[CatalogueItemProperty], List[PropertyPostRequestSchema]] + ) -> Dict[str, Dict]: + """ + Convert a list of catalogue item property objects into a dictionary where the keys are the catalogue item + property names and the values are the catalogue item property dictionaries. + + :param catalogue_item_properties: The list of catalogue item property objects. + :return: A dictionary where the keys are the catalogue item property names and the values are the catalogue item + property dictionaries. + """ + return { + catalogue_item_property.name: catalogue_item_property.dict() + for catalogue_item_property in catalogue_item_properties + } + def _add_catalogue_item_property_units( self, defined_properties: Dict[str, Dict], @@ -97,7 +223,7 @@ def _add_catalogue_item_property_units( Add the units to the supplied properties. The supplied properties only contain a name and value so the units from the defined properties in the database - are added to the supplied properties. + are added to the supplied properties. This means that this method modifies the `supplied_properties` dictionary. :param defined_properties: The defined catalogue item properties stored as part of the catalogue category in the database. @@ -188,21 +314,3 @@ def _filter_matching_catalogue_item_properties( matching_properties[supplied_property_name] = supplied_property return matching_properties - - def get(self, catalogue_item_id: str) -> Optional[CatalogueItemOut]: - """ - Retrieve a catalogue item by its ID. - - :param catalogue_item_id: The ID of the catalogue item to retrieve. - :return: The retrieved catalogue item, or `None` if not found. - """ - return self._catalogue_item_repository.get(catalogue_item_id) - - def list(self, catalogue_category_id: Optional[str]) -> List[CatalogueItemOut]: - """ - Retrieve all catalogue items. - - :param catalogue_category_id: The ID of the catalogue category to filter catalogue items by. - :return: A list of catalogue items, or an empty list if no catalogue items are retrieved. - """ - return self._catalogue_item_repository.list(catalogue_category_id) diff --git a/inventory_management_system_api/services/system.py b/inventory_management_system_api/services/system.py new file mode 100644 index 00000000..d89d3033 --- /dev/null +++ b/inventory_management_system_api/services/system.py @@ -0,0 +1,83 @@ +""" +Module for providing a service for managing Systems using the `SystemRepo` repository +""" + +import logging +from typing import Optional + +from fastapi import Depends + +from inventory_management_system_api.models.system import SystemIn, SystemOut +from inventory_management_system_api.repositories.system import SystemRepo +from inventory_management_system_api.schemas.system import SystemPostRequestSchema +from inventory_management_system_api.services import utils + +logger = logging.getLogger() + + +class SystemService: + """ + Service for managing Systems + """ + + def __init__(self, system_repository: SystemRepo = Depends(SystemRepo)) -> None: + """ + Initialise the `SystemService` with a `SystemRepo` repository + + :param system_repository: `SystemRepo` repository to use + """ + self._system_repository = system_repository + + def create(self, system: SystemPostRequestSchema) -> SystemOut: + """ + Create a new System + + :param system: System to be created + :return: Created System + """ + parent_id = system.parent_id + parent_system = self.get(parent_id) if parent_id else None + parent_path = parent_system.path if parent_system else "/" + + code = utils.generate_code(system.name, "system") + path = utils.generate_path(parent_path, code, "system") + return self._system_repository.create( + SystemIn( + name=system.name, + location=system.location, + owner=system.owner, + importance=system.importance, + description=system.description, + code=code, + path=path, + parent_path=parent_path, + parent_id=parent_id, + ) + ) + + def delete(self, system_id: str) -> None: + """ + Delete a System by its ID + + :param system_id: ID of the System to delete + """ + return self._system_repository.delete(system_id) + + def get(self, system_id: str) -> Optional[SystemOut]: + """ + Retrieve a System by its ID + + :param system_id: ID of the System to retrieve + :return: Retrieved System or `None` if not found + """ + return self._system_repository.get(system_id) + + def list(self, path: Optional[str], parent_path: Optional[str]) -> list[SystemOut]: + """ + Retrieve Systems based on the provided filters + + :param path: Path to filter Systems by + :param parent_path: Parent path to filter Systems by + :return: List of System's or an empty list if no Systems are retrieved + """ + return self._system_repository.list(path, parent_path) diff --git a/inventory_management_system_api/services/utils.py b/inventory_management_system_api/services/utils.py new file mode 100644 index 00000000..fb3b873c --- /dev/null +++ b/inventory_management_system_api/services/utils.py @@ -0,0 +1,38 @@ +""" +Collection of some utility functions used by services +""" + +import logging +import re + +logger = logging.getLogger() + + +def generate_code(name: str, entity_type: str) -> str: + """ + Generate a code for an entity based on its name. This is used to maintain uniqueness and prevent + duplicates. (E.g. a duplicate subcategories within a category) + + The code is generated by converting the name to lowercase and replacing spaces with hyphens. Leading and + trailing spaces are removed, and consecutive spaces are replaced with a single hyphen + + :param name: The name of the entity + :param entity_type: Name of the entity type e.g. catalogue category/system (Used for logging) + :return: The generated code for the entity + """ + logger.info("Generating code for the %s based on its name", entity_type) + name = name.lower().strip() + return re.sub(r"\s+", "-", name) + + +def generate_path(parent_path: str, code: str, entity_type: str) -> str: + """ + Generate a path for a catalogue category based on its code and the path from its parent + + :param parent_path: The path of the parent catalogue category + :param code: The code of the catalogue category + :param entity_type: Name of the entity type e.g. catalogue category/system (Used for logging) + :return: The generated path for the catalogue category + """ + logger.info("Generating path for the %s", entity_type) + return f"{parent_path}{code}" if parent_path.endswith("/") else f"{parent_path}/{code}" diff --git a/pyproject.toml b/pyproject.toml index bee1a388..8c32c269 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,8 +7,8 @@ version = "0.0.1" dependencies = [ "fastapi[all]==0.99.1", - "pymongo==4.4.0", - "uvicorn==0.22.0" + "pymongo==4.5.0", + "uvicorn==0.23.2" ] [project.urls] @@ -21,7 +21,7 @@ code-analysis = [ ] formatting = [ - "black==23.3.0" + "black==23.9.1" ] test = [ diff --git a/test/e2e/conftest.py b/test/e2e/conftest.py index 8e4308df..64a786ac 100644 --- a/test/e2e/conftest.py +++ b/test/e2e/conftest.py @@ -27,3 +27,4 @@ def fixture_cleanup_database_collections(): yield database.catalogue_categories.delete_many({}) database.catalogue_items.delete_many({}) + database.systems.delete_many({}) diff --git a/test/e2e/test_catalogue_category.py b/test/e2e/test_catalogue_category.py index af8393bc..bf6df956 100644 --- a/test/e2e/test_catalogue_category.py +++ b/test/e2e/test_catalogue_category.py @@ -245,9 +245,6 @@ def test_delete_catalogue_category_with_invalid_id(test_client): """ Test deleting a catalogue category with an invalid ID. """ - catalogue_category_post = {"name": "Category A", "is_leaf": False} - test_client.post("/v1/catalogue-categories", json=catalogue_category_post) - response = test_client.delete("/v1/catalogue-categories/invalid") assert response.status_code == 404 @@ -258,9 +255,6 @@ def test_delete_catalogue_category_with_nonexistent_id(test_client): """ Test deleting a catalogue category with a nonexistent ID. """ - catalogue_category_post = {"name": "Category A", "is_leaf": False} - test_client.post("/v1/catalogue-categories", json=catalogue_category_post) - response = test_client.delete(f"/v1/catalogue-categories/{str(ObjectId())}") assert response.status_code == 404 diff --git a/test/e2e/test_catalogue_item.py b/test/e2e/test_catalogue_item.py index 71a6a1c9..303b503d 100644 --- a/test/e2e/test_catalogue_item.py +++ b/test/e2e/test_catalogue_item.py @@ -96,24 +96,6 @@ def test_create_catalogue_item(test_client): assert catalogue_item["manufacturer"] == catalogue_item_post["manufacturer"] -def test_create_catalogue_item_with_duplicate_name_within_catalogue_category(test_client): - """ - Test creating a catalogue item with a duplicate name within the catalogue category. - """ - response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) - catalogue_category = response.json() - - catalogue_item_post = get_catalogue_item_a_dict(catalogue_category["id"]) - test_client.post("/v1/catalogue-items", json=catalogue_item_post) - - response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) - - assert response.status_code == 409 - assert ( - response.json()["detail"] == "A catalogue item with the same name already exists within the catalogue category" - ) - - def test_create_catalogue_item_with_invalid_catalogue_category_id(test_client): """ Test creating a catalogue item with an invalid catalogue category id. @@ -122,7 +104,7 @@ def test_create_catalogue_item_with_invalid_catalogue_category_id(test_client): response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) assert response.status_code == 422 - assert response.json()["detail"] == "The specified catalogue category ID does not exist in the database" + assert response.json()["detail"] == "The specified catalogue category ID does not exist" def test_create_catalogue_item_with_nonexistent_catalogue_category_id(test_client): @@ -133,7 +115,7 @@ def test_create_catalogue_item_with_nonexistent_catalogue_category_id(test_clien response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) assert response.status_code == 422 - assert response.json()["detail"] == "The specified catalogue category ID does not exist in the database" + assert response.json()["detail"] == "The specified catalogue category ID does not exist" def test_create_catalogue_item_in_non_leaf_catalogue_category(test_client): @@ -268,6 +250,50 @@ def test_create_catalogue_item_with_invalid_value_type_for_boolean_property(test ) +def test_delete_catalogue_item(test_client): + """ + Test deleting a catalogue item. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_item_post = get_catalogue_item_a_dict(response.json()["id"]) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + catalogue_item = response.json() + + response = test_client.delete(f"/v1/catalogue-items/{catalogue_item['id']}") + + assert response.status_code == 204 + response = test_client.delete(f"/v1/catalogue-items/{catalogue_item['id']}") + assert response.status_code == 404 + + +def test_delete_catalogue_item_with_invalid_id(test_client): + """ + Test deleting a catalogue item with an invalid ID. + """ + response = test_client.delete("/v1/catalogue-items/invalid") + + assert response.status_code == 404 + assert response.json()["detail"] == "A catalogue item with such ID was not found" + + +def test_delete_catalogue_item_with_nonexistent_id(test_client): + """ + Test deleting a catalogue item with a nonexistent ID. + """ + response = test_client.delete(f"/v1/catalogue-items/{str(ObjectId())}") + + assert response.status_code == 404 + assert response.json()["detail"] == "A catalogue item with such ID was not found" + + +def test_delete_catalogue_item_with_children_items(): + """ + Test deleting a catalogue item with children items. + """ + # pylint: disable=fixme + # TODO - Implement this test when the relevant item logic is implemented + + def test_get_catalogue_item(test_client): """ Test getting a catalogue item. @@ -298,7 +324,7 @@ def test_get_catalogue_item_with_invalid_id(test_client): response = test_client.get("/v1/catalogue-items/invalid") assert response.status_code == 404 - assert response.json()["detail"] == "The requested catalogue item was not found" + assert response.json()["detail"] == "A catalogue item with such ID was not found" def test_get_catalogue_item_with_nonexistent_id(test_client): @@ -308,7 +334,7 @@ def test_get_catalogue_item_with_nonexistent_id(test_client): response = test_client.get(f"/v1/catalogue-items/{str(ObjectId())}") assert response.status_code == 404 - assert response.json()["detail"] == "The requested catalogue item was not found" + assert response.json()["detail"] == "A catalogue item with such ID was not found" def test_get_catalogue_items(test_client): @@ -402,3 +428,436 @@ def test_get_catalogue_items_with_invalid_catalogue_category_id_filter(test_clie catalogue_items = response.json() assert len(catalogue_items) == 0 + + +def test_partial_update_catalogue_item(test_client): + """ + Test changing the name and description of a catalogue item. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category = response.json() + + catalogue_category_id = catalogue_category["id"] + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_id) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = {"name": "Catalogue Item B", "description": "This is Catalogue Item B"} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 200 + + catalogue_item = response.json() + + catalogue_item_post["properties"][0]["unit"] = "mm" + catalogue_item_post["properties"][1]["unit"] = None + catalogue_item_post["properties"][2]["unit"] = "cm" + assert catalogue_item["catalogue_category_id"] == catalogue_category_id + assert catalogue_item["name"] == catalogue_item_patch["name"] + assert catalogue_item["description"] == catalogue_item_patch["description"] + assert catalogue_item["properties"] == catalogue_item_post["properties"] + assert catalogue_item["manufacturer"] == catalogue_item_post["manufacturer"] + + +def test_partial_update_catalogue_item_invalid_id(test_client): + """ + Test updating a catalogue item with an invalid ID. + """ + catalogue_item_patch = {"name": "Catalogue Item B", "description": "This is Catalogue Item B"} + + response = test_client.patch("/v1/catalogue-items/invalid", json=catalogue_item_patch) + + assert response.status_code == 404 + assert response.json()["detail"] == "A catalogue item with such ID was not found" + + +def test_partial_update_catalogue_item_nonexistent_id(test_client): + """ + Test updating a catalogue item with a nonexistent ID. + """ + catalogue_item_patch = {"name": "Catalogue Item B", "description": "This is Catalogue Item B"} + + response = test_client.patch(f"/v1/catalogue-items/{str(ObjectId())}", json=catalogue_item_patch) + + assert response.status_code == 404 + assert response.json()["detail"] == "A catalogue item with such ID was not found" + + +def test_partial_update_catalogue_item_has_children_items(): + """ + Test updating a catalogue item which has children items. + """ + # pylint: disable=fixme + # TODO - Implement this test when the relevant item logic is implemented. Extra test cases may be needed. + + +def test_partial_update_catalogue_item_change_catalogue_category_id(test_client): + """ + Test moving a catalogue item to another catalogue category. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category_a = response.json() + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_B) + catalogue_category_b = response.json() + + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_a["id"]) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = { + "catalogue_category_id": catalogue_category_b["id"], + "properties": [{"name": "Property A", "value": True}], + } + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 200 + + catalogue_item = response.json() + + catalogue_item_patch["properties"][0]["unit"] = None + assert catalogue_item["catalogue_category_id"] == catalogue_item_patch["catalogue_category_id"] + assert catalogue_item["name"] == catalogue_item_post["name"] + assert catalogue_item["description"] == catalogue_item_post["description"] + assert catalogue_item["properties"] == catalogue_item_patch["properties"] + assert catalogue_item["manufacturer"] == catalogue_item_post["manufacturer"] + + +def test_partial_update_catalogue_item_change_catalogue_category_id_without_properties(test_client): + """ + Test moving a catalogue item to another catalogue category without supplying any catalogue item properties. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category_a = response.json() + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_B) + catalogue_category_b = response.json() + + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_a["id"]) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = {"catalogue_category_id": catalogue_category_b["id"]} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 422 + assert ( + response.json()["detail"] + == "Invalid value type for catalogue item property 'Property A'. Expected type: boolean." + ) + + +def test_partial_update_catalogue_item_change_catalogue_category_id_missing_mandatory_properties(test_client): + """ + Test moving a catalogue item to another catalogue category with missing mandatory catalogue item properties. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category_a = response.json() + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_B) + catalogue_category_b = response.json() + + catalogue_item_post = get_catalogue_item_b_dict(catalogue_category_b["id"]) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = { + "catalogue_category_id": catalogue_category_a["id"], + "properties": [{"name": "Property A", "value": 20}], + } + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 422 + assert response.json()["detail"] == "Missing mandatory catalogue item property: 'Property B'" + + +def test_partial_update_catalogue_item_change_catalogue_category_id_missing_non_mandatory_properties(test_client): + """ + Test moving a catalogue item to another catalogue category with missing non-mandatory catalogue item properties. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category_a = response.json() + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_B) + catalogue_category_b = response.json() + + catalogue_item_post = get_catalogue_item_b_dict(catalogue_category_b["id"]) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = { + "catalogue_category_id": catalogue_category_a["id"], + "properties": [ + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10"}, + ], + } + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + catalogue_item = response.json() + + catalogue_item_patch["properties"][0]["unit"] = None + catalogue_item_patch["properties"][1]["unit"] = "cm" + assert catalogue_item["catalogue_category_id"] == catalogue_item_patch["catalogue_category_id"] + assert catalogue_item["name"] == catalogue_item_post["name"] + assert catalogue_item["description"] == catalogue_item_post["description"] + assert catalogue_item["properties"] == catalogue_item_patch["properties"] + assert catalogue_item["manufacturer"] == catalogue_item_post["manufacturer"] + + +def test_partial_update_catalogue_item_change_catalogue_category_id_invalid_id(test_client): + """ + Test changing the catalogue category ID of a catalogue item to an invalid ID. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + + catalogue_item_post = get_catalogue_item_a_dict(response.json()["id"]) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = { + "catalogue_category_id": "invalid", + "properties": [{"name": "Property A", "value": 20}], + } + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 422 + assert response.json()["detail"] == "The specified catalogue category ID does not exist" + + +def test_partial_update_catalogue_item_change_catalogue_category_id_nonexistent_id(test_client): + """ + Test changing the catalogue category ID of a catalogue item to a nonexistent ID. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + + catalogue_item_post = get_catalogue_item_a_dict(response.json()["id"]) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = { + "catalogue_category_id": str(ObjectId()), + "properties": [{"name": "Property A", "value": 20}], + } + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 422 + assert response.json()["detail"] == "The specified catalogue category ID does not exist" + + +def test_partial_update_catalogue_item_change_catalogue_category_id_non_leaf_catalogue_category(test_client): + """ + Test moving a catalogue item to a non-leaf catalogue category. + """ + catalogue_category_post_a = {"name": "Category A", "is_leaf": False} + response = test_client.post("/v1/catalogue-categories", json=catalogue_category_post_a) + catalogue_category_a = response.json() + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_B) + catalogue_category_b = response.json() + + catalogue_item_post = get_catalogue_item_b_dict(catalogue_category_b["id"]) + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = {"catalogue_category_id": catalogue_category_a["id"]} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 409 + assert response.json()["detail"] == "Adding a catalogue item to a non-leaf catalogue category is not allowed" + + +def test_partial_update_catalogue_item_change_catalogue_category_id_has_children_items(): + """ + Test moving a catalogue item with children items to another catalogue category. + """ + # pylint: disable=fixme + # TODO - Implement this test when the relevant item logic is implemented. + + +def test_partial_update_catalogue_item_add_non_mandatory_property(test_client): + """ + Test adding a non-mandatory catalogue item property and a value. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category = response.json() + + catalogue_category_id = catalogue_category["id"] + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_id) + catalogue_item_properties = catalogue_item_post["properties"].copy() + + # Delete the non-mandatory property so that the catalogue item is created without it + del catalogue_item_post["properties"][0] + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = {"properties": catalogue_item_properties} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 200 + + catalogue_item = response.json() + + catalogue_item_patch["properties"][0]["unit"] = "mm" + catalogue_item_patch["properties"][1]["unit"] = None + catalogue_item_patch["properties"][2]["unit"] = "cm" + assert catalogue_item["catalogue_category_id"] == catalogue_category_id + assert catalogue_item["name"] == catalogue_item_post["name"] + assert catalogue_item["description"] == catalogue_item_post["description"] + assert catalogue_item["properties"] == catalogue_item_patch["properties"] + assert catalogue_item["manufacturer"] == catalogue_item_post["manufacturer"] + + +def test_partial_update_catalogue_item_remove_non_mandatory_property(test_client): + """ + Test removing a non-mandatory catalogue item property and its value.. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category = response.json() + + catalogue_category_id = catalogue_category["id"] + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_id) + + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + # Delete the non-mandatory property + del catalogue_item_post["properties"][0] + catalogue_item_patch = {"properties": catalogue_item_post["properties"]} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 200 + + catalogue_item = response.json() + + catalogue_item_patch["properties"][0]["unit"] = None + catalogue_item_patch["properties"][1]["unit"] = "cm" + assert catalogue_item["catalogue_category_id"] == catalogue_category_id + assert catalogue_item["name"] == catalogue_item_post["name"] + assert catalogue_item["description"] == catalogue_item_post["description"] + assert catalogue_item["properties"] == catalogue_item_patch["properties"] + assert catalogue_item["manufacturer"] == catalogue_item_post["manufacturer"] + + +def test_partial_update_catalogue_item_remove_mandatory_property(test_client): + """ + Test removing a mandatory catalogue item property and its value. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category = response.json() + + catalogue_category_id = catalogue_category["id"] + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_id) + + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + del catalogue_item_post["properties"][1] + catalogue_item_patch = {"properties": catalogue_item_post["properties"]} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 422 + assert response.json()["detail"] == "Missing mandatory catalogue item property: 'Property B'" + + +def test_partial_update_catalogue_item_change_value_for_string_property_invalid_type(test_client): + """ + Test changing the value of a string catalogue item property to an invalid type. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category = response.json() + + catalogue_category_id = catalogue_category["id"] + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_id) + + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_post["properties"][2]["value"] = True + catalogue_item_patch = {"properties": catalogue_item_post["properties"]} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 422 + assert ( + response.json()["detail"] + == "Invalid value type for catalogue item property 'Property C'. Expected type: string." + ) + + +def test_partial_update_catalogue_item_change_value_for_number_property_invalid_type(test_client): + """ + Test changing the value of a number catalogue item property to an invalid type. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category = response.json() + + catalogue_category_id = catalogue_category["id"] + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_id) + + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_post["properties"][0]["value"] = "20" + catalogue_item_patch = {"properties": catalogue_item_post["properties"]} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 422 + assert ( + response.json()["detail"] + == "Invalid value type for catalogue item property 'Property A'. Expected type: number." + ) + + +def test_partial_update_catalogue_item_change_value_for_boolean_property_invalid_type(test_client): + """ + Test changing the value of a boolean catalogue item property to an invalid type. + """ + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_A) + catalogue_category = response.json() + + catalogue_category_id = catalogue_category["id"] + catalogue_item_post = get_catalogue_item_a_dict(catalogue_category_id) + + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_post["properties"][1]["value"] = "False" + catalogue_item_patch = {"properties": catalogue_item_post["properties"]} + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 422 + assert ( + response.json()["detail"] + == "Invalid value type for catalogue item property 'Property B'. Expected type: boolean." + ) + + +def test_partial_update_catalogue_item_change_manufacturer(test_client): + """ + Test updating the manufacturer details of a catalogue item. + """ + # pylint: disable=fixme + # TODO - Modify this test to use manufacturer ID when the relevant manufacturer logic is implemented + response = test_client.post("/v1/catalogue-categories", json=CATALOGUE_CATEGORY_POST_B) + catalogue_category = response.json() + + catalogue_category_id = catalogue_category["id"] + catalogue_item_post = get_catalogue_item_b_dict(catalogue_category_id) + + response = test_client.post("/v1/catalogue-items", json=catalogue_item_post) + + catalogue_item_patch = { + "manufacturer": { + "name": "Manufacturer B", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-b.co.uk", + } + } + response = test_client.patch(f"/v1/catalogue-items/{response.json()['id']}", json=catalogue_item_patch) + + assert response.status_code == 200 + catalogue_item = response.json() + + catalogue_item_post["properties"][0]["unit"] = None + assert catalogue_item["catalogue_category_id"] == catalogue_category_id + assert catalogue_item["name"] == catalogue_item_post["name"] + assert catalogue_item["description"] == catalogue_item_post["description"] + assert catalogue_item["properties"] == catalogue_item_post["properties"] + assert catalogue_item["manufacturer"] == catalogue_item_patch["manufacturer"] + + +def test_partial_update_catalogue_item_change_manufacturer_id_invalid_id(): + """ + Test changing the manufacturer ID of a catalogue item to an invalid ID. + """ + # pylint: disable=fixme + # TODO - Implement this test when the relevant manufacturer logic is implemented + + +def test_partial_update_catalogue_item_change_manufacturer_id_nonexistent_id(): + """ + Test changing the manufacturer ID of a catalogue item to a nonexistent ID. + """ + # pylint: disable=fixme + # TODO - Implement this test when the relevant manufacturer logic is implemented diff --git a/test/e2e/test_system.py b/test/e2e/test_system.py new file mode 100644 index 00000000..ff35204a --- /dev/null +++ b/test/e2e/test_system.py @@ -0,0 +1,319 @@ +""" +End-to-End tests for the System router +""" + +from unittest.mock import ANY + +from bson import ObjectId + + +SYSTEM_POST_A = { + "name": "System A", + "location": "Test location", + "owner": "Me", + "importance": "low", + "description": "System description", +} +SYSTEM_POST_A_EXPECTED = { + **SYSTEM_POST_A, + "id": ANY, + "code": "system-a", + "path": "/system-a", + "parent_path": "/", + "parent_id": None, +} + +# To be posted as a child of the above +SYSTEM_POST_B = { + "name": "System B", + "location": "Test location", + "owner": "Me", + "importance": "low", + "description": "System description", +} +SYSTEM_POST_B_EXPECTED = { + **SYSTEM_POST_B, + "id": ANY, + "code": "system-b", + "path": "/system-a/system-b", + "parent_path": "/system-a", +} + +SYSTEM_POST_C = { + "name": "System C", + "location": "Test location", + "owner": "Me", + "importance": "low", + "description": "System description", +} +SYSTEM_POST_C_EXPECTED = { + **SYSTEM_POST_C, + "id": ANY, + "code": "system-c", + "path": "/system-c", + "parent_path": "/", + "parent_id": None, +} + + +def _post_systems(test_client): + """Utility function for posting all mock systems""" + + # Parent + response = test_client.post("/v1/systems", json=SYSTEM_POST_A) + system_a = response.json() + + # Child + response = test_client.post("/v1/systems", json={**SYSTEM_POST_B, "parent_id": system_a["id"]}) + system_b = response.json() + + response = test_client.post("/v1/systems", json=SYSTEM_POST_C) + system_c = response.json() + + return system_a, system_b, system_c + + +def test_create_system(test_client): + """ + Test creating a System + """ + response = test_client.post("/v1/systems", json=SYSTEM_POST_A) + + assert response.status_code == 201 + + system = response.json() + + assert system == SYSTEM_POST_A_EXPECTED + + +def test_create_system_with_valid_parent_id(test_client): + """ + Test creating a System with a valid parent ID + """ + # Parent + response = test_client.post("/v1/systems", json=SYSTEM_POST_A) + parent_system = response.json() + + # Child + response = test_client.post("/v1/systems", json={**SYSTEM_POST_B, "parent_id": parent_system["id"]}) + assert response.status_code == 201 + system = response.json() + assert system == {**SYSTEM_POST_B_EXPECTED, "parent_id": parent_system["id"]} + + +def test_create_system_with_duplicate_name_within_parent(test_client): + """ + Test creating a System with a duplicate name within the parent System + """ + # Parent + response = test_client.post("/v1/systems", json=SYSTEM_POST_A) + parent_system = response.json() + + # Child - post twice as will have the same name + response = test_client.post("/v1/systems", json={**SYSTEM_POST_B, "parent_id": parent_system["id"]}) + response = test_client.post("/v1/systems", json={**SYSTEM_POST_B, "parent_id": parent_system["id"]}) + + assert response.status_code == 409 + assert response.json()["detail"] == "A System with the same name already exists within the same parent System" + + +def test_create_system_with_invalid_parent_id(test_client): + """ + Test creating a System with an invalid parent ID + """ + response = test_client.post("/v1/systems", json={**SYSTEM_POST_A, "parent_id": "invalid"}) + + assert response.status_code == 422 + assert response.json()["detail"] == "The specified parent System ID does not exist" + + +def test_create_system_with_non_existent_parent_id(test_client): + """ + Test creating a System with a non-existent parent ID + """ + response = test_client.post("/v1/systems", json={**SYSTEM_POST_A, "parent_id": str(ObjectId())}) + + assert response.status_code == 422 + assert response.json()["detail"] == "The specified parent System ID does not exist" + + +def test_create_system_with_invalid_importance(test_client): + """ + Test creating a System with an invalid importance + """ + response = test_client.post("/v1/systems", json={**SYSTEM_POST_A, "importance": "invalid"}) + + assert response.status_code == 422 + assert ( + response.json()["detail"][0]["msg"] + == "value is not a valid enumeration member; permitted: 'low', 'medium', 'high'" + ) + + +def test_delete_system(test_client): + """ + Test deleting a System + """ + + # Create one to delete + response = test_client.post("/v1/systems", json=SYSTEM_POST_A) + system = response.json() + + # Delete + response = test_client.delete(f"/v1/systems/{system['id']}") + + assert response.status_code == 204 + response = test_client.get(f"/v1/systems/{system['id']}") + assert response.status_code == 404 + + +def test_delete_system_with_invalid_id(test_client): + """ + Test deleting a System with an invalid ID + """ + + # Delete + response = test_client.delete("/v1/systems/invalid") + + assert response.status_code == 404 + assert response.json()["detail"] == "System with such ID was not found" + + +def test_delete_system_with_non_existent_id(test_client): + """ + Test deleting a System with a non existent ID + """ + + # Delete + response = test_client.delete(f"/v1/systems/{str(ObjectId())}") + + assert response.status_code == 404 + assert response.json()["detail"] == "System with such ID was not found" + + +def test_get_system(test_client): + """ + Test getting a System + """ + + # Post one first + response = test_client.post("/v1/systems", json=SYSTEM_POST_A) + system = response.json() + system_id = system["id"] + + # Ensure can get it again + response = test_client.get(f"/v1/systems/{system_id}") + + assert response.status_code == 200 + assert response.json() == SYSTEM_POST_A_EXPECTED + + +def test_delete_system_with_child_system(test_client): + """ + Test deleting a System + """ + + # Create one to delete + # Parent + response = test_client.post("/v1/systems", json=SYSTEM_POST_A) + parent_system = response.json() + + # Child + response = test_client.post("/v1/systems", json={**SYSTEM_POST_B, "parent_id": parent_system["id"]}) + + # Delete + response = test_client.delete(f"/v1/systems/{parent_system['id']}") + + assert response.status_code == 409 + assert response.json()["detail"] == "System has child elements and cannot be deleted" + + +def test_get_system_with_invalid_id(test_client): + """ + Test getting a System with an invalid ID + """ + response = test_client.get("/v1/systems/invalid") + + assert response.status_code == 404 + assert response.json()["detail"] == "A System with such ID was not found" + + +def test_get_system_with_non_existent_id(test_client): + """ + Test getting a System with a non-existent ID + """ + response = test_client.get(f"/v1/systems/{str(ObjectId())}") + + assert response.status_code == 404 + assert response.json()["detail"] == "A System with such ID was not found" + + +def test_get_systems(test_client): + """ + Test getting a list of Systems + """ + + system_a, system_b, system_c = _post_systems(test_client) + + # Get all systems (no filters) + response = test_client.get("/v1/systems") + + assert response.status_code == 200 + assert response.json() == [system_a, system_b, system_c] + + +def test_get_systems_with_path_filter(test_client): + """ + Test getting a list of Systems with a path filter + """ + + _, _, system_c = _post_systems(test_client) + + # Get only those with the given path + response = test_client.get("/v1/systems", params={"path": "/system-c"}) + + assert response.status_code == 200 + assert response.json() == [system_c] + + +def test_get_systems_with_parent_path_filter(test_client): + """ + Test getting a list of Systems with a parent path filter + """ + + _, system_b, _ = _post_systems(test_client) + + # Get only those with the given parent path + response = test_client.get("/v1/systems", params={"parent_path": "/system-a"}) + + assert response.status_code == 200 + assert response.json() == [system_b] + + +def test_get_systems_with_path_and_parent_path_filter(test_client): + """ + Test getting a list of Systems with a path and parent path filter + """ + + _, system_b, _ = _post_systems(test_client) + + # Get only those with the given path and parent path + response = test_client.get("/v1/systems", params={"path": "/system-a/system-b", "parent_path": "/system-a"}) + + assert response.status_code == 200 + assert response.json() == [system_b] + + +def test_get_systems_with_path_and_parent_path_filters_no_matching_results(test_client): + """ + Test getting a list of Systems with a path and parent path filter when there is no + matching results in the database + """ + + _, _, _ = _post_systems(test_client) + + # Get only those with the given path and parent path + response = test_client.get("/v1/systems", params={"path": "/", "parent_path": "/system-b"}) + + assert response.status_code == 200 + assert response.json() == [] diff --git a/test/unit/repositories/conftest.py b/test/unit/repositories/conftest.py index a67f76fb..fd7dcbf1 100644 --- a/test/unit/repositories/conftest.py +++ b/test/unit/repositories/conftest.py @@ -14,6 +14,7 @@ from inventory_management_system_api.repositories.catalogue_category import CatalogueCategoryRepo from inventory_management_system_api.repositories.catalogue_item import CatalogueItemRepo from inventory_management_system_api.repositories.manufacturer import ManufacturerRepo +from inventory_management_system_api.repositories.system import SystemRepo @pytest.fixture(name="database_mock") @@ -28,6 +29,7 @@ def fixture_database_mock() -> Mock: database_mock.catalogue_categories = Mock(Collection) database_mock.catalogue_items = Mock(Collection) database_mock.manufacturer = Mock(Collection) + database_mock.systems = Mock(Collection) return database_mock @@ -59,6 +61,15 @@ def fixture_manufacturer_repository(database_mock: Mock) -> ManufacturerRepo: Fixture to create ManufacturerRepo instance """ return ManufacturerRepo(database_mock) +@pytest.fixture(name="system_repository") +def fixture_system_repository(database_mock: Mock) -> SystemRepo: + """ + Fixture to create a `SystemRepo` instance with a mocked Database dependency. + + :param database_mock: Mocked MongoDB database instance. + :return: `SystemRepo` instance with the mocked dependency. + """ + return SystemRepo(database_mock) class RepositoryTestHelpers: diff --git a/test/unit/repositories/test_catalogue_category.py b/test/unit/repositories/test_catalogue_category.py index e5ad4444..ec1b78b6 100644 --- a/test/unit/repositories/test_catalogue_category.py +++ b/test/unit/repositories/test_catalogue_category.py @@ -2,7 +2,7 @@ """ Unit tests for the `CatalogueCategoryRepo` repository. """ -from unittest.mock import call +from unittest.mock import call, MagicMock import pytest from bson import ObjectId @@ -352,6 +352,7 @@ def test_create_with_nonexistent_parent_id(test_helpers, database_mock, catalogu catalogue_item_properties=[], ) ) + database_mock.catalogue_categories.insert_one.assert_not_called() assert str(exc.value) == f"No parent catalogue category found with ID: {catalogue_category.parent_id}" @@ -911,19 +912,11 @@ def test_update_with_invalid_id(catalogue_category_repository): """ Test updating a catalogue category with invalid ID. - Verify that the `update` method properly handles the update of a catalogue category with a nonexistent ID. + Verify that the `update` method properly handles the update of a catalogue category with an invalid ID. """ - update_catalogue_category = CatalogueCategoryIn( - name="Category B", - code="category-b", - is_leaf=False, - path="/category-b", - parent_path="/", - parent_id=None, - catalogue_item_properties=[], - ) - + update_catalogue_category = MagicMock() catalogue_category_id = "invalid" + with pytest.raises(InvalidObjectIdError) as exc: catalogue_category_repository.update(catalogue_category_id, update_catalogue_category) assert str(exc.value) == f"Invalid ObjectId value '{catalogue_category_id}'" diff --git a/test/unit/repositories/test_catalogue_item.py b/test/unit/repositories/test_catalogue_item.py index 91b74f86..1432ab9f 100644 --- a/test/unit/repositories/test_catalogue_item.py +++ b/test/unit/repositories/test_catalogue_item.py @@ -1,11 +1,13 @@ """ Unit tests for the `CatalogueItemRepo` repository. """ +from unittest.mock import MagicMock + import pytest from bson import ObjectId from inventory_management_system_api.core.custom_object_id import CustomObjectId -from inventory_management_system_api.core.exceptions import DuplicateRecordError, InvalidObjectIdError +from inventory_management_system_api.core.exceptions import InvalidObjectIdError, MissingRecordError from inventory_management_system_api.models.catalogue_item import ( CatalogueItemOut, Property, @@ -79,47 +81,64 @@ def test_create(test_helpers, database_mock, catalogue_item_repository): assert created_catalogue_item == catalogue_item -def test_create_with_duplicate_name_within_catalogue_category(test_helpers, database_mock, catalogue_item_repository): +def test_delete(test_helpers, database_mock, catalogue_item_repository): """ - Test creating a catalogue item with a duplicate name within the catalogue category. + Test deleting a catalogue item. - Verify that the `create` method properly handles a catalogue item with a duplicate name, finds that there is a - duplicate catalogue item, and does not create the catalogue item. + Verify that the `delete` method properly handles the deletion of a catalogue item by ID. """ - # pylint: disable=duplicate-code - catalogue_item = CatalogueItemOut( - id=str(ObjectId()), - catalogue_category_id=str(ObjectId()), - name="Catalogue Item A", - description="This is Catalogue Item A", - properties=[ - Property(name="Property A", value=20, unit="mm"), - Property(name="Property B", value=False), - Property(name="Property C", value="20x15x10", unit="cm"), - ], - manufacturer=Manufacturer( - name="Manufacturer A", address="1 Address, City, Country, Postcode", web_url="www.manufacturer-a.co.uk" - ), - ) - # pylint: enable=duplicate-code + catalogue_item_id = str(ObjectId()) - # Mock `count_documents` to return 1 (duplicate catalogue item found within the catalogue category) - test_helpers.mock_count_documents(database_mock.catalogue_items, 1) - - with pytest.raises(DuplicateRecordError) as exc: - catalogue_item_repository.create( - CatalogueItemIn( - catalogue_category_id=catalogue_item.catalogue_category_id, - name=catalogue_item.name, - description=catalogue_item.description, - properties=catalogue_item.properties, - manufacturer=catalogue_item.manufacturer, - ) - ) - assert str(exc.value) == "Duplicate catalogue item found within the catalogue category" - database_mock.catalogue_items.count_documents.assert_called_once_with( - {"catalogue_category_id": CustomObjectId(catalogue_item.catalogue_category_id), "name": catalogue_item.name} - ) + # Mock `delete_one` to return that one document has been deleted + test_helpers.mock_delete_one(database_mock.catalogue_items, 1) + + # pylint: disable=fixme + # TODO - (when the relevant item logic is implemented) mock it so that no children items are returned + + catalogue_item_repository.delete(catalogue_item_id) + + database_mock.catalogue_items.delete_one.assert_called_once_with({"_id": CustomObjectId(catalogue_item_id)}) + + +def test_delete_with_children_items(): + """ + Test deleting a catalogue item with children items. + + Verify that the `delete` method properly handles the deletion of a catalogue item with children items. + """ + # pylint: disable=fixme + # TODO - Implement this test when the relevant item logic is implemented + + +def test_delete_with_invalid_id(catalogue_item_repository): + """ + Test deleting a catalogue item with an invalid ID. + + Verify that the `delete` method properly handles the deletion of a catalogue item with an invalid ID. + """ + with pytest.raises(InvalidObjectIdError) as exc: + catalogue_item_repository.delete("invalid") + assert str(exc.value) == "Invalid ObjectId value 'invalid'" + + +def test_delete_with_nonexistent_id(test_helpers, database_mock, catalogue_item_repository): + """ + Test deleting a catalogue item with a nonexistent ID. + + Verify that the `delete` method properly handles the deletion of a catalogue item with a nonexistent ID. + """ + catalogue_item_id = str(ObjectId()) + + # Mock `delete_one` to return that no document has been deleted + test_helpers.mock_delete_one(database_mock.catalogue_items, 0) + + # pylint: disable=fixme + # TODO - (when the relevant item logic is implemented) mock it so that no children items are returned + + with pytest.raises(MissingRecordError) as exc: + catalogue_item_repository.delete(catalogue_item_id) + assert str(exc.value) == f"No catalogue item found with ID: {catalogue_item_id}" + database_mock.catalogue_items.delete_one.assert_called_once_with({"_id": CustomObjectId(catalogue_item_id)}) def test_get(test_helpers, database_mock, catalogue_item_repository): @@ -332,3 +351,79 @@ def test_list_with_invalid_catalogue_category_id_filter(catalogue_item_repositor with pytest.raises(InvalidObjectIdError) as exc: catalogue_item_repository.list("invalid") assert str(exc.value) == "Invalid ObjectId value 'invalid'" + + +def test_update(test_helpers, database_mock, catalogue_item_repository): + """ + Test updating a catalogue item. + + Verify that the `update` method properly handles the catalogue item to be updated. + """ + # pylint: disable=duplicate-code + catalogue_item_info = { + "name": "Catalogue Item B", + "description": "This is Catalogue Item B", + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + } + # pylint: enable=duplicate-code + catalogue_item = CatalogueItemOut(id=str(ObjectId()), catalogue_category_id=str(ObjectId()), **catalogue_item_info) + + # Mock `update_one` to return an object for the updated catalogue item document + test_helpers.mock_update_one(database_mock.catalogue_items) + # Mock `find_one` to return the updated catalogue item document + test_helpers.mock_find_one( + database_mock.catalogue_items, + { + "_id": CustomObjectId(catalogue_item.id), + "catalogue_category_id": CustomObjectId(catalogue_item.catalogue_category_id), + **catalogue_item_info, + }, + ) + + catalogue_item_in = CatalogueItemIn( + catalogue_category_id=catalogue_item.catalogue_category_id, **catalogue_item_info + ) + updated_catalogue_item = catalogue_item_repository.update(catalogue_item.id, catalogue_item_in) + + database_mock.catalogue_items.update_one.assert_called_once_with( + {"_id": CustomObjectId(catalogue_item.id)}, + { + "$set": { + "catalogue_category_id": CustomObjectId(catalogue_item.catalogue_category_id), + **catalogue_item_in.dict(), + } + }, + ) + database_mock.catalogue_items.find_one.assert_called_once_with({"_id": CustomObjectId(catalogue_item.id)}) + assert updated_catalogue_item == catalogue_item + + +def test_update_with_invalid_id(catalogue_item_repository): + """ + Test updating a catalogue category with invalid ID. + + Verify that the `update` method properly handles the update of a catalogue category with an invalid ID. + """ + update_catalogue_item = MagicMock() + catalogue_item_id = "invalid" + + with pytest.raises(InvalidObjectIdError) as exc: + catalogue_item_repository.update(catalogue_item_id, update_catalogue_item) + assert str(exc.value) == f"Invalid ObjectId value '{catalogue_item_id}'" + + +def test_update_has_child_items(): + """ + Test updating a catalogue item with child items. + """ + # pylint: disable=fixme + # TODO - Implement this test when the relevant item logic is implemented. diff --git a/test/unit/repositories/test_system.py b/test/unit/repositories/test_system.py new file mode 100644 index 00000000..c9eb91fc --- /dev/null +++ b/test/unit/repositories/test_system.py @@ -0,0 +1,442 @@ +""" +Unit tests for the `SystemRepo` repository +""" + + +from typing import Optional +from unittest.mock import call + +import pytest +from bson import ObjectId + +from inventory_management_system_api.core.custom_object_id import CustomObjectId +from inventory_management_system_api.core.exceptions import ( + ChildrenElementsExistError, + DuplicateRecordError, + InvalidObjectIdError, + MissingRecordError, +) +from inventory_management_system_api.models.system import SystemIn, SystemOut + + +def _test_list(test_helpers, database_mock, system_repository, path: Optional[str], parent_path: Optional[str]): + """ + Utility method that tests getting Systems + + Verifies that the `list` method properly handles the retrieval of systems with the given filters + """ + # pylint: disable=duplicate-code + system_a_info = { + "name": "Test name a", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name-a", + "parent_path": "/", + "parent_id": str(ObjectId()), + } + system_a = SystemOut(id=str(ObjectId()), **system_a_info) + system_b_info = { + "name": "Test name b", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name-b", + "parent_path": "/", + "parent_id": str(ObjectId()), + } + system_b = SystemOut(id=str(ObjectId()), **system_b_info) + # pylint: enable=duplicate-code + + # Mock `find` to return a list of System documents + test_helpers.mock_find( + database_mock.systems, + [{"_id": CustomObjectId(system_a.id), **system_a_info}, {"_id": CustomObjectId(system_b.id), **system_b_info}], + ) + + retrieved_systems = system_repository.list(path, parent_path) + + expected_filters = {} + if path: + expected_filters["path"] = path + if parent_path: + expected_filters["parent_path"] = parent_path + + database_mock.systems.find.assert_called_once_with(expected_filters) + assert retrieved_systems == [system_a, system_b] + + +def test_create(test_helpers, database_mock, system_repository): + """ + Test creating a System + + Verify that the `create` method properly handles the System to be created, checks that there is not + a duplicate System, and creates the System + """ + # pylint: disable=duplicate-code + system_info = { + "name": "Test name", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name", + "parent_path": "/", + "parent_id": None, + } + system = SystemOut(id=str(ObjectId()), **system_info) + # pylint: enable=duplicate-code + + # Mock `count_documents` to return 0 (no duplicate system found within the parent system) + test_helpers.mock_count_documents(database_mock.systems, 0) + # Mock `insert_one` to return an object for the inserted system document + test_helpers.mock_insert_one(database_mock.systems, CustomObjectId(system.id)) + # Mock `find_one` to return the inserted system document + test_helpers.mock_find_one( + database_mock.systems, + {"_id": CustomObjectId(system.id), **system_info}, + ) + + created_system = system_repository.create(SystemIn(**system_info)) + + database_mock.systems.insert_one.assert_called_once_with( + {**system_info}, + ) + assert created_system == system + + +def test_create_with_parent_id(test_helpers, database_mock, system_repository): + """ + Test creating a System with a parent ID + + Verify that the `create` method properly handles the creation of a System with a parent ID + """ + # pylint: disable=duplicate-code + system_info = { + "name": "Test name b", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name-a/test-name-b", + "parent_path": "/test-name-a", + "parent_id": str(ObjectId()), + } + system = SystemOut(id=str(ObjectId()), **system_info) + + # Mock `find_one` to return the parent system document + test_helpers.mock_find_one( + database_mock.systems, + { + "_id": CustomObjectId(system.parent_id), + "name": "Test name a", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name-a", + "parent_path": "/", + "parent_id": None, + }, + ) + # pylint: enable=duplicate-code + # Mock `count_documents` to return 0 (no duplicate system found within the parent system) + test_helpers.mock_count_documents(database_mock.systems, 0) + # Mock `insert_one` to return an object for the inserted system document + test_helpers.mock_insert_one(database_mock.systems, CustomObjectId(system.id)) + # Mock `find_one` to return the inserted system document + test_helpers.mock_find_one( + database_mock.systems, + {"_id": CustomObjectId(system.id), **system_info}, + ) + + created_system = system_repository.create(SystemIn(**system_info)) + + database_mock.systems.insert_one.assert_called_once_with( + {**system_info, "parent_id": CustomObjectId(system.parent_id)}, + ) + database_mock.systems.find_one.assert_has_calls( + [call({"_id": CustomObjectId(system.parent_id)}), call({"_id": CustomObjectId(system.id)})] + ) + assert created_system == system + + +def test_create_with_non_existent_parent_id(test_helpers, database_mock, system_repository): + """ + Test creating a System with a non-existent parent ID + + Verify that the `create` method properly handles a System with a non-existent parent ID + and does not create it + """ + # pylint: disable=duplicate-code + system_info = { + "name": "Test name b", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name-a/test-name-b", + "parent_path": "/test-name-a", + "parent_id": str(ObjectId()), + } + system = SystemOut(id=str(ObjectId()), **system_info) + # pylint: enable=duplicate-code + + # Mock `find_one` to not return a parent system document + test_helpers.mock_find_one(database_mock.systems, None) + + with pytest.raises(MissingRecordError) as exc: + system_repository.create(SystemIn(**system_info)) + + database_mock.systems.insert_one.assert_not_called() + assert str(exc.value) == f"No parent System found with ID: {system.parent_id}" + + +def test_create_with_duplicate_name_within_parent(test_helpers, database_mock, system_repository): + """ + Test creating a System with a duplicate name within the parent System + + Verify that the `create` method properly handles a System with a duplicate name + and does not create it + """ + # pylint: disable=duplicate-code + system_info = { + "name": "Test name b", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name-a/test-name-b", + "parent_path": "/test-name-a", + "parent_id": str(ObjectId()), + } + system = SystemOut(id=str(ObjectId()), **system_info) + # pylint: enable=duplicate-code + + # Mock `find_one` to return the parent system document + test_helpers.mock_find_one( + database_mock.systems, + { + "_id": CustomObjectId(system.parent_id), + "name": "Test name a", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name-a", + "parent_path": "/", + "parent_id": None, + }, + ) + # Mock `count_documents` to return 1 (duplicate system found within the parent system) + test_helpers.mock_count_documents(database_mock.systems, 1) + + with pytest.raises(DuplicateRecordError) as exc: + system_repository.create(SystemIn(**system_info)) + + database_mock.systems.insert_one.assert_not_called() + assert str(exc.value) == "Duplicate System found within the parent System" + + +def test_get(test_helpers, database_mock, system_repository): + """ + Test getting a System + + Verify that the `get` method properly handles the retrieval of a System by ID + """ + # pylint: disable=duplicate-code + system_info = { + "name": "Test name a", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "code": "test-name", + "path": "/test-name-a", + "parent_path": "/", + "parent_id": str(ObjectId()), + } + system = SystemOut(id=str(ObjectId()), **system_info) + # pylint: enable=duplicate-code + + # Mock `find_one` to return a system + test_helpers.mock_find_one( + database_mock.systems, + {"_id": CustomObjectId(system.id), **system_info}, + ) + + retrieved_system = system_repository.get(system.id) + + database_mock.systems.find_one.assert_called_with({"_id": CustomObjectId(system.id)}) + assert retrieved_system == system + + +def test_get_with_invalid_id(database_mock, system_repository): + """ + Test getting a System with an invalid ID + + Verify that the `get` method properly handles the retrieval of a System with an invalid ID + """ + + with pytest.raises(InvalidObjectIdError) as exc: + system_repository.get("invalid") + database_mock.systems.find_one.assert_not_called() + assert str(exc.value) == "Invalid ObjectId value 'invalid'" + + +def test_get_with_non_existent_id(test_helpers, database_mock, system_repository): + """ + Test getting a System with a non-existent ID + + Verify that the `get` method properly handles the retrieval of a System with a non-existent ID + """ + system_id = str(ObjectId()) + + # Mock `find_one` to not return a system document + test_helpers.mock_find_one(database_mock.systems, None) + + retrieved_system = system_repository.get(system_id) + + database_mock.systems.find_one.assert_called_with({"_id": CustomObjectId(system_id)}) + assert retrieved_system is None + + +def test_list(test_helpers, database_mock, system_repository): + """ + Test getting Systems + + Verify that the `list` method properly handles the retrieval of systems without filters + """ + _test_list(test_helpers, database_mock, system_repository, None, None) + + +def test_list_with_path_filter(test_helpers, database_mock, system_repository): + """ + Test getting Systems based on the provided path filter + + Verify that the `list` method properly handles the retrieval of systems based on the provided + path filter + """ + _test_list(test_helpers, database_mock, system_repository, "/test-name-a", None) + + +def test_list_with_parent_path_filter(test_helpers, database_mock, system_repository): + """ + Test getting Systems based on the provided parent path filter + + Verify that the `list` method properly handles the retrieval of systems based on the provided parent + path filter + """ + _test_list(test_helpers, database_mock, system_repository, None, "/") + + +def test_list_with_path_and_parent_path_filter(test_helpers, database_mock, system_repository): + """ + Test getting Systems based on the provided path and parent path filters + + Verify that the `list` method properly handles the retrieval of systems based on the provided path + and parent path filters + """ + _test_list(test_helpers, database_mock, system_repository, "/test-name-a", "/") + + +def test_list_with_path_and_parent_path_filters_no_matching_results(test_helpers, database_mock, system_repository): + """ + Test getting Systems based on the provided path and parent path filters when there are no matching results + int he database + + Verify that the `list` method properly handles the retrieval of systems based on the provided path + and parent path filters when there are no matching results in the database + """ + # Mock `find` to return a list of System documents + test_helpers.mock_find(database_mock.systems, []) + + retrieved_systems = system_repository.list("/test-name-a", "/") + + database_mock.systems.find.assert_called_once_with({"path": "/test-name-a", "parent_path": "/"}) + assert retrieved_systems == [] + + +def test_delete(test_helpers, database_mock, system_repository): + """ + Test deleting a System + + Verify that the `delete` method properly handles the deletion of a System by its ID + """ + system_id = str(ObjectId()) + + # Mock `delete_one` to return that one document has been deleted + test_helpers.mock_delete_one(database_mock.systems, 1) + + # Mock count_documents to return 0 (children elements not found) + test_helpers.mock_count_documents(database_mock.systems, 0) + + system_repository.delete(system_id) + + database_mock.systems.delete_one.assert_called_once_with({"_id": CustomObjectId(system_id)}) + + +def test_delete_with_child_systems(test_helpers, database_mock, system_repository): + """ + Test deleting a System with child Systems + + Verify that the `delete` method properly handles the deletion of a System with child Systems + """ + system_id = str(ObjectId()) + + # Mock `delete_one` to return that one document has been deleted + test_helpers.mock_delete_one(database_mock.systems, 1) + + # Mock count_documents to return 1 (children elements found) + test_helpers.mock_count_documents(database_mock.systems, 1) + + with pytest.raises(ChildrenElementsExistError) as exc: + system_repository.delete(system_id) + + database_mock.systems.delete_one.assert_not_called() + assert str(exc.value) == f"System with ID {system_id} has child elements and cannot be deleted" + + +def test_delete_with_invalid_id(database_mock, system_repository): + """ + Test deleting a System with an invalid ID + + Verify that the `delete` method properly handles the deletion of a System with an invalid ID + """ + + with pytest.raises(InvalidObjectIdError) as exc: + system_repository.delete("invalid") + + database_mock.systems.delete_one.assert_not_called() + assert str(exc.value) == "Invalid ObjectId value 'invalid'" + + +def test_delete_with_non_existent_id(test_helpers, database_mock, system_repository): + """ + Test deleting a System with a non-existent ID + + Verify that the `delete` method properly handles the deletion of a System with a non-existant ID + """ + system_id = str(ObjectId()) + + # Mock `delete_one` to return that no document has been deleted + test_helpers.mock_delete_one(database_mock.systems, 0) + + # Mock count_documents to return 0 (children elements not found) + test_helpers.mock_count_documents(database_mock.systems, 0) + + with pytest.raises(MissingRecordError) as exc: + system_repository.delete(system_id) + assert str(exc.value) == f"No System found with ID: {system_id}" + + database_mock.systems.delete_one.assert_called_once_with({"_id": CustomObjectId(system_id)}) diff --git a/test/unit/services/conftest.py b/test/unit/services/conftest.py index c573a2e7..6be32022 100644 --- a/test/unit/services/conftest.py +++ b/test/unit/services/conftest.py @@ -1,7 +1,7 @@ """ Module for providing common test configuration and test fixtures. """ -from typing import Union, List, Type +from typing import List, Type, Union from unittest.mock import Mock import pytest @@ -11,8 +11,10 @@ from inventory_management_system_api.repositories.catalogue_category import CatalogueCategoryRepo from inventory_management_system_api.repositories.manufacturer import ManufacturerRepo from inventory_management_system_api.repositories.catalogue_item import CatalogueItemRepo +from inventory_management_system_api.repositories.system import SystemRepo from inventory_management_system_api.services.catalogue_category import CatalogueCategoryService from inventory_management_system_api.services.catalogue_item import CatalogueItemService +from inventory_management_system_api.services.system import SystemService @pytest.fixture(name="catalogue_category_repository_mock") @@ -44,6 +46,15 @@ def fixture_manufacturer_repository_mock() -> Mock: """ return Mock(ManufacturerRepo) +@pytest.fixture(name="system_repository_mock") +def fixture_system_repository_mock() -> Mock: + """ + Fixture to create a mock of the `SystemRepo` dependency. + + :return: Mocked SystemRepo instance. + """ + return Mock(SystemRepo) + @pytest.fixture(name="catalogue_category_service") def fixture_catalogue_category_service(catalogue_category_repository_mock: Mock) -> CatalogueCategoryService: @@ -71,6 +82,18 @@ def fixture_catalogue_item_service( return CatalogueItemService(catalogue_item_repository_mock, catalogue_category_repository_mock) +@pytest.fixture(name="system_service") +def fixture_system_service(system_repository_mock: Mock) -> SystemService: + """ + Fixture to create a `SystemService` instance with a mocked `SystemRepo` + dependencies. + + :param system_repository_mock: Mocked `SystemRepo` instance + :return: `SystemService` instance with the mocked dependency + """ + return SystemService(system_repository_mock) + + class ServiceTestHelpers: """ A utility class containing common helper methods for the service tests. diff --git a/test/unit/services/test_catalogue_category.py b/test/unit/services/test_catalogue_category.py index 6b0c123c..32eb23a6 100644 --- a/test/unit/services/test_catalogue_category.py +++ b/test/unit/services/test_catalogue_category.py @@ -642,6 +642,7 @@ def test_update_change_from_leaf_to_non_leaf( """ Test changing a catalogue category from leaf to non-leaf. """ + # pylint: disable=duplicate-code catalogue_category = CatalogueCategoryOut( id=str(ObjectId()), name="Category A", @@ -652,6 +653,7 @@ def test_update_change_from_leaf_to_non_leaf( parent_id=None, catalogue_item_properties=[], ) + # pylint: enable=duplicate-code # Mock `get` to return a catalogue category test_helpers.mock_get( diff --git a/test/unit/services/test_catalogue_item.py b/test/unit/services/test_catalogue_item.py index 55c4d85d..23348035 100644 --- a/test/unit/services/test_catalogue_item.py +++ b/test/unit/services/test_catalogue_item.py @@ -1,3 +1,4 @@ +# pylint: disable=too-many-lines """ Unit tests for the `CatalogueCategoryService` service. """ @@ -22,6 +23,7 @@ PropertyPostRequestSchema, CatalogueItemPostRequestSchema, ManufacturerSchema, + CatalogueItemPatchRequestSchema, ) @@ -478,6 +480,19 @@ def test_create_with_with_invalid_value_type_for_boolean_property( catalogue_category_repository_mock.get.assert_called_once_with(catalogue_category.id) +def test_delete(catalogue_item_repository_mock, catalogue_item_service): + """ + Test deleting a catalogue item. + + Verify that the `delete` method properly handles the deletion of a catalogue item by ID. + """ + catalogue_item_id = str(ObjectId) + + catalogue_item_service.delete(catalogue_item_id) + + catalogue_item_repository_mock.delete.assert_called_once_with(catalogue_item_id) + + def test_get(test_helpers, catalogue_item_repository_mock, catalogue_item_service): """ Test getting a catalogue item. @@ -629,3 +644,848 @@ def test_list_with_catalogue_category_id_filter_no_matching_results( catalogue_item_repository_mock.list.assert_called_once_with(catalogue_category_id) assert retrieved_catalogue_items == [] + + +def test_update(test_helpers, catalogue_item_repository_mock, catalogue_item_service): + """ + Test updating a catalogue item. + + Verify that the `update` method properly handles the catalogue item to be updated. + """ + # pylint: disable=duplicate-code + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + } + # pylint: enable=duplicate-code + full_catalogue_item_info = { + **catalogue_item_info, + "name": "Catalogue Item B", + "description": "This is Catalogue Item B", + } + catalogue_item = CatalogueItemOut(**full_catalogue_item_info) + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut( + name="Catalogue Item A", + description="This is Catalogue Item A", + **catalogue_item_info, + ), + ) + # Mock `update` to return the updated catalogue item + test_helpers.mock_update(catalogue_item_repository_mock, catalogue_item) + + updated_catalogue_item = catalogue_item_service.update( + catalogue_item.id, + CatalogueItemPatchRequestSchema(name=catalogue_item.name, description=catalogue_item.description), + ) + + catalogue_item_repository_mock.update.assert_called_once_with( + catalogue_item.id, CatalogueItemIn(**full_catalogue_item_info) + ) + assert updated_catalogue_item == catalogue_item + + +def test_update_with_nonexistent_id(test_helpers, catalogue_item_repository_mock, catalogue_item_service): + """ + Test updating a catalogue item with a non-existent ID. + + Verify that the `update` method properly handles the catalogue category to be updated with a non-existent ID. + """ + # Mock `get` to return a catalogue item + test_helpers.mock_get(catalogue_item_repository_mock, None) + + catalogue_item_id = str(ObjectId()) + with pytest.raises(MissingRecordError) as exc: + catalogue_item_service.update(catalogue_item_id, CatalogueItemPatchRequestSchema(properties=[])) + assert str(exc.value) == f"No catalogue item found with ID: {catalogue_item_id}" + + +def test_update_change_catalogue_category_id_same_defined_properties_without_supplied_properties( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test moving a catalogue item to another catalogue category that has the same defined catalogue item properties when + no properties are supplied. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + } + full_catalogue_item_info = { + **catalogue_item_info, + "catalogue_category_id": str(ObjectId()), + } + catalogue_item = CatalogueItemOut(**full_catalogue_item_info) + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut( + catalogue_category_id=str(ObjectId()), + **catalogue_item_info, + ), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item.catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + # Mock `update` to return the updated catalogue item + test_helpers.mock_update(catalogue_item_repository_mock, catalogue_item) + + updated_catalogue_item = catalogue_item_service.update( + catalogue_item.id, CatalogueItemPatchRequestSchema(catalogue_category_id=catalogue_item.catalogue_category_id) + ) + + catalogue_item_repository_mock.update.assert_called_once_with( + catalogue_item.id, CatalogueItemIn(**full_catalogue_item_info) + ) + assert updated_catalogue_item == catalogue_item + + +def test_update_change_catalogue_category_id_same_defined_properties_with_supplied_properties( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test moving a catalogue item to another catalogue category that has the same defined catalogue item properties when + properties are supplied. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + } + full_catalogue_item_info = { + **catalogue_item_info, + "catalogue_category_id": str(ObjectId()), + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + catalogue_item = CatalogueItemOut(**full_catalogue_item_info) + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut( + catalogue_category_id=str(ObjectId()), + properties=[ + Property(name="Property A", value=1, unit="mm"), + Property(name="Property B", value=True), + Property(name="Property C", value="1x1x1", unit="cm"), + ], + **catalogue_item_info, + ), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item.catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + # Mock `update` to return the updated catalogue item + test_helpers.mock_update(catalogue_item_repository_mock, catalogue_item) + + updated_catalogue_item = catalogue_item_service.update( + catalogue_item.id, + CatalogueItemPatchRequestSchema( + catalogue_category_id=catalogue_item.catalogue_category_id, + properties=[{"name": prop.name, "value": prop.value} for prop in catalogue_item.properties], + ), + ) + + catalogue_item_repository_mock.update.assert_called_once_with( + catalogue_item.id, CatalogueItemIn(**full_catalogue_item_info) + ) + assert updated_catalogue_item == catalogue_item + + +def test_update_change_catalogue_category_id_different_defined_properties_without_supplied_properties( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test moving a catalogue item to another catalogue category that has different defined catalogue item properties when + no properties are supplied. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut( + catalogue_category_id=str(ObjectId()), + **catalogue_item_info, + ), + ) + catalogue_category_id = str(ObjectId()) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="boolean", mandatory=True), + ], + ), + ) + + with pytest.raises(InvalidCatalogueItemPropertyTypeError) as exc: + catalogue_item_service.update( + catalogue_item_info["id"], + CatalogueItemPatchRequestSchema(catalogue_category_id=catalogue_category_id), + ) + assert str(exc.value) == "Invalid value type for catalogue item property 'Property A'. Expected type: boolean." + + +def test_update_change_catalogue_category_id_different_defined_properties_with_supplied_properties( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test moving a catalogue item to another catalogue category that has different defined catalogue item properties when + properties are supplied. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + } + full_catalogue_item_info = { + **catalogue_item_info, + "catalogue_category_id": str(ObjectId()), + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + catalogue_item = CatalogueItemOut(**full_catalogue_item_info) + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut( + catalogue_category_id=str(ObjectId()), + properties=[{"name": "Property A", "value": True}], + **catalogue_item_info, + ), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item.catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + # Mock `update` to return the updated catalogue item + test_helpers.mock_update(catalogue_item_repository_mock, catalogue_item) + + updated_catalogue_item = catalogue_item_service.update( + catalogue_item.id, + CatalogueItemPatchRequestSchema( + catalogue_category_id=catalogue_item.catalogue_category_id, + properties=[{"name": prop.name, "value": prop.value} for prop in catalogue_item.properties], + ), + ) + + catalogue_item_repository_mock.update.assert_called_once_with( + catalogue_item.id, CatalogueItemIn(**full_catalogue_item_info) + ) + assert updated_catalogue_item == catalogue_item + + +def test_update_with_nonexistent_catalogue_category_id( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test updating a catalogue item with a non-existent catalogue category ID. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [], + } + + # Mock `get` to return a catalogue item + test_helpers.mock_get(catalogue_item_repository_mock, CatalogueItemOut(**catalogue_item_info)) + # Mock `get` to not return a catalogue category + test_helpers.mock_get(catalogue_category_repository_mock, None) + + catalogue_category_id = str(ObjectId()) + with pytest.raises(MissingRecordError) as exc: + catalogue_item_service.update( + catalogue_item_info["id"], + CatalogueItemPatchRequestSchema(catalogue_category_id=catalogue_category_id), + ) + assert str(exc.value) == f"No catalogue category found with ID: {catalogue_category_id}" + + +def test_update_change_catalogue_category_id_non_leaf_catalogue_category( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test moving a catalogue item to a non-leaf catalogue category. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [], + } + + # Mock `get` to return a catalogue item + test_helpers.mock_get(catalogue_item_repository_mock, CatalogueItemOut(**catalogue_item_info)) + catalogue_category_id = str(ObjectId()) + # Mock `get` to return a catalogue category + # pylint: disable=duplicate-code + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=False, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[], + ), + ) + # pylint: enable=duplicate-code + + with pytest.raises(NonLeafCategoryError) as exc: + catalogue_item_service.update( + catalogue_item_info["id"], + CatalogueItemPatchRequestSchema(catalogue_category_id=catalogue_category_id), + ) + assert str(exc.value) == "Cannot add catalogue item to a non-leaf catalogue category" + + +def test_update_add_non_mandatory_property( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test adding a non-mandatory catalogue item property and a value. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [ + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + full_catalogue_item_info = { + **catalogue_item_info, + "properties": [{"name": "Property A", "value": 20, "unit": "mm"}] + catalogue_item_info["properties"], + } + catalogue_item = CatalogueItemOut(**full_catalogue_item_info) + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut(**catalogue_item_info), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item.catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + # Mock `update` to return the updated catalogue item + test_helpers.mock_update(catalogue_item_repository_mock, catalogue_item) + + updated_catalogue_item = catalogue_item_service.update( + catalogue_item.id, + CatalogueItemPatchRequestSchema( + properties=[{"name": prop.name, "value": prop.value} for prop in catalogue_item.properties] + ), + ) + + catalogue_item_repository_mock.update.assert_called_once_with( + catalogue_item.id, CatalogueItemIn(**full_catalogue_item_info) + ) + assert updated_catalogue_item == catalogue_item + + +def test_update_remove_non_mandatory_property( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test removing a non-mandatory catalogue item property and its value. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + full_catalogue_item_info = {**catalogue_item_info, "properties": catalogue_item_info["properties"][-2:]} + catalogue_item = CatalogueItemOut(**full_catalogue_item_info) + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut(**catalogue_item_info), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item.catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + # Mock `update` to return the updated catalogue item + test_helpers.mock_update(catalogue_item_repository_mock, catalogue_item) + + updated_catalogue_item = catalogue_item_service.update( + catalogue_item.id, + CatalogueItemPatchRequestSchema( + properties=[{"name": prop.name, "value": prop.value} for prop in catalogue_item.properties] + ), + ) + + catalogue_item_repository_mock.update.assert_called_once_with( + catalogue_item.id, CatalogueItemIn(**full_catalogue_item_info) + ) + assert updated_catalogue_item == catalogue_item + + +def test_update_remove_mandatory_property( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test removing a mandatory catalogue item property and its value. + """ + full_catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + catalogue_item = CatalogueItemOut(**full_catalogue_item_info) + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut(**full_catalogue_item_info), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item.catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + + with pytest.raises(MissingMandatoryCatalogueItemProperty) as exc: + catalogue_item_service.update( + catalogue_item.id, + CatalogueItemPatchRequestSchema( + properties=[{"name": prop.name, "value": prop.value} for prop in catalogue_item.properties[:2]] + ), + ) + assert str(exc.value) == f"Missing mandatory catalogue item property: '{catalogue_item.properties[2].name}'" + + +def test_update_change_property_value( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test updating a value of a property. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + full_catalogue_item_info = { + **catalogue_item_info, + "properties": [{"name": "Property A", "value": 1, "unit": "mm"}] + catalogue_item_info["properties"][-2:], + } + catalogue_item = CatalogueItemOut(**full_catalogue_item_info) + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut(**catalogue_item_info), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item.catalogue_category_id, + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + # Mock `update` to return the updated catalogue item + test_helpers.mock_update(catalogue_item_repository_mock, catalogue_item) + + updated_catalogue_item = catalogue_item_service.update( + catalogue_item.id, + CatalogueItemPatchRequestSchema( + properties=[{"name": prop.name, "value": prop.value} for prop in catalogue_item.properties] + ), + ) + + catalogue_item_repository_mock.update.assert_called_once_with( + catalogue_item.id, CatalogueItemIn(**full_catalogue_item_info) + ) + assert updated_catalogue_item == catalogue_item + + +def test_update_change_value_for_string_property_invalid_type( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test changing the value of a string property to an invalid type. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut(**catalogue_item_info), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item_info["catalogue_category_id"], + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + + properties = [{"name": prop["name"], "value": prop["value"]} for prop in catalogue_item_info["properties"]] + properties[2]["value"] = True + with pytest.raises(InvalidCatalogueItemPropertyTypeError) as exc: + catalogue_item_service.update( + catalogue_item_info["id"], + CatalogueItemPatchRequestSchema(properties=properties), + ) + assert str(exc.value) == "Invalid value type for catalogue item property 'Property C'. Expected type: string." + + +def test_update_change_value_for_number_property_invalid_type( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test changing the value of a number property to an invalid type. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut(**catalogue_item_info), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item_info["catalogue_category_id"], + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + + properties = [{"name": prop["name"], "value": prop["value"]} for prop in catalogue_item_info["properties"]] + properties[0]["value"] = "20" + with pytest.raises(InvalidCatalogueItemPropertyTypeError) as exc: + catalogue_item_service.update( + catalogue_item_info["id"], + CatalogueItemPatchRequestSchema(properties=properties), + ) + assert str(exc.value) == "Invalid value type for catalogue item property 'Property A'. Expected type: number." + + +def test_update_change_value_for_boolean_property_invalid_type( + test_helpers, catalogue_category_repository_mock, catalogue_item_repository_mock, catalogue_item_service +): + """ + Test changing the value of a boolean property to an invalid type. + """ + catalogue_item_info = { + "id": str(ObjectId()), + "catalogue_category_id": str(ObjectId()), + "name": "Catalogue Item A", + "description": "This is Catalogue Item A", + "manufacturer": { + "name": "Manufacturer A", + "address": "1 Address, City, Country, Postcode", + "web_url": "https://www.manufacturer-a.co.uk", + }, + "properties": [ + {"name": "Property A", "value": 20, "unit": "mm"}, + {"name": "Property B", "value": False}, + {"name": "Property C", "value": "20x15x10", "unit": "cm"}, + ], + } + + # Mock `get` to return a catalogue item + test_helpers.mock_get( + catalogue_item_repository_mock, + CatalogueItemOut(**catalogue_item_info), + ) + # Mock `get` to return a catalogue category + test_helpers.mock_get( + catalogue_category_repository_mock, + CatalogueCategoryOut( + id=catalogue_item_info["catalogue_category_id"], + name="Category A", + code="category-a", + is_leaf=True, + path="/category-a", + parent_path="/", + parent_id=None, + catalogue_item_properties=[ + CatalogueItemProperty(name="Property A", type="number", unit="mm", mandatory=False), + CatalogueItemProperty(name="Property B", type="boolean", mandatory=True), + CatalogueItemProperty(name="Property C", type="string", unit="cm", mandatory=True), + ], + ), + ) + + properties = [{"name": prop["name"], "value": prop["value"]} for prop in catalogue_item_info["properties"]] + properties[1]["value"] = "False" + with pytest.raises(InvalidCatalogueItemPropertyTypeError) as exc: + catalogue_item_service.update( + catalogue_item_info["id"], + CatalogueItemPatchRequestSchema(properties=properties), + ) + assert str(exc.value) == "Invalid value type for catalogue item property 'Property B'. Expected type: boolean." diff --git a/test/unit/services/test_system.py b/test/unit/services/test_system.py new file mode 100644 index 00000000..ce345eb5 --- /dev/null +++ b/test/unit/services/test_system.py @@ -0,0 +1,251 @@ +""" +Unit tests for the `SystemService` service +""" + +from typing import Optional +from unittest.mock import MagicMock + +from bson import ObjectId + +from inventory_management_system_api.models.system import SystemIn, SystemOut +from inventory_management_system_api.schemas.system import SystemPostRequestSchema + + +def _test_list(test_helpers, system_repository_mock, system_service, path: Optional[str], parent_path: Optional[str]): + """ + Utility method that tests getting Systems + + Verifies that the `list` method properly handles the retrieval of systems with the given filters + """ + systems = [MagicMock(), MagicMock()] + + # Mock `list` to return a list of systems + test_helpers.mock_list( + system_repository_mock, + systems, + ) + + retrieved_systems = system_service.list(path, parent_path) + + system_repository_mock.list.assert_called_once_with(path, parent_path) + assert retrieved_systems == systems + + +def test_create(test_helpers, system_repository_mock, system_service): + """ + Test creating a System + + Verify that the `create` method properly handles the System to be created, generates the code and paths, + and calls the repository's create method + """ + # pylint: disable=duplicate-code + system_info = { + "name": "Test name", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "parent_id": None, + } + full_system_info = { + **system_info, + "code": "test-name", + "path": "/test-name", + "parent_path": "/", + } + system = SystemOut(id=str(ObjectId()), **full_system_info) + # pylint: enable=duplicate-code + + # Mock `create` to return the created System + test_helpers.mock_create(system_repository_mock, system) + + created_system = system_service.create(SystemPostRequestSchema(**system_info)) + + system_repository_mock.create.assert_called_with(SystemIn(**full_system_info)) + assert created_system == system + + +def test_delete(system_repository_mock, system_service): + """ + Test deleting a System + + Verify that the `delete` method properly handles the deletion of a System by ID + """ + system_id = MagicMock() + + system_service.delete(system_id) + + system_repository_mock.delete.assert_called_once_with(system_id) + + +def test_create_with_parent_id(test_helpers, system_repository_mock, system_service): + """ + Test creating a System with a parent ID + + Verify that the `create` method properly handles the System to be created when it has a parent ID + """ + system_info = { + "name": "Test name b", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "parent_id": str(ObjectId()), + } + full_system_info = { + **system_info, + "code": "test-name-b", + "path": "/test-name-a/test-name-b", + "parent_path": "/test-name-a", + } + system = SystemOut(id=str(ObjectId()), **full_system_info) + + # Mock `get` to return the parent system + test_helpers.mock_get( + system_repository_mock, + SystemOut( + id=system.parent_id, + name="Test name a", + location="Test location", + owner="Test owner", + importance="low", + description="Test description", + parent_id=None, + code="test-name-a", + path="/test-name-a", + parent_path="/", + ), + ) + + # Mock `create` to return the created System + test_helpers.mock_create(system_repository_mock, system) + + created_system = system_service.create(SystemPostRequestSchema(**system_info)) + + system_repository_mock.create.assert_called_with(SystemIn(**full_system_info)) + assert created_system == system + + +def test_create_with_whitespace_name(test_helpers, system_repository_mock, system_service): + """ + Test creating a System with a name containing leading/trailing/consecutive whitespaces + + Verify that the `create` method trims the whitespace from the System name and handles + it correctly + """ + system_info = { + "name": " Test name ", + "location": "Test location", + "owner": "Test owner", + "importance": "low", + "description": "Test description", + "parent_id": None, + } + full_system_info = { + **system_info, + "code": "test-name", + "path": "/test-name", + "parent_path": "/", + } + system = SystemOut(id=str(ObjectId()), **full_system_info) + + # Mock `create` to return the created System + test_helpers.mock_create(system_repository_mock, system) + + created_system = system_service.create(SystemPostRequestSchema(**system_info)) + + system_repository_mock.create.assert_called_with(SystemIn(**full_system_info)) + assert created_system == system + + +def test_get(test_helpers, system_repository_mock, system_service): + """ + Test getting a System + + Verify that the `get` method properly handles the retrieval of a System + """ + + system_id = str(ObjectId()) + system = MagicMock() + + # Mock `get` to return a System + test_helpers.mock_get(system_repository_mock, system) + + retrieved_system = system_service.get(system_id) + + system_repository_mock.get.assert_called_once_with(system_id) + assert retrieved_system == system + + +def test_get_with_non_existent_id(test_helpers, system_repository_mock, system_service): + """ + Test getting a System with a non-existent ID + + Verify that the `get` method properly handles the retrieval of a System with a non-existent ID + """ + system_id = str(ObjectId()) + + # Mock `get` to not return a System + test_helpers.mock_get(system_repository_mock, None) + + retrieved_system = system_service.get(system_id) + + system_repository_mock.get.assert_called_once_with(system_id) + assert retrieved_system is None + + +def test_list(test_helpers, system_repository_mock, system_service): + """ + Test getting Systems + + Verify that the `list` method properly handles the retrieval of Systems without filters + """ + _test_list(test_helpers, system_repository_mock, system_service, None, None) + + +def test_list_with_path_filter(test_helpers, system_repository_mock, system_service): + """ + Test getting Systems based on the provided path filter + + Verify that the `list` method properly handles the retrieval of Systems based on the provided path filter + """ + _test_list(test_helpers, system_repository_mock, system_service, "/test-name-a", None) + + +def test_list_with_parent_path_filter(test_helpers, system_repository_mock, system_service): + """ + Test getting Systems based on the provided parent path filter + + Verify that the `list` method properly handles the retrieval of Systems based on the provided parent path filter + """ + _test_list(test_helpers, system_repository_mock, system_service, None, "/") + + +def test_list_with_path_and_parent_path_filter(test_helpers, system_repository_mock, system_service): + """ + Test getting Systems based on the provided parent path and parent path filters + + Verify that the `list` method properly handles the retrieval of Systems based on the provided path and + parent path filters + """ + _test_list(test_helpers, system_repository_mock, system_service, "/test-name-a", "/") + + +def test_list_with_path_and_parent_path_filters_no_matching_results( + test_helpers, system_repository_mock, system_service +): + """ + Test getting Systems based on the provided parent path and parent path filters when there is no + matching results in the database + + Verify that the `list` method properly handles the retrieval of Systems based on the provided path and + parent path filters when there is no matching results in the database + """ + + # Mock `list` to return an empty list of Systems + test_helpers.mock_list(system_repository_mock, []) + + retrieved_systems = system_service.list("/test-name-a", "/") + + system_repository_mock.list.assert_called_once_with("/test-name-a", "/") + assert retrieved_systems == []