diff --git a/src/palace/manager/marc/uploader.py b/src/palace/manager/marc/uploader.py
index 81677977d..98026e83a 100644
--- a/src/palace/manager/marc/uploader.py
+++ b/src/palace/manager/marc/uploader.py
@@ -109,7 +109,10 @@ def complete(self) -> set[str]:
             # Upload the last chunk if the buffer is not empty, the final part has no
             # minimum size requirement.
             upload_part = self.storage_service.multipart_upload(
-                key, upload.upload_id, len(upload.parts), upload.buffer.encode()
+                key,
+                upload.upload_id,
+                len(upload.parts) + 1,
+                upload.buffer.encode(),
             )
             upload.parts.append(upload_part)
 
diff --git a/tests/fixtures/s3.py b/tests/fixtures/s3.py
index ef6fbdc70..44c65dc94 100644
--- a/tests/fixtures/s3.py
+++ b/tests/fixtures/s3.py
@@ -86,7 +86,7 @@ class MockMultipartUploadPart:
 class MockMultipartUpload:
     key: str
     upload_id: str
-    parts: list[MockMultipartUploadPart] = field(default_factory=list)
+    parts: dict[int, MockMultipartUploadPart] = field(default_factory=dict)
     content_type: str | None = None
 
 
@@ -143,8 +143,8 @@ def multipart_upload(
         part = MultipartS3UploadPart(etag=etag, part_number=part_number)
         assert key in self.upload_in_progress
         assert self.upload_in_progress[key].upload_id == upload_id
-        self.upload_in_progress[key].parts.append(
-            MockMultipartUploadPart(part, content)
+        self.upload_in_progress[key].parts[part_number] = MockMultipartUploadPart(
+            part, content
         )
         return part
 
@@ -154,11 +154,13 @@ def multipart_complete(
         assert key in self.upload_in_progress
         assert self.upload_in_progress[key].upload_id == upload_id
         complete_upload = self.upload_in_progress.pop(key)
-        for part_stored, part_passed_in in zip(complete_upload.parts, parts):
+        for part_stored, part_passed_in in zip(complete_upload.parts.values(), parts):
             assert part_stored.part_data == part_passed_in
         self.uploads[key] = MockS3ServiceUpload(
             key,
-            b"".join(part_stored.content for part_stored in complete_upload.parts),
+            b"".join(
+                part_stored.content for part_stored in complete_upload.parts.values()
+            ),
             complete_upload.content_type,
         )
 
diff --git a/tests/manager/marc/test_uploader.py b/tests/manager/marc/test_uploader.py
index bb7898e34..3a28fdb23 100644
--- a/tests/manager/marc/test_uploader.py
+++ b/tests/manager/marc/test_uploader.py
@@ -10,7 +10,7 @@
 )
 from palace.manager.sqlalchemy.model.resource import Representation
 from tests.fixtures.redis import RedisFixture
-from tests.fixtures.s3 import S3ServiceFixture
+from tests.fixtures.s3 import S3ServiceFixture, S3ServiceIntegrationFixture
 
 
 class MarcUploadManagerFixture:
@@ -189,7 +189,7 @@ def test_sync(self, marc_upload_manager_fixture: MarcUploadManagerFixture):
         ]
         assert upload.upload_id is not None
         assert upload.content_type is Representation.MARC_MEDIA_TYPE
-        [part] = upload.parts
+        [part] = upload.parts.values()
         assert part.content == marc_upload_manager_fixture.test_record1 * 5
 
         # And the s3 part data and upload_id is synced to redis
@@ -332,3 +332,65 @@ def test__abort(
 
         # The redis record should have been deleted
         mock_delete.assert_called_once()
+
+    def test_real_storage_service(
+        self,
+        redis_fixture: RedisFixture,
+        s3_service_integration_fixture: S3ServiceIntegrationFixture,
+    ):
+        """
+        Full end-to-end test of the MarcUploadManager using the real S3Service
+        """
+        s3_service = s3_service_integration_fixture.public
+        uploads = MarcFileUploadSession(redis_fixture.client, 99)
+        uploader = MarcUploadManager(s3_service, uploads)
+
+        with uploader.begin() as locked:
+            assert locked
+
+            # Test all three cases for the complete() method.
+            #
+            # 1. A small record that doesn't need a multipart upload; it is
+            #    uploaded directly when complete() is called (test1).
+            # 2. A large record that needs a multipart upload. On the first
+            #    sync() call its buffer is large enough to trigger an upload.
+            #    When complete() is called the buffer is empty, so no final
+            #    part needs to be uploaded (test2).
+            # 3. A large record that needs a multipart upload. On the first
+            #    sync() call its buffer is large enough to trigger an upload.
+            #    When complete() is called the buffer still holds data, so a
+            #    final part needs to be uploaded (test3).
+
+            batch_size = s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE + 1
+
+            uploader.add_record("test1", b"test_record")
+            uploader.add_record("test2", b"a" * batch_size)
+            uploader.add_record("test3", b"b" * batch_size)
+
+            # Start the sync. This will begin the multipart upload for test2 and test3.
+            uploader.sync()
+
+            # Add some more data
+            uploader.add_record("test1", b"test_record")
+            uploader.add_record("test2", b"a" * batch_size)
+            uploader.add_record("test3", b"b")
+
+            # Complete the uploads
+            completed = uploader.complete()
+
+        assert completed == {"test1", "test2", "test3"}
+        assert uploads.get() == {}
+        assert set(s3_service_integration_fixture.list_objects("public")) == completed
+
+        assert (
+            s3_service_integration_fixture.get_object("public", "test1")
+            == b"test_recordtest_record"
+        )
+        assert (
+            s3_service_integration_fixture.get_object("public", "test2")
+            == b"a" * (s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE + 1) * 2
+        )
+        assert (
+            s3_service_integration_fixture.get_object("public", "test3")
+            == b"b" * (s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE + 1) + b"b"
+        )
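
Context for the `len(upload.parts) + 1` change: S3 part numbers are 1-based (valid values are 1 through 10000), so deriving the next part number from the current part count requires the `+ 1`. Using `len(upload.parts)` reused the previous part's number for the final chunk, and since real S3 treats a repeated part number as a replacement, the final buffer silently overwrote the last uploaded part. The list-based mock appended instead of replacing, which is why the old fixture never caught this; keying the mock's parts by part number mirrors the real semantics. Below is a minimal sketch of the same rule against plain boto3; `upload_in_parts` is a hypothetical helper for illustration, not the project's `S3Service` wrapper, and it assumes configured AWS credentials and an existing bucket.

```python
import boto3


def upload_in_parts(bucket: str, key: str, chunks: list[bytes]) -> None:
    """Sketch: upload chunks as a multipart upload with 1-based part numbers."""
    s3 = boto3.client("s3")
    mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = mpu["UploadId"]
    parts: list[dict] = []
    try:
        for chunk in chunks:
            # Part numbers are 1-based, so the next number is len(parts) + 1.
            # Passing len(parts) would send PartNumber=0 for the first part
            # (rejected by S3) or resend the previous number, replacing that
            # part's content instead of appending a new part.
            part_number = len(parts) + 1
            resp = s3.upload_part(
                Bucket=bucket,
                Key=key,
                PartNumber=part_number,
                UploadId=upload_id,
                Body=chunk,
            )
            parts.append({"ETag": resp["ETag"], "PartNumber": part_number})
        s3.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
    except Exception:
        # Abort so the partial upload doesn't accrue storage charges.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise
```

The new `test_real_storage_service` test exercises the same path end to end against a real S3-compatible backend, which is what surfaced the off-by-one that the list-based mock masked.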