Fix another S3 multipart upload issue with marc exporter (PP-1693) #2053

Merged Sep 11, 2024 · 3 commits
Changes from 1 commit
5 changes: 4 additions & 1 deletion src/palace/manager/marc/uploader.py
@@ -109,7 +109,10 @@ def complete(self) -> set[str]:
         # Upload the last chunk if the buffer is not empty, the final part has no
         # minimum size requirement.
         upload_part = self.storage_service.multipart_upload(
-            key, upload.upload_id, len(upload.parts), upload.buffer.encode()
+            key,
+            upload.upload_id,
+            len(upload.parts) + 1,
Contributor:
I wonder if it would be better to have this "+1" logic in a single function with the explanation in its docstring, and call that from the multiple places that need it. That might make it a little clearer what's going on for future us.
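As an illustration of the suggestion above, such a helper might look like this (a minimal sketch; the name, docstring, and placement are hypothetical, not part of the PR):

```python
def next_part_number(parts: list) -> int:
    """Return the S3 part number to use for the next uploaded chunk.

    S3 multipart part numbers are 1-indexed (valid values run from 1 to
    10,000), while ``parts`` is a 0-indexed Python list of already-uploaded
    parts, so the next part number is ``len(parts) + 1``.
    """
    return len(parts) + 1
```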

Member (Author):
I simplified the logic here so it's easier to follow, and the part logic with offsets is all encapsulated in the same place.

The simplification comes at the expense of efficiency, since it will require a couple more calls to Redis and another call to S3, but that inefficiency probably doesn't make much of a difference here, and it's much more readable.

+            upload.buffer.encode(),
         )
         upload.parts.append(upload_part)
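The `+ 1` is needed because S3 part numbers are 1-based while `upload.parts` is a 0-indexed list. A minimal boto3 sketch of the same numbering rule, separate from the PR's code (bucket and key names are placeholders):

```python
import boto3

s3 = boto3.client("s3")
bucket, key = "example-bucket", "example-key"  # placeholder names

mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
parts: list[dict] = []

# Every part except the last must be at least 5 MiB.
for chunk in (b"a" * 5 * 1024 * 1024, b"final-part"):
    part_number = len(parts) + 1  # S3 part numbers start at 1, not 0
    resp = s3.upload_part(
        Bucket=bucket,
        Key=key,
        UploadId=mpu["UploadId"],
        PartNumber=part_number,
        Body=chunk,
    )
    parts.append({"ETag": resp["ETag"], "PartNumber": part_number})

s3.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=mpu["UploadId"],
    MultipartUpload={"Parts": parts},
)
```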

16 changes: 10 additions & 6 deletions tests/fixtures/s3.py
@@ -86,7 +86,7 @@ class MockMultipartUploadPart:
 class MockMultipartUpload:
     key: str
     upload_id: str
-    parts: list[MockMultipartUploadPart] = field(default_factory=list)
+    parts: dict[int, MockMultipartUploadPart] = field(default_factory=dict)
     content_type: str | None = None


@@ -143,8 +143,8 @@ def multipart_upload(
         part = MultipartS3UploadPart(etag=etag, part_number=part_number)
         assert key in self.upload_in_progress
         assert self.upload_in_progress[key].upload_id == upload_id
-        self.upload_in_progress[key].parts.append(
-            MockMultipartUploadPart(part, content)
+        self.upload_in_progress[key].parts[part_number] = MockMultipartUploadPart(
+            part, content
         )
         return part

@@ -154,11 +154,15 @@ def multipart_complete(
         assert key in self.upload_in_progress
         assert self.upload_in_progress[key].upload_id == upload_id
         complete_upload = self.upload_in_progress.pop(key)
-        for part_stored, part_passed_in in zip(complete_upload.parts, parts):
-            assert part_stored.part_data == part_passed_in
+        assert len(complete_upload.parts) == len(parts)
+        expected_parts = [x.part_data for x in complete_upload.parts.values()]
+        expected_parts.sort(key=lambda x: x.part_number)
+        assert parts == expected_parts
         self.uploads[key] = MockS3ServiceUpload(
             key,
-            b"".join(part_stored.content for part_stored in complete_upload.parts),
+            b"".join(
+                part_stored.content for part_stored in complete_upload.parts.values()
+            ),
             complete_upload.content_type,
         )
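Keying the mock's stored parts by part number (rather than appending to a list) mirrors how S3 treats parts: one entry per part number, assembled in ascending order at completion, with retries overwriting earlier attempts. A small standalone illustration of that behavior (not the fixture code itself):

```python
# Parts may arrive out of order, and a retried part number overwrites
# the earlier attempt instead of duplicating it.
parts: dict[int, bytes] = {}
parts[2] = b"-world"
parts[1] = b"hello"
parts[2] = b"-world!"  # retry replaces the first attempt at part 2

# S3 assembles the object by ascending part number, so the mock sorts
# before joining the stored content.
content = b"".join(parts[n] for n in sorted(parts))
assert content == b"hello-world!"
```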

65 changes: 63 additions & 2 deletions tests/manager/marc/test_uploader.py
@@ -10,7 +10,7 @@
 )
 from palace.manager.sqlalchemy.model.resource import Representation
 from tests.fixtures.redis import RedisFixture
-from tests.fixtures.s3 import S3ServiceFixture
+from tests.fixtures.s3 import S3ServiceFixture, S3ServiceIntegrationFixture


class MarcUploadManagerFixture:
@@ -189,7 +189,7 @@ def test_sync(self, marc_upload_manager_fixture: MarcUploadManagerFixture):
         ]
         assert upload.upload_id is not None
         assert upload.content_type is Representation.MARC_MEDIA_TYPE
-        [part] = upload.parts
+        [part] = upload.parts.values()
         assert part.content == marc_upload_manager_fixture.test_record1 * 5

         # And the s3 part data and upload_id is synced to redis
@@ -332,3 +332,64 @@ def test__abort(

         # The redis record should have been deleted
         mock_delete.assert_called_once()

def test_real_storage_service(
self,
redis_fixture: RedisFixture,
s3_service_integration_fixture: S3ServiceIntegrationFixture,
):
"""
Full end-to-end test of the MarcUploadManager using the real S3Service
"""
s3_service = s3_service_integration_fixture.public
uploads = MarcFileUploadSession(redis_fixture.client, 99)
uploader = MarcUploadManager(s3_service, uploads)
batch_size = s3_service.MINIMUM_MULTIPART_UPLOAD_SIZE + 1

with uploader.begin() as locked:
assert locked

# Test all three cases for the complete() method.
#
# 1. A small record that doesn't need to be uploaded in parts, it just
# gets uploaded directly when complete is called (test1).
# 2. A large record that needs to be uploaded in parts, on the first sync
# call its buffer is large enough to trigger the upload. When complete
# is called, there is no data in the buffer, so no final part needs to be
# uploaded (test2).
# 3. A large record that needs to be uploaded in parts, on the first sync
# call its buffer is large enough to trigger the upload. When complete
# is called, there is data in the buffer, so a final part needs to be
# uploaded (test3).
Contributor:
Minor/pedantic: It took me a minute to process this explanation because of the sentence structure. Maybe start a new sentence or add a semicolon after "... in parts" at the beginning of each case explanation.

Member (Author):
I re-worked this comment.


uploader.add_record("test1", b"test_record")
uploader.add_record("test2", b"a" * batch_size)
uploader.add_record("test3", b"b" * batch_size)

# Start the sync. This will begin the multipart upload for test2 and test3.
uploader.sync()

# Add some more data
uploader.add_record("test1", b"test_record")
uploader.add_record("test2", b"a" * batch_size)
uploader.add_record("test3", b"b")

# Complete the uploads
completed = uploader.complete()

assert completed == {"test1", "test2", "test3"}
assert uploads.get() == {}
assert set(s3_service_integration_fixture.list_objects("public")) == completed

assert (
s3_service_integration_fixture.get_object("public", "test1")
== b"test_record" * 2
)
assert (
s3_service_integration_fixture.get_object("public", "test2")
== b"a" * batch_size * 2
)
assert (
s3_service_integration_fixture.get_object("public", "test3")
== b"b" * batch_size + b"b"
)
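As a point of reference, S3 rejects multipart parts smaller than 5 MiB except for the final part, which is why the test sets `batch_size` to one byte over `MINIMUM_MULTIPART_UPLOAD_SIZE`. A hedged sketch of the buffering rule this implies (the constant's value is an assumption based on S3's documented limit, and the helper name is hypothetical, not taken from the PR):

```python
MINIMUM_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024  # S3's 5 MiB floor for non-final parts

def buffer_ready_for_part(buffer: bytes, is_final: bool) -> bool:
    # Non-final parts must meet S3's minimum size; the final part,
    # uploaded during complete(), is exempt from the minimum.
    return is_final or len(buffer) >= MINIMUM_MULTIPART_UPLOAD_SIZE
```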