diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py index 16aa67d65ddd5..e5b2562437d97 100644 --- a/posthog/temporal/batch_exports/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -206,6 +206,13 @@ def __init__(self, message: str = "Endpoint URL is invalid."): super().__init__(message) +class UploadAlreadyFinishedError(Exception): + """Exception raised when trying to resume an upload that already finished.""" + + def __init__(self): + super().__init__("Attempted to resume a multipart upload that has already completed or aborted.") + + Part = dict[str, str | int] @@ -317,16 +324,33 @@ async def start(self) -> str: return upload_id - def continue_from_state(self, state: S3MultiPartUploadState): + async def continue_from_state(self, state: S3MultiPartUploadState): """Continue this S3MultiPartUpload from a previous state. This method is intended to be used with the state found in an Activity heartbeat. """ + is_multipart_upload_active = await self.is_multipart_upload_active(state.upload_id) + + if not is_multipart_upload_active: + raise UploadAlreadyFinishedError() + self.upload_id = state.upload_id self.parts = state.parts return self.upload_id + async def is_multipart_upload_active(self, upload_id: str) -> bool: + """Check if provided `upload_id` corresponds to an active multipart upload. + + If this method returns `False`, it indicates the `upload_id` corresponds to + a multipart upload that has already completed or aborted. + """ + async with self.s3_client() as s3_client: + response = await s3_client.list_multipart_uploads(Bucket=self.bucket_name, Prefix=self.key) + + active_uploads = {upload["UploadId"] for upload in response.get("Uploads", [])} + return upload_id in active_uploads + async def complete(self) -> str: if self.is_upload_in_progress() is False: raise NoUploadInProgressError() @@ -614,7 +638,12 @@ async def initialize_and_resume_multipart_upload( s3_upload = initialize_upload(inputs, file_number) if details.upload_state: - s3_upload.continue_from_state(details.upload_state) + try: + await s3_upload.continue_from_state(details.upload_state) + except UploadAlreadyFinishedError: + await logger.awarning( + f"Attempted resume already finished upload. Upload '{details.upload_state.upload_id}' will be ignored" + ) if inputs.compression == "brotli": # Even if we receive details we cannot resume a brotli compressed upload as diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index bc840f8f7f134..1e1346f60b6aa 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -1901,6 +1901,143 @@ def track_hearbeat_details(*details): ) +async def test_insert_into_s3_activity_ignores_completed_upload_id( + clickhouse_client, ateam, bucket_name, s3_batch_export, minio_client, activity_environment, s3_key_prefix +): + """Test that if the insert_into_s3_activity activity fails, it will ignore completed uploads.""" + data_interval_end = dt.datetime.fromisoformat("2023-04-20T14:30:00.000000+00:00") + data_interval_start = data_interval_end - s3_batch_export.interval_time_delta + + n_expected_parts = 3 + + for i in range(1, n_expected_parts + 1): + part_inserted_at = data_interval_end - s3_batch_export.interval_time_delta / i + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=1, + count_outside_range=0, + count_other_team=0, + duplicate=False, + # We need at least 5MB for a multi-part upload which is what we are testing. + properties={"$chonky": ("a" * 5 * 2048**2)}, + inserted_at=part_inserted_at, + ) + + attempt = 0 + + class FakeSession(aioboto3.Session): + @contextlib.asynccontextmanager + async def client(self, *args, **kwargs): + client = self._session.create_client(*args, **kwargs) + + async with client as client: + original_upload_part = client.upload_part + + async def faulty_upload_part(*args, **kwargs): + nonlocal attempt + + attempt = attempt + 1 + + if attempt >= 2: + raise botocore.exceptions.ClientError( + error_response={ + "Error": {"Code": "RequestTimeout", "Message": "Oh no!"}, + "ResponseMetadata": {"MaxAttemptsReached": True, "RetryAttempts": 2}, # type: ignore + }, + operation_name="UploadPart", + ) + else: + return await original_upload_part(*args, **kwargs) + + client.upload_part = faulty_upload_part + + yield client + + heartbeat_details: list[S3HeartbeatDetails] = [] + + def track_hearbeat_details(*details): + """Record heartbeat details received.""" + nonlocal heartbeat_details + + s3_details = S3HeartbeatDetails.from_activity_details(details) + heartbeat_details.append(s3_details) + + activity_environment.on_heartbeat = track_hearbeat_details + + insert_inputs = S3InsertInputs( + bucket_name=bucket_name, + region="us-east-1", + prefix=s3_key_prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + aws_access_key_id="object_storage_root_user", + aws_secret_access_key="object_storage_root_password", + endpoint_url=settings.OBJECT_STORAGE_ENDPOINT, + ) + + with ( + override_settings(BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=1, CLICKHOUSE_MAX_BLOCK_SIZE_DEFAULT=1), + mock.patch("posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session", FakeSession), + ): + with pytest.raises(IntermittentUploadPartTimeoutError): + # we expect this to raise an exception + await activity_environment.run(insert_into_s3_activity, insert_inputs) + + assert len(heartbeat_details) > 0 + + detail = heartbeat_details[-1] + + # we expect to have only uploaded part 1 of first file + assert detail.files_uploaded == 0 + assert detail.upload_state is not None + assert detail.upload_state.upload_id is not None + assert len(detail.upload_state.parts) == 1 + + assert len(detail.done_ranges) == 1 + + # now we resume from the heartbeat + previous_info = asdict(activity_environment.info) + previous_info["heartbeat_details"] = detail.serialize_details() + await minio_client.complete_multipart_upload( + Bucket=bucket_name, + Key=get_s3_key(insert_inputs), + UploadId=detail.upload_state.upload_id, + MultipartUpload={"Parts": detail.upload_state.parts}, + ) + + new_info = activity.Info( + **previous_info, + ) + activity_environment.info = new_info + with override_settings(BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=1, CLICKHOUSE_MAX_BLOCK_SIZE_DEFAULT=1): + await activity_environment.run(insert_into_s3_activity, insert_inputs) + + assert len(heartbeat_details) > 0 + detail = heartbeat_details[-1] + # we expect to have uploaded the file now + assert detail.files_uploaded == 1 + assert detail.upload_state is None + assert len(detail.done_ranges) == 1 + assert detail.done_ranges[0] == (data_interval_start, data_interval_end) + + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=s3_key_prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + # When we resume from a heartbeat, we expect duplicates (the last done range will be re-exported) + allow_duplicates=True, + ) + + async def test_s3_multi_part_upload_raises_retryable_exception(bucket_name, s3_key_prefix): """Test a retryable exception is raised instead of a `RequestTimeout`.