Skip to content

Commit

Permalink
fix bug on job cache
Browse files Browse the repository at this point in the history
initialize to empty map to prevent errors when updated from multiple threads
  • Loading branch information
arikalon1 committed May 21, 2024
1 parent 4494f3a commit b8b72f2
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions src/robusta/core/sinks/robusta/robusta_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
self.__services_cache: Dict[str, ServiceInfo] = {}
self.__nodes_cache: Dict[str, NodeInfo] = {}
self.__namespaces_cache: Dict[str, NamespaceInfo] = {}
# Some clusters have no jobs. Initializing jobs cache to None, and not empty dict
# helps differentiate between no jobs, to not initialized
self.__jobs_cache: Optional[Dict[str, JobInfo]] = None
self.__jobs_cache: Optional[Dict[str, JobInfo]] = {}
# Some clusters have no jobs. helps differentiate between no jobs, to not initialized
self.__jobs_cache_initialized: bool = False
self.__helm_releases_cache: Optional[Dict[str, HelmRelease]] = None
self.__init_service_resolver()
self.__thread = threading.Thread(target=self.__discover_cluster)
Expand Down Expand Up @@ -158,8 +158,9 @@ def __assert_node_cache_initialized(self):
self.__nodes_cache[node.name] = node

def __assert_jobs_cache_initialized(self):
if self.__jobs_cache is None:
if not self.__jobs_cache_initialized:
logging.info("Initializing jobs cache")
self.__jobs_cache_initialized = True
self.__jobs_cache: Dict[str, JobInfo] = {}
for job in self.dal.get_active_jobs():
self.__jobs_cache[job.get_service_key()] = job
Expand All @@ -179,7 +180,8 @@ def __assert_namespaces_cache_initialized(self):
def __reset_caches(self):
self.__services_cache: Dict[str, ServiceInfo] = {}
self.__nodes_cache: Dict[str, NodeInfo] = {}
self.__jobs_cache = None
self.__jobs_cache: Dict[str, JobInfo] = {}
self.__jobs_cache_initialized = False
self.__helm_releases_cache = None
self.__namespaces_cache: Dict[str, NamespaceInfo] = {}
self.__pods_running_count = 0
Expand Down

0 comments on commit b8b72f2

Please sign in to comment.