Skip to content

Commit

Permalink
Code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathangreen committed Sep 11, 2024
1 parent d479a82 commit b3c5c67
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 38 deletions.
38 changes: 12 additions & 26 deletions src/palace/manager/marc/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,17 @@ def _s3_sync(self, needs_upload: Sequence[str]) -> None:
)
self.upload_session.add_part_and_clear_buffer(key, upload_part)

def sync(self) -> None:
def sync(self, *, complete: bool = False) -> None:
# First sync our buffers to redis
buffer_lengths = self.upload_session.append_buffers(self._buffers)
self._buffers.clear()

# Then, if any of our redis buffers are large enough, upload them to S3
# Then, if any of our redis buffers are large enough, or the upload is complete,
# sync them to S3.
needs_upload = [
key
for key, length in buffer_lengths.items()
if length > self.storage_service.MINIMUM_MULTIPART_UPLOAD_SIZE
if length > self.storage_service.MINIMUM_MULTIPART_UPLOAD_SIZE or complete
]

if not needs_upload:
Expand Down Expand Up @@ -94,32 +95,17 @@ def _abort(self) -> None:

def complete(self) -> set[str]:
# Make sure any local data we have is synced
self.sync()
self.sync(complete=True)

in_progress = self.upload_session.get()
for key, upload in in_progress.items():
if upload.upload_id is None:
# We haven't started the upload. At this point there is no reason to start a
# multipart upload, just upload the file directly and continue.
self.storage_service.store(
key, upload.buffer, Representation.MARC_MEDIA_TYPE
)
else:
if upload.buffer != "":
# Upload the last chunk if the buffer is not empty, the final part has no
# minimum size requirement.
upload_part = self.storage_service.multipart_upload(
key,
upload.upload_id,
len(upload.parts) + 1,
upload.buffer.encode(),
)
upload.parts.append(upload_part)

# Complete the multipart upload
self.storage_service.multipart_complete(
key, upload.upload_id, upload.parts
)
# Since the sync method is called with complete=True, all the data should be in S3
# and have a valid upload_id. Therefore, we can complete the upload. Mypy doesn't
# know that upload_id shouldn't be None though, so we assert it for safety.
assert upload.upload_id is not None

# Complete the multipart upload
self.storage_service.multipart_complete(key, upload.upload_id, upload.parts)

# Delete our in-progress uploads data from redis
if in_progress:
Expand Down
5 changes: 5 additions & 0 deletions src/palace/manager/service/redis/models/marc.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ def get_part_num_and_buffer(self, key: str) -> tuple[int, str]:
)

buffer_data: str = self._parse_value_or_raise(results[0])
# AWS S3 requires part numbers to start at 1, so we need to increment by 1.
#
# NOTE: This is not true in minio (our local development environment); minio
# allows both 0 and 1 as the first part number. So the tests will still pass if
# this increment is removed, but the upload will fail when running in an actual
# AWS environment.
part_number: int = self._parse_value_or_raise(results[1]) + 1

return part_number, buffer_data
Expand Down
22 changes: 11 additions & 11 deletions tests/manager/marc/test_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,18 +349,18 @@ def test_real_storage_service(
with uploader.begin() as locked:
assert locked

# Test all three cases for the complete() method.
# Test three buffer size cases for the complete() method.
#
# 1. A small record that doesn't need to be uploaded in parts, it just
# gets uploaded directly when complete is called (test1).
# 2. A large record that needs to be uploaded in parts, on the first sync
# call its buffer is large enough to trigger the upload. When complete
# is called, there is no data in the buffer, so no final part needs to be
# uploaded (test2).
# 3. A large record that needs to be uploaded in parts, on the first sync
# call its buffer is large enough to trigger the upload. When complete
# is called, there is data in the buffer, so a final part needs to be
# uploaded (test3).
# 1. A small record that isn't in S3 at the time `complete` is called (test1).
# 2. A large record that needs to be uploaded in parts. On the first `sync`
# call, its buffer is large enough to trigger an upload. When `complete` is
# called, the buffer has enough data waiting that it would normally trigger
# another upload (test2).
# 3. A large record that needs to be uploaded in parts. On the first `sync`
# call, its buffer is large enough to trigger the upload. When `complete`
# is called, there is data in the buffer, but not enough to trigger another
# upload normally. However, since the record is complete, it still
# needs to be uploaded (test3).

uploader.add_record("test1", b"test_record")
uploader.add_record("test2", b"a" * batch_size)
Expand Down
6 changes: 5 additions & 1 deletion tests/manager/service/redis/models/test_marc.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ def test_get_part_num_and_buffer(

marc_file_upload_session_fixture.load_test_data()

# If the buffer has been set but no parts have been added, the first part number
# should be 1, and the buffer should be the same as the original data.
# should be 1. The buffer should be the same as the original data.
assert uploads.get_part_num_and_buffer(
marc_file_upload_session_fixture.mock_upload_key_1
) == (
Expand All @@ -426,10 +427,12 @@ def test_get_part_num_and_buffer(

with uploads.lock() as locked:
assert locked
# Add part 1
uploads.add_part_and_clear_buffer(
marc_file_upload_session_fixture.mock_upload_key_1,
marc_file_upload_session_fixture.part_1,
)
# Add part 2
uploads.add_part_and_clear_buffer(
marc_file_upload_session_fixture.mock_upload_key_1,
marc_file_upload_session_fixture.part_2,
Expand All @@ -440,6 +443,7 @@ def test_get_part_num_and_buffer(
}
)

# The next part number should be 3, and the buffer should be the new data
assert uploads.get_part_num_and_buffer(
marc_file_upload_session_fixture.mock_upload_key_1
) == (3, "1234567")
Expand Down

0 comments on commit b3c5c67

Please sign in to comment.