From 748b3dde057bfc90623e186842b14538a0162049 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Wed, 23 Aug 2023 19:47:32 -0400 Subject: [PATCH] Remove content mutability / sync-time data fixups closes #3234 --- CHANGES/3234.misc | 1 + pulp_rpm/app/tasks/synchronizing.py | 135 +--------------------------- 2 files changed, 3 insertions(+), 133 deletions(-) create mode 100644 CHANGES/3234.misc diff --git a/CHANGES/3234.misc b/CHANGES/3234.misc new file mode 100644 index 000000000..e4f66fd1a --- /dev/null +++ b/CHANGES/3234.misc @@ -0,0 +1 @@ +Removed all content mutability in favor of moving towards immutable content. The data fixups which were the original purpose are less necessary now than they initially were, and in the future we probably ought to handle such issues via replacements of immutable content rather than mutating existing content. diff --git a/pulp_rpm/app/tasks/synchronizing.py b/pulp_rpm/app/tasks/synchronizing.py index aeebe24b6..3375f9c69 100644 --- a/pulp_rpm/app/tasks/synchronizing.py +++ b/pulp_rpm/app/tasks/synchronizing.py @@ -45,9 +45,8 @@ RemoteArtifactSaver, Stage, QueryExistingArtifacts, + QueryExistingContents, ) -from pulpcore.plugin.sync import sync_to_async_iterable - from pulp_rpm.app.advisory import hash_update_record from pulp_rpm.app.constants import ( CHECKSUM_TYPES, @@ -625,7 +624,7 @@ def pipeline_stages(self, new_version): [ ArtifactDownloader(), ArtifactSaver(), - RpmQueryExistingContents(), + QueryExistingContents(), RpmContentSaver(), RpmInterrelateContent(), RemoteArtifactSaver(fix_mismatched_remote_artifacts=True), @@ -1548,133 +1547,3 @@ def _handle_distribution_tree(declarative_content): if update_references_to_save: UpdateReference.objects.bulk_create(update_references_to_save, ignore_conflicts=True) - - -class RpmQueryExistingContents(Stage): - """ - Checks if DeclarativeContent objects already exist and replaces them in the stream if so. - - This was largely copied from pulpcore.plugin.stages.content.QueryExistingContents but with - some customizations - see the original docstring for more details. The goal of the - customization is to address any issues where data may not saved properly in the past, - e.g. https://github.com/pulp/pulp_rpm/issues/2643. - - Unlike the base-pulpcore workflow, this stage needs to change/fix incoming Content, - **and** make sure all existing-content is touch()'d as part of this sync. This forces - the code to care about ordering, to care about changing-fields, and to do the touch() as - the last thing it does. - - Because pulp_rpm needs to do qualitatively-different things here than what the core-version - does/cares about, maintaining compatibility is going to take effort. - - Fixes can be added or removed over time as necessary. - """ - - async def run(self): - """ - The coroutine for this stage. - - Returns: - The coroutine for this stage. - """ - async for batch in self.batches(): - content_q_by_type = defaultdict(lambda: Q(pk__in=[])) - d_content_by_nat_key = defaultdict(list) - for d_content in batch: - if d_content.content._state.adding: - model_type = type(d_content.content) - unit_q = d_content.content.q() - content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q - d_content_by_nat_key[d_content.content.natural_key()].append(d_content) - - # For each entry in the batch, determine if we need to "fix up" the data before - # letting it pass down the pipeline. - # - # NOTE that we "remember" entities we've touched, and **which fields** we've touched, - # in order to persist **just the changes we make** below. This helps us avoid some - # nasty deadlock-windows between save() and touch() in this stage. - # - # IF/WHEN/AS YOU ADD TO THIS CODE - make sure you update modified_results_by_pk and - # modified_fields_by_pk, or your changes WILL NOT BE PERSISTED. - - # holds the to-be-saved results by-pk - modified_results_by_pk = {} - # per to-be-saved pk, holds a list of field-names of the things we've changed' - modified_fields_by_pk = defaultdict(list) - query_types = content_q_by_type.keys() - for model_type in query_types: - async for result in sync_to_async_iterable( - model_type.objects.filter(content_q_by_type[model_type]).iterator() - ): - for d_content in d_content_by_nat_key[result.natural_key()]: - # ============ The below lines are added vs. pulpcore ============ - if model_type == Modulemd: - # Fix saved snippet if malformed in DB, covers #2735 - if result.snippet != d_content.content.snippet: - result.snippet = d_content.content.snippet - modified_results_by_pk[result.pk] = result - modified_fields_by_pk[result.pk].append("snippet") - - if model_type == ModulemdDefaults: - # Fix saved snippet if malformed in DB, covers #2735 - if result.snippet != d_content.content.snippet: - result.snippet = d_content.content.snippet - modified_results_by_pk[result.pk] = result - modified_fields_by_pk[result.pk].append("snippet") - - if model_type == Package: - # changelogs coming out of the database are list[list], - # coming from the stage are list[tuple] - normalized_result_changelogs = [tuple(ch) for ch in result.changelogs] - incorrect_changelogs = ( - normalized_result_changelogs != d_content.content.changelogs - ) - incorrect_modular_relation = ( - not result.is_modular and d_content.content.is_modular - ) - - if incorrect_changelogs: - # Covers a class of issues with changelog data on the CDN - result.changelogs = d_content.content.changelogs - modified_fields_by_pk[result.pk].append("changelogs") - if incorrect_modular_relation: - # Covers #2643 - result.is_modular = True - modified_fields_by_pk[result.pk].append("is_modular") - duplicated_files = len(result.files) != len(d_content.content.files) - if duplicated_files: - d_content.content.files = result.files - if incorrect_changelogs or incorrect_modular_relation: - log.debug("Updated data for package {}".format(result.nevra)) - modified_results_by_pk[result.pk] = result - # ================================================================== - - # THIS is the "Important Part" that the pulpcore code is doing - d_content.content = result - - # Save results in guaranteed-pid-order - # NOTE: we are saving **only** the fields we've changed to avoid a deadlock-collision - # with the touch() call that ends the stage, below. - modified_rslts_pks = sorted(modified_results_by_pk.keys()) - for pk in modified_rslts_pks: - await sync_to_async(modified_results_by_pk[pk].save)( - update_fields=modified_fields_by_pk[pk] - ) - - # touch in order to mark "last seen time" for the batch - # Also part of the pulpcore class - but it happens in a different order here, - # to avoid the ordering/deadlock problems introduced by the individual save() - for query_type in query_types: - try: - await sync_to_async( - query_type.objects.filter(content_q_by_type[query_type]).touch - )() - except AttributeError: - raise TypeError( - "Plugins which declare custom ORM managers on their content classes " - "should have those managers inherit from " - "pulpcore.plugin.models.ContentManager." - ) - - for d_content in batch: - await self.put(d_content)