fix(batch-exports): Ignore completed upload ids in S3 when retrying #27305

Open · wants to merge 2 commits into base: master
33 changes: 31 additions & 2 deletions posthog/temporal/batch_exports/s3_batch_export.py
@@ -206,6 +206,13 @@ def __init__(self, message: str = "Endpoint URL is invalid."):
super().__init__(message)


class UploadAlreadyFinishedError(Exception):
"""Exception raised when trying to resume an upload that already finished."""

def __init__(self):
super().__init__("Attempted to resume a multipart upload that has already been completed or aborted.")


Part = dict[str, str | int]


@@ -317,16 +324,33 @@ async def start(self) -> str:

return upload_id

def continue_from_state(self, state: S3MultiPartUploadState):
async def continue_from_state(self, state: S3MultiPartUploadState):
"""Continue this S3MultiPartUpload from a previous state.

This method is intended to be used with the state found in an Activity heartbeat.
"""
is_multipart_upload_active = await self.is_multipart_upload_active(state.upload_id)

if not is_multipart_upload_active:
raise UploadAlreadyFinishedError()

self.upload_id = state.upload_id
self.parts = state.parts

return self.upload_id

async def is_multipart_upload_active(self, upload_id: str) -> bool:
"""Check if provided `upload_id` corresponds to an active multipart upload.

If this method returns `False`, it indicates the `upload_id` corresponds to
a multipart upload that has already completed or aborted.
"""
async with self.s3_client() as s3_client:
response = await s3_client.list_multipart_uploads(Bucket=self.bucket_name, Prefix=self.key)

active_uploads = {upload["UploadId"] for upload in response.get("Uploads", [])}
return upload_id in active_uploads

async def complete(self) -> str:
if self.is_upload_in_progress() is False:
raise NoUploadInProgressError()
@@ -614,7 +638,12 @@ async def initialize_and_resume_multipart_upload(
s3_upload = initialize_upload(inputs, file_number)

if details.upload_state:
s3_upload.continue_from_state(details.upload_state)
try:
await s3_upload.continue_from_state(details.upload_state)
except UploadAlreadyFinishedError:
await logger.awarning(
f"Attempted resume already finished upload. Upload '{details.upload_state.upload_id}' will be ignored"
)

if inputs.compression == "brotli":
# Even if we receive details we cannot resume a brotli compressed upload as
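Taken together, the change adds a pre-flight check before an upload id from heartbeat state is reused: if S3 no longer lists the id among the in-progress multipart uploads for the key, `continue_from_state` raises `UploadAlreadyFinishedError` and the activity keeps the freshly initialized upload instead. A minimal standalone sketch of that check, assuming an aioboto3 session with default credentials and illustrative `bucket`, `key` and `upload_id` values:

```python
import aioboto3


async def is_upload_id_active(bucket: str, key: str, upload_id: str) -> bool:
    """Return True if `upload_id` is still an in-progress multipart upload for `key`.

    Illustrative helper: `bucket`, `key` and `upload_id` are placeholders, and the
    session is created with default credentials rather than the export's inputs.
    """
    session = aioboto3.Session()
    async with session.client("s3") as s3_client:
        # ListMultipartUploads only reports uploads that were initiated but not
        # yet completed or aborted, so a missing id means it cannot be resumed.
        response = await s3_client.list_multipart_uploads(Bucket=bucket, Prefix=key)

    active_upload_ids = {upload["UploadId"] for upload in response.get("Uploads", [])}
    return upload_id in active_upload_ids
```

When the check fails during a retry, the activity logs a warning and carries on with the upload returned by `initialize_upload`, rather than failing the whole batch export.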
137 changes: 137 additions & 0 deletions posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
@@ -1901,6 +1901,143 @@ def track_hearbeat_details(*details):
)


async def test_insert_into_s3_activity_ignores_completed_upload_id(
clickhouse_client, ateam, bucket_name, s3_batch_export, minio_client, activity_environment, s3_key_prefix
):
"""Test that if the insert_into_s3_activity activity fails, it will ignore completed uploads."""
data_interval_end = dt.datetime.fromisoformat("2023-04-20T14:30:00.000000+00:00")
data_interval_start = data_interval_end - s3_batch_export.interval_time_delta

n_expected_parts = 3

for i in range(1, n_expected_parts + 1):
part_inserted_at = data_interval_end - s3_batch_export.interval_time_delta / i

await generate_test_events_in_clickhouse(
client=clickhouse_client,
team_id=ateam.pk,
start_time=data_interval_start,
end_time=data_interval_end,
count=1,
count_outside_range=0,
count_other_team=0,
duplicate=False,
# We need at least 5MB for a multi-part upload which is what we are testing.
properties={"$chonky": ("a" * 5 * 2048**2)},
inserted_at=part_inserted_at,
)

attempt = 0

class FakeSession(aioboto3.Session):
@contextlib.asynccontextmanager
async def client(self, *args, **kwargs):
client = self._session.create_client(*args, **kwargs)

async with client as client:
original_upload_part = client.upload_part

async def faulty_upload_part(*args, **kwargs):
nonlocal attempt

attempt = attempt + 1

if attempt >= 2:
raise botocore.exceptions.ClientError(
error_response={
"Error": {"Code": "RequestTimeout", "Message": "Oh no!"},
"ResponseMetadata": {"MaxAttemptsReached": True, "RetryAttempts": 2}, # type: ignore
},
operation_name="UploadPart",
)
else:
return await original_upload_part(*args, **kwargs)

client.upload_part = faulty_upload_part

yield client

heartbeat_details: list[S3HeartbeatDetails] = []

def track_hearbeat_details(*details):
"""Record heartbeat details received."""
nonlocal heartbeat_details

s3_details = S3HeartbeatDetails.from_activity_details(details)
heartbeat_details.append(s3_details)

activity_environment.on_heartbeat = track_hearbeat_details

insert_inputs = S3InsertInputs(
bucket_name=bucket_name,
region="us-east-1",
prefix=s3_key_prefix,
team_id=ateam.pk,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
aws_access_key_id="object_storage_root_user",
aws_secret_access_key="object_storage_root_password",
endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
)

with (
override_settings(BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=1, CLICKHOUSE_MAX_BLOCK_SIZE_DEFAULT=1),
mock.patch("posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session", FakeSession),
):
with pytest.raises(IntermittentUploadPartTimeoutError):
# we expect this to raise an exception
await activity_environment.run(insert_into_s3_activity, insert_inputs)

assert len(heartbeat_details) > 0

detail = heartbeat_details[-1]

# we expect to have only uploaded part 1 of the first file
assert detail.files_uploaded == 0
assert detail.upload_state is not None
assert detail.upload_state.upload_id is not None
assert len(detail.upload_state.parts) == 1

assert len(detail.done_ranges) == 1

# simulate the upload having already been completed (e.g. by a previous attempt), then resume from the heartbeat
previous_info = asdict(activity_environment.info)
previous_info["heartbeat_details"] = detail.serialize_details()
await minio_client.complete_multipart_upload(
Bucket=bucket_name,
Key=get_s3_key(insert_inputs),
UploadId=detail.upload_state.upload_id,
MultipartUpload={"Parts": detail.upload_state.parts},
)

new_info = activity.Info(
**previous_info,
)
activity_environment.info = new_info
with override_settings(BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=1, CLICKHOUSE_MAX_BLOCK_SIZE_DEFAULT=1):
await activity_environment.run(insert_into_s3_activity, insert_inputs)

assert len(heartbeat_details) > 0
detail = heartbeat_details[-1]
# we expect to have uploaded the file now
assert detail.files_uploaded == 1
assert detail.upload_state is None
assert len(detail.done_ranges) == 1
assert detail.done_ranges[0] == (data_interval_start, data_interval_end)

await assert_clickhouse_records_in_s3(
s3_compatible_client=minio_client,
clickhouse_client=clickhouse_client,
bucket_name=bucket_name,
key_prefix=s3_key_prefix,
team_id=ateam.pk,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
# When we resume from a heartbeat, we expect duplicates (the last done range will be re-exported)
allow_duplicates=True,
)


async def test_s3_multi_part_upload_raises_retryable_exception(bucket_name, s3_key_prefix):
"""Test a retryable exception is raised instead of a `RequestTimeout`.

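The new test reproduces the race by finishing the interrupted upload out-of-band before the retry, so the upload id captured in the activity heartbeat is already completed by the time `insert_into_s3_activity` runs again. A rough sketch of that setup step against any S3-compatible endpoint, assuming `parts` holds the `PartNumber`/`ETag` pairs recorded in the heartbeat and the remaining names are illustrative:

```python
import aioboto3


async def complete_upload_out_of_band(
    bucket: str, key: str, upload_id: str, parts: list[dict], endpoint_url: str | None = None
) -> None:
    """Complete a multipart upload outside the exporting activity.

    Illustrative helper: after this call the upload id is no longer listed as
    active, which is exactly the situation the retried activity must tolerate.
    """
    session = aioboto3.Session()
    async with session.client("s3", endpoint_url=endpoint_url) as s3_client:
        await s3_client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )

        # Sanity check: the id should have disappeared from the active uploads.
        response = await s3_client.list_multipart_uploads(Bucket=bucket, Prefix=key)
        assert upload_id not in {u["UploadId"] for u in response.get("Uploads", [])}
```

With the fix in place the retried activity ignores the stale id and re-exports the last done range, which is why the final assertion in the test allows duplicates.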