Fix another issue with multipart s3 upload and add additional test.
jonathangreen committed Sep 11, 2024
1 parent 620c3e0 commit 3403453
Showing 3 changed files with 75 additions and 8 deletions.
5 changes: 4 additions & 1 deletion src/palace/manager/marc/uploader.py
@@ -109,7 +109,10 @@ def complete(self) -> set[str]:
         # Upload the last chunk if the buffer is not empty, the final part has no
         # minimum size requirement.
         upload_part = self.storage_service.multipart_upload(
-            key, upload.upload_id, len(upload.parts), upload.buffer.encode()
+            key,
+            upload.upload_id,
+            len(upload.parts) + 1,
+            upload.buffer.encode(),
         )
         upload.parts.append(upload_part)
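
Note: S3 part numbers are 1-based, while len(upload.parts) counts the parts already uploaded, so the old call reused the number of the last uploaded part and the final chunk replaced it. A minimal sketch of the off-by-one (illustrative values, not the real service API):

    # Parts already uploaded carry 1-based numbers 1..N.
    parts = ["etag-1", "etag-2"]  # two parts, numbered 1 and 2

    # Before the fix the final chunk was sent as part number
    # len(parts) == 2, colliding with (and replacing) part 2.
    # With the fix it takes the next free number instead.
    assert len(parts) == 2      # buggy final part number: collision
    assert len(parts) + 1 == 3  # fixed final part number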

12 changes: 7 additions & 5 deletions tests/fixtures/s3.py
@@ -86,7 +86,7 @@ class MockMultipartUploadPart:
 class MockMultipartUpload:
     key: str
     upload_id: str
-    parts: list[MockMultipartUploadPart] = field(default_factory=list)
+    parts: dict[int, MockMultipartUploadPart] = field(default_factory=dict)
     content_type: str | None = None


@@ -143,8 +143,8 @@ def multipart_upload(
         part = MultipartS3UploadPart(etag=etag, part_number=part_number)
         assert key in self.upload_in_progress
         assert self.upload_in_progress[key].upload_id == upload_id
-        self.upload_in_progress[key].parts.append(
-            MockMultipartUploadPart(part, content)
+        self.upload_in_progress[key].parts[part_number] = MockMultipartUploadPart(
+            part, content
         )
         return part
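
Note: keying the mock's parts by part number mirrors real S3 semantics, where uploading the same part number twice replaces the earlier payload rather than adding a new one, so the fixture can now surface the collision the list-based version hid. A minimal sketch of the difference (simplified to bytes payloads):

    # A list silently keeps both payloads for a duplicated part number:
    parts_list: list[bytes] = []
    parts_list.append(b"part-2-old")
    parts_list.append(b"part-2-new")
    assert len(parts_list) == 2  # both survive -- unlike real S3

    # A dict keyed by part number overwrites, exactly as S3 does:
    parts_dict: dict[int, bytes] = {}
    parts_dict[2] = b"part-2-old"
    parts_dict[2] = b"part-2-new"
    assert parts_dict == {2: b"part-2-new"}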

@@ -154,11 +154,13 @@ def multipart_complete(
         assert key in self.upload_in_progress
         assert self.upload_in_progress[key].upload_id == upload_id
         complete_upload = self.upload_in_progress.pop(key)
-        for part_stored, part_passed_in in zip(complete_upload.parts, parts):
+        for part_stored, part_passed_in in zip(complete_upload.parts.values(), parts):
             assert part_stored.part_data == part_passed_in
         self.uploads[key] = MockS3ServiceUpload(
             key,
-            b"".join(part_stored.content for part_stored in complete_upload.parts),
+            b"".join(
+                part_stored.content for part_stored in complete_upload.parts.values()
+            ),
             complete_upload.content_type,
         )
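
Note: iterating complete_upload.parts.values() relies on Python dicts preserving insertion order (guaranteed since 3.7), so the joined content is correct only when parts are uploaded in ascending part-number order. Real S3 assembles the object by part number regardless of upload order; a stricter mock could sort explicitly, as in this hedged sketch (not the fixture's actual code):

    # Hypothetical stricter assembly: order by part number, as S3 does.
    parts: dict[int, bytes] = {2: b"bb", 1: b"aa"}  # uploaded out of order
    content = b"".join(parts[n] for n in sorted(parts))
    assert content == b"aabb"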

66 changes: 64 additions & 2 deletions tests/manager/marc/test_uploader.py
@@ -10,7 +10,7 @@
 )
 from palace.manager.sqlalchemy.model.resource import Representation
 from tests.fixtures.redis import RedisFixture
-from tests.fixtures.s3 import S3ServiceFixture
+from tests.fixtures.s3 import S3ServiceFixture, S3ServiceIntegrationFixture


 class MarcUploadManagerFixture:
@@ -189,7 +189,7 @@ def test_sync(self, marc_upload_manager_fixture: MarcUploadManagerFixture):
         ]
         assert upload.upload_id is not None
         assert upload.content_type is Representation.MARC_MEDIA_TYPE
-        [part] = upload.parts
+        [part] = upload.parts.values()
         assert part.content == marc_upload_manager_fixture.test_record1 * 5

         # And the s3 part data and upload_id is synced to redis
@@ -332,3 +332,65 @@ def test__abort(

         # The redis record should have been deleted
         mock_delete.assert_called_once()
+
+    def test_real_storage_service(
+        self,
+        redis_fixture: RedisFixture,
+        s3_service_integration_fixture: S3ServiceIntegrationFixture,
+    ):
+        """
+        Full end-to-end test of the MarcUploadManager using the real S3Service
+        """
+        s3_service = s3_service_integration_fixture.public
+        uploads = MarcFileUploadSession(redis_fixture.client, 99)
+        uploader = MarcUploadManager(s3_service, uploads)
+
+        with uploader.begin() as locked:
+            assert locked
+
+            # Test all three cases for the complete() method.
+            #
+            # 1. A small record that doesn't need to be uploaded in parts, it just
+            #    gets uploaded directly when complete is called (test1).
+            # 2. A large record that needs to be uploaded in parts, on the first sync
+            #    call its buffer is large enough to trigger the upload. When complete
+            #    is called, there is no data in the buffer, so no final part needs to be
+            #    uploaded (test2).
+            # 3. A large record that needs to be uploaded in parts, on the first sync
+            #    call its buffer is large enough to trigger the upload. When complete
+            #    is called, there is data in the buffer, so a final part needs to be
+            #    uploaded (test3).
+
+            batch_size = s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE + 1
+
+            uploader.add_record("test1", b"test_record")
+            uploader.add_record("test2", b"a" * batch_size)
+            uploader.add_record("test3", b"b" * batch_size)
+
+            # Start the sync. This will begin the multipart upload for test2 and test3.
+            uploader.sync()
+
+            # Add some more data
+            uploader.add_record("test1", b"test_record")
+            uploader.add_record("test2", b"a" * batch_size)
+            uploader.add_record("test3", b"b")
+
+            # Complete the uploads
+            completed = uploader.complete()
+
+        assert completed == {"test1", "test2", "test3"}
+        assert uploads.get() == {}
+        assert set(s3_service_integration_fixture.list_objects("public")) == completed
+
+        assert (
+            s3_service_integration_fixture.get_object("public", "test1")
+            == b"test_recordtest_record"
+        )
+        assert (
+            s3_service_integration_fixture.get_object("public", "test2")
+            == b"a" * (s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE + 1) * 2
+        )
+        assert (
+            s3_service_integration_fixture.get_object("public", "test3")
+            == b"b" * (s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE + 1) + b"b"
+        )
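
Note: the expected object contents follow directly from the three cases: test1 never crosses the multipart threshold and is uploaded directly, test2 uploads two full parts and has an empty buffer at complete(), and test3 uploads one full part plus a one-byte final part. A quick standalone check of that arithmetic (using an illustrative threshold in place of s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE):

    # Illustrative stand-in for s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE.
    MINIMUM_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024
    batch_size = MINIMUM_MULTIPART_UPLOAD_SIZE + 1

    # test1: two small writes, never large enough for a multipart upload.
    assert len(b"test_record" * 2) < MINIMUM_MULTIPART_UPLOAD_SIZE

    # test2: two full batches uploaded as parts; nothing left for complete().
    assert batch_size * 2 == 2 * MINIMUM_MULTIPART_UPLOAD_SIZE + 2

    # test3: one full batch plus a single byte, so complete() must upload a
    # final part below the minimum size (which S3 permits for the last part).
    assert batch_size + 1 == MINIMUM_MULTIPART_UPLOAD_SIZE + 2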
