Skip to content

Commit

Permalink
Fixing duplicate reindex calls in translate all
Browse files Browse the repository at this point in the history
  • Loading branch information
tjmadonna committed Jul 31, 2024
1 parent f8792a6 commit 79ca788
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions src/sennet_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def translate_all(self):
start = time.time()
delete_failure_results = {}

# Make calls to entity-api to get a list of uuids for entity types
source_uuids_list = self._call_entity_api(
session=session,
endpoint_base="source",
Expand All @@ -204,32 +205,30 @@ def translate_all(self):
endpoint_suffix="entities",
url_property="uuid"
)
sample_uuids_list = self._call_entity_api(
session=session,
endpoint_base="sample",
endpoint_suffix="entities",
url_property="uuid"
)
dataset_uuids_list = self._call_entity_api(
session=session,
endpoint_base="dataset",
endpoint_suffix="entities",
url_property="uuid"
)

# Merge into a big list that with no duplicates
all_entities_uuids = set(
source_uuids_list
+ sample_uuids_list
+ dataset_uuids_list
+ upload_uuids_list
+ collection_uuids_list
)

# Only need this comparison for the live /reindex-all PUT call
if not self.skip_comparison:
# Make calls to entity-api to get a list of uuids for rest of entity types
sample_uuids_list = self._call_entity_api(
session=session,
endpoint_base="sample",
endpoint_suffix="entities",
url_property="uuid"
)
dataset_uuids_list = self._call_entity_api(
session=session,
endpoint_base="dataset",
endpoint_suffix="entities",
url_property="uuid"
)

# Merge into a big list that with no duplicates
all_entities_uuids = set(
source_uuids_list
+ sample_uuids_list
+ dataset_uuids_list
+ upload_uuids_list
+ collection_uuids_list
)

es_uuids = set()
index_names = get_all_reindex_enabled_indice_names(self.INDICES)

Expand All @@ -250,6 +249,9 @@ def translate_all(self):
failures = self._bulk_update(update, index.public, index.url)
delete_failure_results[index.public] = failures

# No need to update the entities that were just deleted
all_entities_uuids = all_entities_uuids - uuids_to_delete

failure_results = {}
for index in self.index_config:
failure_results[index.private] = []
Expand All @@ -261,7 +263,8 @@ def translate_all(self):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for index in self.index_config:
futures.extend([executor.submit(self._upsert_index, uuids, index, session) for uuids in batched_uuids])
for uuids in batched_uuids:
futures.append(executor.submit(self._upsert_index, uuids, index, session))

for f in concurrent.futures.as_completed(futures):
failures = f.result()
Expand Down

0 comments on commit 79ca788

Please sign in to comment.