From a5c8073b2084fb6bd2c4ea4013d2c83504e0cdc6 Mon Sep 17 00:00:00 2001 From: nikodemas <47255905+nikodemas@users.noreply.github.com> Date: Fri, 12 Jul 2024 16:59:11 +0200 Subject: [PATCH 1/3] Update osearch.py --- src/python/CMSSpark/osearch/osearch.py | 30 +++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/python/CMSSpark/osearch/osearch.py b/src/python/CMSSpark/osearch/osearch.py index 186595e..4417068 100644 --- a/src/python/CMSSpark/osearch/osearch.py +++ b/src/python/CMSSpark/osearch/osearch.py @@ -32,7 +32,7 @@ def get_index_schema(): } _index_template = 'test-foo' - client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema()) + client = osearch.get_es_client("os-cms.cern.ch/os", 'secret_opensearch.txt', get_index_schema()) # index_mod="": 'test-foo', index_mod="Y": 'test-foo-YYYY', index_mod="M": 'test-foo-YYYY-MM', index_mod="D": 'test-foo-YYYY-MM-DD', idx = client.get_or_create_index(timestamp=time.time(), index_template=_index_template, index_mod="") @@ -41,7 +41,7 @@ def get_index_schema(): # ------------------------------- Big Spark dataframe ------------------------------------- _index_template = 'test-foo' - client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema()) + client = osearch.get_es_client("os-cms.cern.ch/es", 'secret_opensearch.txt', get_index_schema()) for part in df.rdd.mapPartitions().toLocalIterator(): print(f"Length of partition: {len(part)}") # You can define below calls in a function for reusability @@ -58,14 +58,11 @@ def get_index_schema(): from collections import Counter as collectionsCounter from datetime import datetime -from opensearchpy import OpenSearch +from opensearchpy import OpenSearch, exceptions # Global OpenSearch connection client _opensearch_client = None -# Global index cache, keep tracks of indices that are already created with mapping in the OpenSearch instance -_index_cache = set() - def get_es_client(host, secret_file, index_mapping_and_settings): """Creates OpenSearch client @@ -126,12 +123,9 @@ def get_or_create_index(self, timestamp, index_template, index_mod=""): empty string uses single index as "index_template" Returns yearly/monthly/daily index string depending on the index_mode and creates it if it does not exist. - - It checks if index mapping is already created by checking _index_cache set. - - And returns from _index_cache set if index exists + - It checks if index already exists and returns its name. - Else, it creates the index with mapping which happens in the first batch of the month ideally. """ - global _index_cache - timestamp = int(timestamp) if index_mod.upper() == "Y": idx = time.strftime("%s-%%Y" % index_template, datetime.utcfromtimestamp(timestamp).timetuple()) @@ -141,12 +135,18 @@ def get_or_create_index(self, timestamp, index_template, index_mod=""): idx = time.strftime("%s-%%Y-%%m-%%d" % index_template, datetime.utcfromtimestamp(timestamp).timetuple()) else: idx = index_template - - if idx in _index_cache: + + try: + self.handle.indices.get(idx) + logging.info(f"Index found: {idx}") + return idx + except exceptions.NotFoundError: + logging.info(f"Index {idx} doesn't exist, creating new index") + get_es_client(self.host, self.secret_file, self.index_mapping_and_settings).make_mapping(idx=idx) return idx - get_es_client(self.host, self.secret_file, self.index_mapping_and_settings).make_mapping(idx=idx) - _index_cache.add(idx) - return idx + except Exception as e: + logging.error(f"Couldn't get or create index: {e}") + return None @staticmethod def parse_errors(result): From 573000dc241e63c48bdfde90cc1bc2db5f3083f0 Mon Sep 17 00:00:00 2001 From: nikodemas <47255905+nikodemas@users.noreply.github.com> Date: Fri, 12 Jul 2024 17:02:00 +0200 Subject: [PATCH 2/3] Update README.md --- src/python/CMSSpark/osearch/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/python/CMSSpark/osearch/README.md b/src/python/CMSSpark/osearch/README.md index e35911a..cfd50c2 100644 --- a/src/python/CMSSpark/osearch/README.md +++ b/src/python/CMSSpark/osearch/README.md @@ -6,7 +6,7 @@ Requirements: - OpenSearch cluster host name. - Secret file one line of 'username:password' of the OpenSearch. Ask to CMS Monitoring team. - Index mapping schema and settings, see get_index_schema() below. -- Index template starting with 'test-' if you want to send to es-cms cluster "test" tenant +- Index template starting with 'test-' if you want to send to os-cms cluster "test" tenant - `index_mod`: Let's assume you've an index_template of "test-foo". Depending on your input and provided timestamp, data will be sent to below indexes: - index_mod="": `test-foo` @@ -44,7 +44,7 @@ def get_index_schema(): _index_template = 'test-foo' -client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema()) +client = osearch.get_es_client("os-cms.cern.ch/os", 'secret_opensearch.txt', get_index_schema()) # index_mod="": 'test-foo', index_mod="Y": 'test-foo-YYYY', index_mod="M": 'test-foo-YYYY-MM', index_mod="D": 'test-foo-YYYY-MM-DD', idx = client.get_or_create_index(timestamp=time.time(), index_template=_index_template, index_mod="") @@ -65,7 +65,7 @@ from CMSSpark.osearch import osearch for part in df.rdd.mapPartitions().toLocalIterator(): # You can define below calls in a function for re-usability _index_template = 'test-foo' - client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema()) + client = osearch.get_es_client("os-cms.cern.ch/os", 'secret_opensearch.txt', get_index_schema()) print(f"Length of partition: {len(part)}") idx = client.get_or_create_index(timestamp=part[0]['timestamp'], index_template=_index_template, index_mod="D") # sends to daily index From fc4326060a8b8ff99a73ad9e5d8c6f87ecbaf4e5 Mon Sep 17 00:00:00 2001 From: nikodemas <47255905+nikodemas@users.noreply.github.com> Date: Fri, 12 Jul 2024 17:03:07 +0200 Subject: [PATCH 3/3] Update osearch.py --- src/python/CMSSpark/osearch/osearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/CMSSpark/osearch/osearch.py b/src/python/CMSSpark/osearch/osearch.py index 4417068..c81d61f 100644 --- a/src/python/CMSSpark/osearch/osearch.py +++ b/src/python/CMSSpark/osearch/osearch.py @@ -41,7 +41,7 @@ def get_index_schema(): # ------------------------------- Big Spark dataframe ------------------------------------- _index_template = 'test-foo' - client = osearch.get_es_client("os-cms.cern.ch/es", 'secret_opensearch.txt', get_index_schema()) + client = osearch.get_es_client("os-cms.cern.ch/os", 'secret_opensearch.txt', get_index_schema()) for part in df.rdd.mapPartitions().toLocalIterator(): print(f"Length of partition: {len(part)}") # You can define below calls in a function for reusability