Skip to content

Commit

Permalink
Remove content mutability / sync-time data fixups
Browse files Browse the repository at this point in the history
closes pulp#3234
  • Loading branch information
dralley committed Aug 28, 2023
1 parent 909ced6 commit 748b3dd
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGES/3234.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed all content mutability in favor of moving towards immutable content. The data fixups which were the original purpose are less necessary now than they initially were, and in the future we probably ought to handle such issues via replacements of immutable content rather than mutating existing content.
135 changes: 2 additions & 133 deletions pulp_rpm/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@
RemoteArtifactSaver,
Stage,
QueryExistingArtifacts,
QueryExistingContents,
)
from pulpcore.plugin.sync import sync_to_async_iterable

from pulp_rpm.app.advisory import hash_update_record
from pulp_rpm.app.constants import (
CHECKSUM_TYPES,
Expand Down Expand Up @@ -625,7 +624,7 @@ def pipeline_stages(self, new_version):
[
ArtifactDownloader(),
ArtifactSaver(),
RpmQueryExistingContents(),
QueryExistingContents(),
RpmContentSaver(),
RpmInterrelateContent(),
RemoteArtifactSaver(fix_mismatched_remote_artifacts=True),
Expand Down Expand Up @@ -1548,133 +1547,3 @@ def _handle_distribution_tree(declarative_content):

if update_references_to_save:
UpdateReference.objects.bulk_create(update_references_to_save, ignore_conflicts=True)


class RpmQueryExistingContents(Stage):
"""
Checks if DeclarativeContent objects already exist and replaces them in the stream if so.
This was largely copied from pulpcore.plugin.stages.content.QueryExistingContents but with
some customizations - see the original docstring for more details. The goal of the
customization is to address any issues where data may not saved properly in the past,
e.g. https://github.com/pulp/pulp_rpm/issues/2643.
Unlike the base-pulpcore workflow, this stage needs to change/fix incoming Content,
**and** make sure all existing-content is touch()'d as part of this sync. This forces
the code to care about ordering, to care about changing-fields, and to do the touch() as
the last thing it does.
Because pulp_rpm needs to do qualitatively-different things here than what the core-version
does/cares about, maintaining compatibility is going to take effort.
Fixes can be added or removed over time as necessary.
"""

async def run(self):
"""
The coroutine for this stage.
Returns:
The coroutine for this stage.
"""
async for batch in self.batches():
content_q_by_type = defaultdict(lambda: Q(pk__in=[]))
d_content_by_nat_key = defaultdict(list)
for d_content in batch:
if d_content.content._state.adding:
model_type = type(d_content.content)
unit_q = d_content.content.q()
content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q
d_content_by_nat_key[d_content.content.natural_key()].append(d_content)

# For each entry in the batch, determine if we need to "fix up" the data before
# letting it pass down the pipeline.
#
# NOTE that we "remember" entities we've touched, and **which fields** we've touched,
# in order to persist **just the changes we make** below. This helps us avoid some
# nasty deadlock-windows between save() and touch() in this stage.
#
# IF/WHEN/AS YOU ADD TO THIS CODE - make sure you update modified_results_by_pk and
# modified_fields_by_pk, or your changes WILL NOT BE PERSISTED.

# holds the to-be-saved results by-pk
modified_results_by_pk = {}
# per to-be-saved pk, holds a list of field-names of the things we've changed'
modified_fields_by_pk = defaultdict(list)
query_types = content_q_by_type.keys()
for model_type in query_types:
async for result in sync_to_async_iterable(
model_type.objects.filter(content_q_by_type[model_type]).iterator()
):
for d_content in d_content_by_nat_key[result.natural_key()]:
# ============ The below lines are added vs. pulpcore ============
if model_type == Modulemd:
# Fix saved snippet if malformed in DB, covers #2735
if result.snippet != d_content.content.snippet:
result.snippet = d_content.content.snippet
modified_results_by_pk[result.pk] = result
modified_fields_by_pk[result.pk].append("snippet")

if model_type == ModulemdDefaults:
# Fix saved snippet if malformed in DB, covers #2735
if result.snippet != d_content.content.snippet:
result.snippet = d_content.content.snippet
modified_results_by_pk[result.pk] = result
modified_fields_by_pk[result.pk].append("snippet")

if model_type == Package:
# changelogs coming out of the database are list[list],
# coming from the stage are list[tuple]
normalized_result_changelogs = [tuple(ch) for ch in result.changelogs]
incorrect_changelogs = (
normalized_result_changelogs != d_content.content.changelogs
)
incorrect_modular_relation = (
not result.is_modular and d_content.content.is_modular
)

if incorrect_changelogs:
# Covers a class of issues with changelog data on the CDN
result.changelogs = d_content.content.changelogs
modified_fields_by_pk[result.pk].append("changelogs")
if incorrect_modular_relation:
# Covers #2643
result.is_modular = True
modified_fields_by_pk[result.pk].append("is_modular")
duplicated_files = len(result.files) != len(d_content.content.files)
if duplicated_files:
d_content.content.files = result.files
if incorrect_changelogs or incorrect_modular_relation:
log.debug("Updated data for package {}".format(result.nevra))
modified_results_by_pk[result.pk] = result
# ==================================================================

# THIS is the "Important Part" that the pulpcore code is doing
d_content.content = result

# Save results in guaranteed-pid-order
# NOTE: we are saving **only** the fields we've changed to avoid a deadlock-collision
# with the touch() call that ends the stage, below.
modified_rslts_pks = sorted(modified_results_by_pk.keys())
for pk in modified_rslts_pks:
await sync_to_async(modified_results_by_pk[pk].save)(
update_fields=modified_fields_by_pk[pk]
)

# touch in order to mark "last seen time" for the batch
# Also part of the pulpcore class - but it happens in a different order here,
# to avoid the ordering/deadlock problems introduced by the individual save()
for query_type in query_types:
try:
await sync_to_async(
query_type.objects.filter(content_q_by_type[query_type]).touch
)()
except AttributeError:
raise TypeError(
"Plugins which declare custom ORM managers on their content classes "
"should have those managers inherit from "
"pulpcore.plugin.models.ContentManager."
)

for d_content in batch:
await self.put(d_content)

0 comments on commit 748b3dd

Please sign in to comment.