Skip to content

Commit

Permalink
Merge pull request #161 from dmwm/update_osearch
Browse files Browse the repository at this point in the history
Update osearch
  • Loading branch information
nikodemas authored Jul 19, 2024
2 parents b341085 + fc43260 commit 23ac8ff
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
6 changes: 3 additions & 3 deletions src/python/CMSSpark/osearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Requirements:
- OpenSearch cluster host name.
- Secret file containing one line of 'username:password' for the OpenSearch cluster. Ask the CMS Monitoring team for it.
- Index mapping schema and settings, see get_index_schema() below.
- Index template starting with 'test-' if you want to send to es-cms cluster "test" tenant
- Index template starting with 'test-' if you want to send to os-cms cluster "test" tenant
- `index_mod`: Let's assume you have an index_template of "test-foo". Depending on your input and the provided timestamp, data
will be sent to the indexes below:
- index_mod="": `test-foo`
Expand Down Expand Up @@ -44,7 +44,7 @@ def get_index_schema():


_index_template = 'test-foo'
client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema())
client = osearch.get_es_client("os-cms.cern.ch/os", 'secret_opensearch.txt', get_index_schema())

# index_mod="": 'test-foo', index_mod="Y": 'test-foo-YYYY', index_mod="M": 'test-foo-YYYY-MM', index_mod="D": 'test-foo-YYYY-MM-DD',
idx = client.get_or_create_index(timestamp=time.time(), index_template=_index_template, index_mod="")
Expand All @@ -65,7 +65,7 @@ from CMSSpark.osearch import osearch
for part in df.rdd.mapPartitions().toLocalIterator():
# You can define below calls in a function for re-usability
_index_template = 'test-foo'
client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema())
client = osearch.get_es_client("os-cms.cern.ch/os", 'secret_opensearch.txt', get_index_schema())
print(f"Length of partition: {len(part)}")
idx = client.get_or_create_index(timestamp=part[0]['timestamp'], index_template=_index_template,
index_mod="D") # sends to daily index
Expand Down
30 changes: 15 additions & 15 deletions src/python/CMSSpark/osearch/osearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_index_schema():
}
_index_template = 'test-foo'
client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema())
client = osearch.get_es_client("os-cms.cern.ch/os", 'secret_opensearch.txt', get_index_schema())
# index_mod="": 'test-foo', index_mod="Y": 'test-foo-YYYY', index_mod="M": 'test-foo-YYYY-MM', index_mod="D": 'test-foo-YYYY-MM-DD',
idx = client.get_or_create_index(timestamp=time.time(), index_template=_index_template, index_mod="")
Expand All @@ -41,7 +41,7 @@ def get_index_schema():
# ------------------------------- Big Spark dataframe -------------------------------------
_index_template = 'test-foo'
client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema())
client = osearch.get_es_client("os-cms.cern.ch/os", 'secret_opensearch.txt', get_index_schema())
for part in df.rdd.mapPartitions().toLocalIterator():
print(f"Length of partition: {len(part)}")
# You can define below calls in a function for reusability
Expand All @@ -58,14 +58,11 @@ def get_index_schema():
from collections import Counter as collectionsCounter
from datetime import datetime

from opensearchpy import OpenSearch
from opensearchpy import OpenSearch, exceptions

# Global OpenSearch connection client
_opensearch_client = None

# Global index cache, keep tracks of indices that are already created with mapping in the OpenSearch instance
_index_cache = set()


def get_es_client(host, secret_file, index_mapping_and_settings):
"""Creates OpenSearch client
Expand Down Expand Up @@ -126,12 +123,9 @@ def get_or_create_index(self, timestamp, index_template, index_mod=""):
empty string uses single index as "index_template"
Returns the yearly/monthly/daily index string depending on index_mod and creates the index if it does not exist.
- It checks if index mapping is already created by checking _index_cache set.
- And returns from _index_cache set if index exists
- It checks if index already exists and returns its name.
- Otherwise, it creates the index with the mapping, which ideally happens in the first batch of the month.
"""
global _index_cache

timestamp = int(timestamp)
if index_mod.upper() == "Y":
idx = time.strftime("%s-%%Y" % index_template, datetime.utcfromtimestamp(timestamp).timetuple())
Expand All @@ -141,12 +135,18 @@ def get_or_create_index(self, timestamp, index_template, index_mod=""):
idx = time.strftime("%s-%%Y-%%m-%%d" % index_template, datetime.utcfromtimestamp(timestamp).timetuple())
else:
idx = index_template

if idx in _index_cache:

try:
self.handle.indices.get(idx)
logging.info(f"Index found: {idx}")
return idx
except exceptions.NotFoundError:
logging.info(f"Index {idx} doesn't exist, creating new index")
get_es_client(self.host, self.secret_file, self.index_mapping_and_settings).make_mapping(idx=idx)
return idx
get_es_client(self.host, self.secret_file, self.index_mapping_and_settings).make_mapping(idx=idx)
_index_cache.add(idx)
return idx
except Exception as e:
logging.error(f"Couldn't get or create index: {e}")
return None

@staticmethod
def parse_errors(result):
Expand Down

0 comments on commit 23ac8ff

Please sign in to comment.