Skip to content

Commit

Permalink
Merge pull request #246 from sennetconsortium/tjmadonna/244-bulk-update-size
Browse files Browse the repository at this point in the history

Moving bulk_update_size to search config
  • Loading branch information
maxsibilla authored Oct 10, 2024
2 parents 6be7c31 + 725c835 commit c8236c1
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
5 changes: 4 additions & 1 deletion src/instance/search-config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
# default index name for endpoints that don't specify an index
default_index: entities

# batch size used for bulk updates to Elasticsearch (default is 50)
bulk_update_size: 50

# specify multiple indices
indices:
entities:
Expand All @@ -23,4 +26,4 @@ indices:
private: sn_dev_consortium_files
elasticsearch:
url: <ES URL>
mappings: "sennet_translation/file-default-config.yaml"
mappings: "sennet_translation/file-default-config.yaml"
9 changes: 5 additions & 4 deletions src/sennet_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import requests
from atlas_consortia_commons.object import enum_val
from atlas_consortia_commons.string import equals
from atlas_consortia_commons.ubkg import initialize_ubkg
from flask import Flask, Response
from hubmap_commons.hm_auth import AuthHelper # HuBMAP commons
Expand Down Expand Up @@ -50,7 +49,6 @@ class Translator(TranslatorInterface):
INDICES = {}
TRANSFORMERS = {} # Not used in SenNet
DEFAULT_ENTITY_API_URL = ""
BULK_UPDATE_SIZE = 50

failed_entity_api_calls = []
failed_entity_ids = []
Expand Down Expand Up @@ -113,6 +111,8 @@ def __init__(self, indices, app_client_id, app_client_secret, token, ubkg_instan
# Add index_version by parsing the VERSION file
self.index_version = ((Path(__file__).absolute().parent.parent / "VERSION").read_text()).strip()

self.bulk_update_size = indices.get("bulk_update_size", 50)

# Public methods

def init_auth_helper(self):
Expand Down Expand Up @@ -258,7 +258,7 @@ def translate_all(self):
failure_results[index.public] = []

all_entities_uuids = list(all_entities_uuids)
n = self.BULK_UPDATE_SIZE
n = self.bulk_update_size
batched_uuids = [all_entities_uuids[i:i + n] for i in range(0, len(all_entities_uuids), n)]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
Expand Down Expand Up @@ -515,7 +515,8 @@ def _upsert_index(self, entity_ids: list[str], index: Index, session: requests.S
pass

# Send bulk update when the batch size is reached
if len(priv_entities) >= self.BULK_UPDATE_SIZE or len(pub_entities) >= self.BULK_UPDATE_SIZE:
max_size = self.bulk_update_size
if len(priv_entities) >= max_size or len(pub_entities) >= max_size:
priv_update = BulkUpdate(upserts=priv_entities)
pub_update = BulkUpdate(upserts=pub_entities)

Expand Down

0 comments on commit c8236c1

Please sign in to comment.