From 1486d35c3d0f95c478515911cbc29765be08707a Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 10:37:17 -0500 Subject: [PATCH 01/26] Strip out warnings suppressions. --- .../actor/data_file_chunk_handlers.py | 5 +- .../actor/data_file_download_directory.py | 6 +- .../actor/data_file_stream_handler.py | 6 +- .../controlled_message_processor.py | 65 ++++++++++-------- .../controlled_message_reproducer.py | 66 +++++++++++-------- .../kafka_wrapper/openmsistream_consumer.py | 6 +- .../openmsistream_kafka_crypto.py | 12 ++-- .../kafka_wrapper/openmsistream_producer.py | 6 +- openmsistream/kafka_wrapper/serialization.py | 6 +- openmsistream/kafka_wrapper/utilities.py | 6 +- openmsistream/tools/provision_wrapper.py | 8 +-- 11 files changed, 92 insertions(+), 100 deletions(-) diff --git a/openmsistream/data_file_io/actor/data_file_chunk_handlers.py b/openmsistream/data_file_io/actor/data_file_chunk_handlers.py index c3c1cdea..b4187de2 100644 --- a/openmsistream/data_file_io/actor/data_file_chunk_handlers.py +++ b/openmsistream/data_file_io/actor/data_file_chunk_handlers.py @@ -1,13 +1,10 @@ """Anything that receives DataFileChunk messages from a topic and does something with them""" # imports -import warnings from abc import ABC, abstractmethod from threading import Lock -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto import KafkaCryptoMessage +from kafkacrypto import KafkaCryptoMessage from openmsitoolbox import LogOwner from ...kafka_wrapper.controlled_message_processor import ControlledMessageProcessor from ...kafka_wrapper.controlled_message_reproducer import ControlledMessageReproducer diff --git a/openmsistream/data_file_io/actor/data_file_download_directory.py b/openmsistream/data_file_io/actor/data_file_download_directory.py index 81fbcae4..ebf5bd4b 100644 --- a/openmsistream/data_file_io/actor/data_file_download_directory.py +++ b/openmsistream/data_file_io/actor/data_file_download_directory.py @@ -4,11 +4,9 @@ """ # imports -import datetime, warnings +import datetime -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto.message import KafkaCryptoMessage +from kafkacrypto.message import KafkaCryptoMessage from openmsitoolbox import Runnable from ...utilities import OpenMSIStreamArgumentParser from ...utilities.config import RUN_CONST diff --git a/openmsistream/data_file_io/actor/data_file_stream_handler.py b/openmsistream/data_file_io/actor/data_file_stream_handler.py index 3c91b0b2..f54629ea 100644 --- a/openmsistream/data_file_io/actor/data_file_stream_handler.py +++ b/openmsistream/data_file_io/actor/data_file_stream_handler.py @@ -4,12 +4,10 @@ """ # imports -import pathlib, warnings +import pathlib from abc import ABC -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto.message import KafkaCryptoMessage +from kafkacrypto.message import KafkaCryptoMessage from openmsitoolbox import Runnable from openmsitoolbox.utilities.misc import populated_kwargs from ...utilities import OpenMSIStreamArgumentParser diff --git a/openmsistream/kafka_wrapper/controlled_message_processor.py b/openmsistream/kafka_wrapper/controlled_message_processor.py index b198a892..13e97b57 100644 --- a/openmsistream/kafka_wrapper/controlled_message_processor.py +++ b/openmsistream/kafka_wrapper/controlled_message_processor.py @@ -3,13 +3,8 @@ """ # imports -import warnings from abc import ABC, abstractmethod -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto import KafkaCryptoMessage - from kafkacrypto.confluent_kafka_wrapper import Message from ..utilities.heartbeat_producibles import MessageProcessorHeartbeatProducible from ..utilities.controlled_processes_heartbeats import ( ControlledProcessMultiThreadedHeartbeats, @@ -127,19 +122,25 @@ def __do_alive_loop_iteration(self, consumer): with self.lock: self.n_msgs_read += 1 self.n_msgs_read_since_last_heartbeat += 1 - if ( - hasattr(msg, "key") - and hasattr(msg, "value") - and ( - isinstance(msg.key, KafkaCryptoMessage) - or isinstance(msg.value, KafkaCryptoMessage) - ) - ): - self.n_bytes_read_since_last_heartbeat += len(bytes(msg)) - elif isinstance(msg, Message): - self.n_bytes_read_since_last_heartbeat += len(msg.value) - else: + # + # This accounting seems imprecise and incomplete. + # + keylen = 0 + vallen = 0 + if hasattr(msg, "key"): + try: + keylen = len(bytes(msg.key)) + except: + keylen = len(msg.key) + if hasattr(msg, "value"): + try: + vallen = len(bytes(msg.value)) + except: + vallen = len(msg.value) + if keylen == 0 and vallen == 0: self.n_bytes_read_since_last_heartbeat += len(msg) + else: + self.n_bytes_read_since_last_heartbeat += keylen+vallen self.last_message = msg # send the message to the _process_message function retval = self._process_message(self.lock, msg) @@ -148,19 +149,25 @@ def __do_alive_loop_iteration(self, consumer): with self.lock: self.n_msgs_processed += 1 self.n_msgs_processed_since_last_heartbeat += 1 - if ( - hasattr(msg, "key") - and hasattr(msg, "value") - and ( - isinstance(msg.key, KafkaCryptoMessage) - or isinstance(msg.value, KafkaCryptoMessage) - ) - ): - self.n_bytes_processed_since_last_heartbeat += len(bytes(msg)) - elif isinstance(msg, Message): - self.n_bytes_processed_since_last_heartbeat += len(msg.value) + # + # This accounting seems imprecise and incomplete. + # + keylen = 0 + vallen = 0 + if hasattr(msg, "key"): + try: + keylen = len(bytes(msg.key)) + except: + keylen = len(msg.key) + if hasattr(msg, "value"): + try: + vallen = len(bytes(msg.value)) + except: + vallen = len(msg.value) + if keylen == 0 and vallen == 0: + self.n_bytes_read_since_last_heartbeat += len(msg) else: - self.n_bytes_processed_since_last_heartbeat += len(msg) + self.n_bytes_read_since_last_heartbeat += keylen+vallen if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: diff --git a/openmsistream/kafka_wrapper/controlled_message_reproducer.py b/openmsistream/kafka_wrapper/controlled_message_reproducer.py index 41fd54a0..deb50e73 100644 --- a/openmsistream/kafka_wrapper/controlled_message_reproducer.py +++ b/openmsistream/kafka_wrapper/controlled_message_reproducer.py @@ -4,14 +4,10 @@ """ # imports -import time, warnings +import time from abc import ABC, abstractmethod from queue import Queue -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto import KafkaCryptoMessage - from kafkacrypto.confluent_kafka_wrapper import Message from ..utilities.config import RUN_CONST from ..utilities.heartbeat_producibles import MessageReproducerHeartbeatProducible from ..utilities.controlled_processes_heartbeats import ( @@ -202,19 +198,25 @@ def __consume_messages_while_alive(self, consumer): with self.lock: self.n_msgs_read += 1 self.n_msgs_read_since_last_heartbeat += 1 - if ( - hasattr(msg, "key") - and hasattr(msg, "value") - and ( - isinstance(msg.key, KafkaCryptoMessage) - or isinstance(msg.value, KafkaCryptoMessage) - ) - ): - self.n_bytes_read_since_last_heartbeat += len(bytes(msg)) - elif isinstance(msg, Message): - self.n_bytes_read_since_last_heartbeat += len(msg.value) - else: + # + # This accounting seems imprecise and incomplete. + # + keylen = 0 + vallen = 0 + if hasattr(msg, "key"): + try: + keylen = len(bytes(msg.key)) + except: + keylen = len(msg.key) + if hasattr(msg, "value"): + try: + vallen = len(bytes(msg.value)) + except: + vallen = len(msg.value) + if keylen == 0 and vallen == 0: self.n_bytes_read_since_last_heartbeat += len(msg) + else: + self.n_bytes_read_since_last_heartbeat += keylen+vallen self.last_message = msg # send the message to the _process_message function retval = self._process_message(self.lock, msg) @@ -223,19 +225,25 @@ def __consume_messages_while_alive(self, consumer): with self.lock: self.n_msgs_processed += 1 self.n_msgs_processed_since_last_heartbeat += 1 - if ( - hasattr(msg, "key") - and hasattr(msg, "value") - and ( - isinstance(msg.key, KafkaCryptoMessage) - or isinstance(msg.value, KafkaCryptoMessage) - ) - ): - self.n_bytes_processed_since_last_heartbeat += len(bytes(msg)) - elif isinstance(msg, Message): - self.n_bytes_processed_since_last_heartbeat += len(msg.value) + # + # This accounting seems imprecise and incomplete. + # + keylen = 0 + vallen = 0 + if hasattr(msg, "key"): + try: + keylen = len(bytes(msg.key)) + except: + keylen = len(msg.key) + if hasattr(msg, "value"): + try: + vallen = len(bytes(msg.value)) + except: + vallen = len(msg.value) + if keylen == 0 and vallen == 0: + self.n_bytes_read_since_last_heartbeat += len(msg) else: - self.n_bytes_processed_since_last_heartbeat += len(msg) + self.n_bytes_read_since_last_heartbeat += keylen+vallen if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: diff --git a/openmsistream/kafka_wrapper/openmsistream_consumer.py b/openmsistream/kafka_wrapper/openmsistream_consumer.py index 897eb1b7..2cb9f2c2 100644 --- a/openmsistream/kafka_wrapper/openmsistream_consumer.py +++ b/openmsistream/kafka_wrapper/openmsistream_consumer.py @@ -1,13 +1,11 @@ """A wrapped Kafka Consumer. May consume encrypted messages.""" # imports -import uuid, warnings, gc +import uuid, gc import methodtools from confluent_kafka import DeserializingConsumer, Message -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto import KafkaConsumer +from kafkacrypto import KafkaConsumer from openmsitoolbox import LogOwner from openmsitoolbox.utilities.misc import ( raise_err_with_optional_logger, diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index 1ee075fa..da4dc2b7 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -3,11 +3,9 @@ """ # imports -import pathlib, warnings, logging, configparser +import pathlib, configparser -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto import KafkaProducer, KafkaConsumer, KafkaCrypto, KafkaCryptoStore +from kafkacrypto import KafkaProducer, KafkaConsumer, KafkaCrypto, KafkaCryptoStore from openmsitoolbox.utilities.misc import change_dir @@ -62,16 +60,14 @@ def __init__(self, broker_configs, config_file): """ Constructor method """ + # get kafka crypto configs, and set logging level for kafkacrypto loggers kcp_cfgs, kcc_cfgs = self.__get_configs_from_file(broker_configs, config_file) with change_dir(pathlib.Path(config_file).parent): - kc_logger = logging.getLogger("kafkacrypto") - kc_logger.setLevel(logging.ERROR) # start up the producer and consumer self._kcp = KafkaProducer(**kcp_cfgs) self._kcc = KafkaConsumer(**kcc_cfgs) # initialize the KafkaCrypto object self._kc = KafkaCrypto(None, self._kcp, self._kcc, config_file) - kc_logger.setLevel(logging.WARNING) self.__config_file = config_file def close(self): @@ -106,7 +102,7 @@ def __get_configs_from_file(self, broker_configs, config_file): with open(config_file, "w") as cfg_fp: config.write(cfg_fp) # Parse the config file and get consumer and producer configs - cfg_parser = KafkaCryptoStore(config_file, conf_global_logger=False) + cfg_parser = KafkaCryptoStore(config_file, conf_global_logger=False) # this sets logging levels for kafkacrypto loggers only kcc_cfgs = cfg_parser.get_kafka_config("consumer", extra="crypto") kcp_cfgs = cfg_parser.get_kafka_config("producer", extra="crypto") cfg_parser.close() diff --git a/openmsistream/kafka_wrapper/openmsistream_producer.py b/openmsistream/kafka_wrapper/openmsistream_producer.py index 6c4d3805..5fb2d629 100644 --- a/openmsistream/kafka_wrapper/openmsistream_producer.py +++ b/openmsistream/kafka_wrapper/openmsistream_producer.py @@ -1,12 +1,10 @@ """A wrapped Kafka Producer. May produce encrypted messages.""" # imports -import time, warnings, gc +import time, gc from confluent_kafka import SerializingProducer -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto import KafkaProducer +from kafkacrypto import KafkaProducer from openmsitoolbox import LogOwner from openmsitoolbox.utilities.misc import ( raise_err_with_optional_logger, diff --git a/openmsistream/kafka_wrapper/serialization.py b/openmsistream/kafka_wrapper/serialization.py index 3c1c2c3c..13e1fba4 100644 --- a/openmsistream/kafka_wrapper/serialization.py +++ b/openmsistream/kafka_wrapper/serialization.py @@ -1,15 +1,13 @@ """Classes for serialization and deserialization operations""" # imports -import pathlib, inspect, time, warnings +import pathlib, inspect, time from hashlib import sha512 import msgpack from confluent_kafka.error import SerializationError from confluent_kafka.serialization import Serializer, Deserializer -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto.message import KafkaCryptoMessage, KafkaCryptoMessageError +from kafkacrypto.message import KafkaCryptoMessage, KafkaCryptoMessageError from ..data_file_io.entity.data_file_chunk import DataFileChunk ####################### COMPOUND (DE)SERIALIZERS FOR STACKING MULTIPLE STEPS ####################### diff --git a/openmsistream/kafka_wrapper/utilities.py b/openmsistream/kafka_wrapper/utilities.py index 0a4dc405..f1b3cb59 100644 --- a/openmsistream/kafka_wrapper/utilities.py +++ b/openmsistream/kafka_wrapper/utilities.py @@ -1,13 +1,9 @@ """Miscellaneous functions/classes used elsewhere in the Kafka wrapper""" # imports -import warnings from collections import namedtuple from confluent_kafka import OFFSET_BEGINNING - -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - from kafkacrypto import KafkaConsumer +from kafkacrypto import KafkaConsumer def add_kwargs_to_configs(configs, logger, **kwargs): diff --git a/openmsistream/tools/provision_wrapper.py b/openmsistream/tools/provision_wrapper.py index be44205f..f0401fc7 100644 --- a/openmsistream/tools/provision_wrapper.py +++ b/openmsistream/tools/provision_wrapper.py @@ -4,20 +4,18 @@ """ # imports -import pathlib, shutil, logging, warnings +import pathlib, shutil, logging from argparse import ArgumentParser import urllib.request -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - import kafkacrypto +import kafkacrypto from openmsitoolbox.logging import OpenMSILogger from openmsitoolbox.utilities.misc import change_dir from ..utilities.config import RUN_CONST from ..utilities.config_file_parser import ConfigFileParser # constants -LOGGER = OpenMSILogger("ProvisionNode", logging.INFO) +LOGGER = OpenMSILogger("ProvisionNode", logging.INFO) # TODO: instead of INFO, load from a file somewhere? KC_PATH = kafkacrypto.__path__ SP_NAME = "simple-provision.py" OP_NAME = "online-provision.py" From 202cfb1845630423d66ec4320761a98c641a23ec Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 11:17:36 -0500 Subject: [PATCH 02/26] Propagate default logging level through producers/consumers too. --- .../kafka_wrapper/openmsistream_consumer.py | 2 +- .../openmsistream_kafka_crypto.py | 18 ++++++++++++------ .../kafka_wrapper/openmsistream_producer.py | 2 +- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_consumer.py b/openmsistream/kafka_wrapper/openmsistream_consumer.py index 2cb9f2c2..4f0c5ed0 100644 --- a/openmsistream/kafka_wrapper/openmsistream_consumer.py +++ b/openmsistream/kafka_wrapper/openmsistream_consumer.py @@ -175,7 +175,7 @@ def get_consumer_args_kwargs( ) debug_msg_with_optional_logger(logger, debugmsg) k_c = OpenMSIStreamKafkaCrypto( - parser.broker_configs, parser.kc_config_file_str + parser.broker_configs, parser.kc_config_file_str, self.logger().level if (logging is None) else logging.level ) if "key.deserializer" in all_configs: keydes = CompoundDeserializer( diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index da4dc2b7..1bf908f9 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -56,12 +56,12 @@ def value_deserializer(self): """ return self._kc.getValueDeserializer() - def __init__(self, broker_configs, config_file): + def __init__(self, broker_configs, config_file, log_level): """ Constructor method """ # get kafka crypto configs, and set logging level for kafkacrypto loggers - kcp_cfgs, kcc_cfgs = self.__get_configs_from_file(broker_configs, config_file) + kcp_cfgs, kcc_cfgs = self.__get_configs_from_file(broker_configs, config_file, log_level) with change_dir(pathlib.Path(config_file).parent): # start up the producer and consumer self._kcp = KafkaProducer(**kcp_cfgs) @@ -83,13 +83,18 @@ def close(self): self._kcp = None self._kcc = None - def __get_configs_from_file(self, broker_configs, config_file): + def __get_configs_from_file(self, broker_configs, config_file, default_log_level): """Return the dictionaries of crypto producer and consumer configs determined from the KafkaCrypto config file and overwritten with the given broker configs from the OpenMSIStream config file. KafkaCryptoStore must be used when parsing the crypto config file to ensure options (and clearing of options) is properly handled. """ + # Make updates to kafkaconfig file according to what we are passed + config = configparser.ConfigParser(delimiters=":") + config.read(config_file) + # Unilaterally pdate default log_level (can be overridden in -crypto subsection by user) + config.set("{config_file.stem}", "log_level", default_log_level) # If ssl.ca.location is set in the broker configs, make sure it's written to the # KafkaCrypto config file as well in the right place if "ssl.ca.location" in broker_configs: @@ -99,10 +104,11 @@ def __get_configs_from_file(self, broker_configs, config_file): option_name = "ssl_cafile" if config.has_option(section_name, option_name): config.set(section_name, option_name, broker_configs["ssl.ca.location"]) - with open(config_file, "w") as cfg_fp: - config.write(cfg_fp) + # Save changes to config_file + with open(config_file, "w") as cfg_fp: + config.write(cfg_fp) # Parse the config file and get consumer and producer configs - cfg_parser = KafkaCryptoStore(config_file, conf_global_logger=False) # this sets logging levels for kafkacrypto loggers only + cfg_parser = KafkaCryptoStore(config_file, conf_global_logger=False) # this sets logging levels for kafkacrypto loggers kcc_cfgs = cfg_parser.get_kafka_config("consumer", extra="crypto") kcp_cfgs = cfg_parser.get_kafka_config("producer", extra="crypto") cfg_parser.close() diff --git a/openmsistream/kafka_wrapper/openmsistream_producer.py b/openmsistream/kafka_wrapper/openmsistream_producer.py index 5fb2d629..143edf6c 100644 --- a/openmsistream/kafka_wrapper/openmsistream_producer.py +++ b/openmsistream/kafka_wrapper/openmsistream_producer.py @@ -123,7 +123,7 @@ def get_producer_args_kwargs( ) debug_msg_with_optional_logger(logger, debugmsg) k_c = OpenMSIStreamKafkaCrypto( - parser.broker_configs, parser.kc_config_file_str + parser.broker_configs, parser.kc_config_file_str, self.logger().level if (logging is None) else logging.level ) if "key.serializer" in all_configs: keyser = CompoundSerializer( From 81d26880752ce23339fb1dd64b2362114cac2d55 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 11:31:19 -0500 Subject: [PATCH 03/26] Fix static method log level setting --- openmsistream/kafka_wrapper/openmsistream_consumer.py | 4 ++-- openmsistream/kafka_wrapper/openmsistream_producer.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_consumer.py b/openmsistream/kafka_wrapper/openmsistream_consumer.py index 4f0c5ed0..1ad210d5 100644 --- a/openmsistream/kafka_wrapper/openmsistream_consumer.py +++ b/openmsistream/kafka_wrapper/openmsistream_consumer.py @@ -1,7 +1,7 @@ """A wrapped Kafka Consumer. May consume encrypted messages.""" # imports -import uuid, gc +import uuid, gc, logging import methodtools from confluent_kafka import DeserializingConsumer, Message @@ -175,7 +175,7 @@ def get_consumer_args_kwargs( ) debug_msg_with_optional_logger(logger, debugmsg) k_c = OpenMSIStreamKafkaCrypto( - parser.broker_configs, parser.kc_config_file_str, self.logger().level if (logging is None) else logging.level + parser.broker_configs, parser.kc_config_file_str, logging.WARNING if (logger.level is None) else logger.level ) if "key.deserializer" in all_configs: keydes = CompoundDeserializer( diff --git a/openmsistream/kafka_wrapper/openmsistream_producer.py b/openmsistream/kafka_wrapper/openmsistream_producer.py index 143edf6c..89e81909 100644 --- a/openmsistream/kafka_wrapper/openmsistream_producer.py +++ b/openmsistream/kafka_wrapper/openmsistream_producer.py @@ -1,7 +1,7 @@ """A wrapped Kafka Producer. May produce encrypted messages.""" # imports -import time, gc +import time, gc, logging from confluent_kafka import SerializingProducer from kafkacrypto import KafkaProducer @@ -123,7 +123,7 @@ def get_producer_args_kwargs( ) debug_msg_with_optional_logger(logger, debugmsg) k_c = OpenMSIStreamKafkaCrypto( - parser.broker_configs, parser.kc_config_file_str, self.logger().level if (logging is None) else logging.level + parser.broker_configs, parser.kc_config_file_str, logging.WARNING if (logger is None) else logger.level ) if "key.serializer" in all_configs: keyser = CompoundSerializer( From bb4ca061b43917e8dbaa212e040a2aef8e57842a Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 11:38:31 -0500 Subject: [PATCH 04/26] Make code formatter happy. --- .../kafka_wrapper/controlled_message_processor.py | 4 ++-- .../kafka_wrapper/controlled_message_reproducer.py | 4 ++-- openmsistream/kafka_wrapper/openmsistream_consumer.py | 4 +++- openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py | 8 ++++++-- openmsistream/kafka_wrapper/openmsistream_producer.py | 4 +++- openmsistream/tools/provision_wrapper.py | 4 +++- 6 files changed, 19 insertions(+), 9 deletions(-) diff --git a/openmsistream/kafka_wrapper/controlled_message_processor.py b/openmsistream/kafka_wrapper/controlled_message_processor.py index 13e97b57..ffca6230 100644 --- a/openmsistream/kafka_wrapper/controlled_message_processor.py +++ b/openmsistream/kafka_wrapper/controlled_message_processor.py @@ -140,7 +140,7 @@ def __do_alive_loop_iteration(self, consumer): if keylen == 0 and vallen == 0: self.n_bytes_read_since_last_heartbeat += len(msg) else: - self.n_bytes_read_since_last_heartbeat += keylen+vallen + self.n_bytes_read_since_last_heartbeat += keylen + vallen self.last_message = msg # send the message to the _process_message function retval = self._process_message(self.lock, msg) @@ -167,7 +167,7 @@ def __do_alive_loop_iteration(self, consumer): if keylen == 0 and vallen == 0: self.n_bytes_read_since_last_heartbeat += len(msg) else: - self.n_bytes_read_since_last_heartbeat += keylen+vallen + self.n_bytes_read_since_last_heartbeat += keylen + vallen if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: diff --git a/openmsistream/kafka_wrapper/controlled_message_reproducer.py b/openmsistream/kafka_wrapper/controlled_message_reproducer.py index deb50e73..712868a0 100644 --- a/openmsistream/kafka_wrapper/controlled_message_reproducer.py +++ b/openmsistream/kafka_wrapper/controlled_message_reproducer.py @@ -216,7 +216,7 @@ def __consume_messages_while_alive(self, consumer): if keylen == 0 and vallen == 0: self.n_bytes_read_since_last_heartbeat += len(msg) else: - self.n_bytes_read_since_last_heartbeat += keylen+vallen + self.n_bytes_read_since_last_heartbeat += keylen + vallen self.last_message = msg # send the message to the _process_message function retval = self._process_message(self.lock, msg) @@ -243,7 +243,7 @@ def __consume_messages_while_alive(self, consumer): if keylen == 0 and vallen == 0: self.n_bytes_read_since_last_heartbeat += len(msg) else: - self.n_bytes_read_since_last_heartbeat += keylen+vallen + self.n_bytes_read_since_last_heartbeat += keylen + vallen if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: diff --git a/openmsistream/kafka_wrapper/openmsistream_consumer.py b/openmsistream/kafka_wrapper/openmsistream_consumer.py index 1ad210d5..127f8937 100644 --- a/openmsistream/kafka_wrapper/openmsistream_consumer.py +++ b/openmsistream/kafka_wrapper/openmsistream_consumer.py @@ -175,7 +175,9 @@ def get_consumer_args_kwargs( ) debug_msg_with_optional_logger(logger, debugmsg) k_c = OpenMSIStreamKafkaCrypto( - parser.broker_configs, parser.kc_config_file_str, logging.WARNING if (logger.level is None) else logger.level + parser.broker_configs, + parser.kc_config_file_str, + logging.WARNING if (logger.level is None) else logger.level, ) if "key.deserializer" in all_configs: keydes = CompoundDeserializer( diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index 1bf908f9..eb8adc82 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -61,7 +61,9 @@ def __init__(self, broker_configs, config_file, log_level): Constructor method """ # get kafka crypto configs, and set logging level for kafkacrypto loggers - kcp_cfgs, kcc_cfgs = self.__get_configs_from_file(broker_configs, config_file, log_level) + kcp_cfgs, kcc_cfgs = self.__get_configs_from_file( + broker_configs, config_file, log_level + ) with change_dir(pathlib.Path(config_file).parent): # start up the producer and consumer self._kcp = KafkaProducer(**kcp_cfgs) @@ -108,7 +110,9 @@ def __get_configs_from_file(self, broker_configs, config_file, default_log_level with open(config_file, "w") as cfg_fp: config.write(cfg_fp) # Parse the config file and get consumer and producer configs - cfg_parser = KafkaCryptoStore(config_file, conf_global_logger=False) # this sets logging levels for kafkacrypto loggers + cfg_parser = KafkaCryptoStore( + config_file, conf_global_logger=False + ) # this sets logging levels for kafkacrypto loggers kcc_cfgs = cfg_parser.get_kafka_config("consumer", extra="crypto") kcp_cfgs = cfg_parser.get_kafka_config("producer", extra="crypto") cfg_parser.close() diff --git a/openmsistream/kafka_wrapper/openmsistream_producer.py b/openmsistream/kafka_wrapper/openmsistream_producer.py index 89e81909..b95d6930 100644 --- a/openmsistream/kafka_wrapper/openmsistream_producer.py +++ b/openmsistream/kafka_wrapper/openmsistream_producer.py @@ -123,7 +123,9 @@ def get_producer_args_kwargs( ) debug_msg_with_optional_logger(logger, debugmsg) k_c = OpenMSIStreamKafkaCrypto( - parser.broker_configs, parser.kc_config_file_str, logging.WARNING if (logger is None) else logger.level + parser.broker_configs, + parser.kc_config_file_str, + logging.WARNING if (logger is None) else logger.level ) if "key.serializer" in all_configs: keyser = CompoundSerializer( diff --git a/openmsistream/tools/provision_wrapper.py b/openmsistream/tools/provision_wrapper.py index f0401fc7..abc20e13 100644 --- a/openmsistream/tools/provision_wrapper.py +++ b/openmsistream/tools/provision_wrapper.py @@ -15,7 +15,9 @@ from ..utilities.config_file_parser import ConfigFileParser # constants -LOGGER = OpenMSILogger("ProvisionNode", logging.INFO) # TODO: instead of INFO, load from a file somewhere? +LOGGER = OpenMSILogger( + "ProvisionNode", logging.INFO +) # TODO: instead of INFO, load from a file somewhere? KC_PATH = kafkacrypto.__path__ SP_NAME = "simple-provision.py" OP_NAME = "online-provision.py" From fcbe67617569a40213575dd6cce86a03a7e463f1 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 11:40:41 -0500 Subject: [PATCH 05/26] Make code formatters happier. --- openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py | 2 +- openmsistream/kafka_wrapper/openmsistream_producer.py | 2 +- openmsistream/tools/provision_wrapper.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index eb8adc82..f7760322 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -112,7 +112,7 @@ def __get_configs_from_file(self, broker_configs, config_file, default_log_level # Parse the config file and get consumer and producer configs cfg_parser = KafkaCryptoStore( config_file, conf_global_logger=False - ) # this sets logging levels for kafkacrypto loggers + ) # this sets logging levels for kafkacrypto loggers kcc_cfgs = cfg_parser.get_kafka_config("consumer", extra="crypto") kcp_cfgs = cfg_parser.get_kafka_config("producer", extra="crypto") cfg_parser.close() diff --git a/openmsistream/kafka_wrapper/openmsistream_producer.py b/openmsistream/kafka_wrapper/openmsistream_producer.py index b95d6930..3bb253b9 100644 --- a/openmsistream/kafka_wrapper/openmsistream_producer.py +++ b/openmsistream/kafka_wrapper/openmsistream_producer.py @@ -125,7 +125,7 @@ def get_producer_args_kwargs( k_c = OpenMSIStreamKafkaCrypto( parser.broker_configs, parser.kc_config_file_str, - logging.WARNING if (logger is None) else logger.level + logging.WARNING if (logger is None) else logger.level, ) if "key.serializer" in all_configs: keyser = CompoundSerializer( diff --git a/openmsistream/tools/provision_wrapper.py b/openmsistream/tools/provision_wrapper.py index abc20e13..35270837 100644 --- a/openmsistream/tools/provision_wrapper.py +++ b/openmsistream/tools/provision_wrapper.py @@ -17,7 +17,7 @@ # constants LOGGER = OpenMSILogger( "ProvisionNode", logging.INFO -) # TODO: instead of INFO, load from a file somewhere? +) # TODO: instead of INFO, load from a file somewhere? KC_PATH = kafkacrypto.__path__ SP_NAME = "simple-provision.py" OP_NAME = "online-provision.py" From 94e9542aabd42a589b484c7eb43ebe3e484e828d Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 11:53:18 -0500 Subject: [PATCH 06/26] Further style fixups, including moving message length logic to a new module to satisfy the max branches limit. --- .../controlled_message_processor.py | 43 +++---------------- .../controlled_message_reproducer.py | 43 +++---------------- openmsistream/utilities/messages.py | 22 ++++++++++ 3 files changed, 32 insertions(+), 76 deletions(-) create mode 100644 openmsistream/utilities/messages.py diff --git a/openmsistream/kafka_wrapper/controlled_message_processor.py b/openmsistream/kafka_wrapper/controlled_message_processor.py index ffca6230..fe2099a0 100644 --- a/openmsistream/kafka_wrapper/controlled_message_processor.py +++ b/openmsistream/kafka_wrapper/controlled_message_processor.py @@ -6,6 +6,7 @@ from abc import ABC, abstractmethod from ..utilities.heartbeat_producibles import MessageProcessorHeartbeatProducible +from ..utilities.messages import get_message_length from ..utilities.controlled_processes_heartbeats import ( ControlledProcessMultiThreadedHeartbeats, ) @@ -122,25 +123,8 @@ def __do_alive_loop_iteration(self, consumer): with self.lock: self.n_msgs_read += 1 self.n_msgs_read_since_last_heartbeat += 1 - # - # This accounting seems imprecise and incomplete. - # - keylen = 0 - vallen = 0 - if hasattr(msg, "key"): - try: - keylen = len(bytes(msg.key)) - except: - keylen = len(msg.key) - if hasattr(msg, "value"): - try: - vallen = len(bytes(msg.value)) - except: - vallen = len(msg.value) - if keylen == 0 and vallen == 0: - self.n_bytes_read_since_last_heartbeat += len(msg) - else: - self.n_bytes_read_since_last_heartbeat += keylen + vallen + self.n_bytes_read_since_last_heartbeat += + get_message_length(msg) self.last_message = msg # send the message to the _process_message function retval = self._process_message(self.lock, msg) @@ -149,25 +133,8 @@ def __do_alive_loop_iteration(self, consumer): with self.lock: self.n_msgs_processed += 1 self.n_msgs_processed_since_last_heartbeat += 1 - # - # This accounting seems imprecise and incomplete. - # - keylen = 0 - vallen = 0 - if hasattr(msg, "key"): - try: - keylen = len(bytes(msg.key)) - except: - keylen = len(msg.key) - if hasattr(msg, "value"): - try: - vallen = len(bytes(msg.value)) - except: - vallen = len(msg.value) - if keylen == 0 and vallen == 0: - self.n_bytes_read_since_last_heartbeat += len(msg) - else: - self.n_bytes_read_since_last_heartbeat += keylen + vallen + self.n_bytes_read_since_last_heartbeat += + get_message_length(msg) if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: diff --git a/openmsistream/kafka_wrapper/controlled_message_reproducer.py b/openmsistream/kafka_wrapper/controlled_message_reproducer.py index 712868a0..fd5b727f 100644 --- a/openmsistream/kafka_wrapper/controlled_message_reproducer.py +++ b/openmsistream/kafka_wrapper/controlled_message_reproducer.py @@ -10,6 +10,7 @@ from ..utilities.config import RUN_CONST from ..utilities.heartbeat_producibles import MessageReproducerHeartbeatProducible +from ..utilities.messages import get_message_length from ..utilities.controlled_processes_heartbeats import ( ControlledProcessMultiThreadedHeartbeats, ) @@ -198,25 +199,8 @@ def __consume_messages_while_alive(self, consumer): with self.lock: self.n_msgs_read += 1 self.n_msgs_read_since_last_heartbeat += 1 - # - # This accounting seems imprecise and incomplete. - # - keylen = 0 - vallen = 0 - if hasattr(msg, "key"): - try: - keylen = len(bytes(msg.key)) - except: - keylen = len(msg.key) - if hasattr(msg, "value"): - try: - vallen = len(bytes(msg.value)) - except: - vallen = len(msg.value) - if keylen == 0 and vallen == 0: - self.n_bytes_read_since_last_heartbeat += len(msg) - else: - self.n_bytes_read_since_last_heartbeat += keylen + vallen + self.n_bytes_read_since_last_heartbeat += + get_message_length(msg) self.last_message = msg # send the message to the _process_message function retval = self._process_message(self.lock, msg) @@ -225,25 +209,8 @@ def __consume_messages_while_alive(self, consumer): with self.lock: self.n_msgs_processed += 1 self.n_msgs_processed_since_last_heartbeat += 1 - # - # This accounting seems imprecise and incomplete. - # - keylen = 0 - vallen = 0 - if hasattr(msg, "key"): - try: - keylen = len(bytes(msg.key)) - except: - keylen = len(msg.key) - if hasattr(msg, "value"): - try: - vallen = len(bytes(msg.value)) - except: - vallen = len(msg.value) - if keylen == 0 and vallen == 0: - self.n_bytes_read_since_last_heartbeat += len(msg) - else: - self.n_bytes_read_since_last_heartbeat += keylen + vallen + self.n_bytes_read_since_last_heartbeat += + get_message_length(msg) if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: diff --git a/openmsistream/utilities/messages.py b/openmsistream/utilities/messages.py new file mode 100644 index 00000000..91e4ee73 --- /dev/null +++ b/openmsistream/utilities/messages.py @@ -0,0 +1,22 @@ +" Various message helpers " +def get_message_length(msg): + # + # This accounting seems imprecise and incomplete. + # + keylen = 0 + vallen = 0 + if hasattr(msg, "key"): + try: + keylen = len(bytes(msg.key)) + except TypeError: + keylen = len(msg.key) + if hasattr(msg, "value"): + try: + vallen = len(bytes(msg.value)) + except TypeError: + vallen = len(msg.value) + if keylen == 0 and vallen == 0: + return len(msg) + else: + return (keylen + vallen) + From 67b56fd063ebc0d18ccdd86ae9d5a2d7227e694f Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 11:55:43 -0500 Subject: [PATCH 07/26] Styles --- openmsistream/kafka_wrapper/controlled_message_processor.py | 6 ++---- .../kafka_wrapper/controlled_message_reproducer.py | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/openmsistream/kafka_wrapper/controlled_message_processor.py b/openmsistream/kafka_wrapper/controlled_message_processor.py index fe2099a0..1fe1c651 100644 --- a/openmsistream/kafka_wrapper/controlled_message_processor.py +++ b/openmsistream/kafka_wrapper/controlled_message_processor.py @@ -123,8 +123,7 @@ def __do_alive_loop_iteration(self, consumer): with self.lock: self.n_msgs_read += 1 self.n_msgs_read_since_last_heartbeat += 1 - self.n_bytes_read_since_last_heartbeat += - get_message_length(msg) + self.n_bytes_read_since_last_heartbeat += get_message_length(msg) self.last_message = msg # send the message to the _process_message function retval = self._process_message(self.lock, msg) @@ -133,8 +132,7 @@ def __do_alive_loop_iteration(self, consumer): with self.lock: self.n_msgs_processed += 1 self.n_msgs_processed_since_last_heartbeat += 1 - self.n_bytes_read_since_last_heartbeat += - get_message_length(msg) + self.n_bytes_read_since_last_heartbeat += get_message_length(msg) if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: diff --git a/openmsistream/kafka_wrapper/controlled_message_reproducer.py b/openmsistream/kafka_wrapper/controlled_message_reproducer.py index fd5b727f..6028e0c9 100644 --- a/openmsistream/kafka_wrapper/controlled_message_reproducer.py +++ b/openmsistream/kafka_wrapper/controlled_message_reproducer.py @@ -199,8 +199,7 @@ def __consume_messages_while_alive(self, consumer): with self.lock: self.n_msgs_read += 1 self.n_msgs_read_since_last_heartbeat += 1 - self.n_bytes_read_since_last_heartbeat += - get_message_length(msg) + self.n_bytes_read_since_last_heartbeat += get_message_length(msg) self.last_message = msg # send the message to the _process_message function retval = self._process_message(self.lock, msg) @@ -209,8 +208,7 @@ def __consume_messages_while_alive(self, consumer): with self.lock: self.n_msgs_processed += 1 self.n_msgs_processed_since_last_heartbeat += 1 - self.n_bytes_read_since_last_heartbeat += - get_message_length(msg) + self.n_bytes_read_since_last_heartbeat += get_message_length(msg) if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: From 9ff48e533910979254402bd39ab552fbe0f5d065 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 11:56:38 -0500 Subject: [PATCH 08/26] Styles --- openmsistream/utilities/messages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmsistream/utilities/messages.py b/openmsistream/utilities/messages.py index 91e4ee73..867d95d9 100644 --- a/openmsistream/utilities/messages.py +++ b/openmsistream/utilities/messages.py @@ -18,5 +18,5 @@ def get_message_length(msg): if keylen == 0 and vallen == 0: return len(msg) else: - return (keylen + vallen) + return keylen + vallen From 25340ead36aebaf9b2cc87734f8c9797416b26f4 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 11:57:50 -0500 Subject: [PATCH 09/26] Styles --- openmsistream/utilities/messages.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/openmsistream/utilities/messages.py b/openmsistream/utilities/messages.py index 867d95d9..a547dcdc 100644 --- a/openmsistream/utilities/messages.py +++ b/openmsistream/utilities/messages.py @@ -1,4 +1,6 @@ " Various message helpers " + + def get_message_length(msg): # # This accounting seems imprecise and incomplete. @@ -19,4 +21,3 @@ def get_message_length(msg): return len(msg) else: return keylen + vallen - From d6adb135905a4b497a8fa96e7cee02abc231f43a Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 12:00:22 -0500 Subject: [PATCH 10/26] Styles --- openmsistream/tools/provision_wrapper.py | 2 +- openmsistream/utilities/messages.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/openmsistream/tools/provision_wrapper.py b/openmsistream/tools/provision_wrapper.py index 35270837..1def93a1 100644 --- a/openmsistream/tools/provision_wrapper.py +++ b/openmsistream/tools/provision_wrapper.py @@ -17,7 +17,7 @@ # constants LOGGER = OpenMSILogger( "ProvisionNode", logging.INFO -) # TODO: instead of INFO, load from a file somewhere? +) # Instead of INFO, load from a file somewhere? KC_PATH = kafkacrypto.__path__ SP_NAME = "simple-provision.py" OP_NAME = "online-provision.py" diff --git a/openmsistream/utilities/messages.py b/openmsistream/utilities/messages.py index a547dcdc..b8b18c95 100644 --- a/openmsistream/utilities/messages.py +++ b/openmsistream/utilities/messages.py @@ -2,6 +2,9 @@ def get_message_length(msg): + """ + returns length of a message + """ # # This accounting seems imprecise and incomplete. # @@ -19,5 +22,4 @@ def get_message_length(msg): vallen = len(msg.value) if keylen == 0 and vallen == 0: return len(msg) - else: - return keylen + vallen + return keylen + vallen From d6689543cf41dd86aa59267b26b3d4bd7bc627ea Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 12:02:42 -0500 Subject: [PATCH 11/26] Update serialization test. --- test/test_scripts/test_serialization.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test_scripts/test_serialization.py b/test/test_scripts/test_serialization.py index c691b453..9618fc45 100644 --- a/test/test_scripts/test_serialization.py +++ b/test/test_scripts/test_serialization.py @@ -1,5 +1,5 @@ # imports -import pathlib, time +import pathlib, time, logging from confluent_kafka.error import SerializationError from openmsistream import UploadDataFile from openmsistream.kafka_wrapper.config_file_parser import KafkaConfigFileParser @@ -120,11 +120,11 @@ def test_encrypted_compound_serdes_kafka(self): parser1 = KafkaConfigFileParser( TEST_CONST.TEST_CFG_FILE_PATH_ENC, logger=self.logger ) - kc1 = OpenMSIStreamKafkaCrypto(parser1.broker_configs, parser1.kc_config_file_str) + kc1 = OpenMSIStreamKafkaCrypto(parser1.broker_configs, parser1.kc_config_file_str, logging.WARNING) parser2 = KafkaConfigFileParser( TEST_CONST.TEST_CFG_FILE_PATH_ENC_2, logger=self.logger ) - kc2 = OpenMSIStreamKafkaCrypto(parser2.broker_configs, parser2.kc_config_file_str) + kc2 = OpenMSIStreamKafkaCrypto(parser2.broker_configs, parser2.kc_config_file_str, logging.WARNING) dfcs = DataFileChunkSerializer() dfcds = DataFileChunkDeserializer() comp_ser = CompoundSerializer(dfcs, kc1.value_serializer) From f858581e2e57b74e514bffdd84004266c7b305da Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Wed, 1 Jan 2025 12:04:59 -0500 Subject: [PATCH 12/26] Styles --- test/test_scripts/test_serialization.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/test_scripts/test_serialization.py b/test/test_scripts/test_serialization.py index 9618fc45..f046431d 100644 --- a/test/test_scripts/test_serialization.py +++ b/test/test_scripts/test_serialization.py @@ -120,11 +120,15 @@ def test_encrypted_compound_serdes_kafka(self): parser1 = KafkaConfigFileParser( TEST_CONST.TEST_CFG_FILE_PATH_ENC, logger=self.logger ) - kc1 = OpenMSIStreamKafkaCrypto(parser1.broker_configs, parser1.kc_config_file_str, logging.WARNING) + kc1 = OpenMSIStreamKafkaCrypto( + parser1.broker_configs, parser1.kc_config_file_str, logging.WARNING + ) parser2 = KafkaConfigFileParser( TEST_CONST.TEST_CFG_FILE_PATH_ENC_2, logger=self.logger ) - kc2 = OpenMSIStreamKafkaCrypto(parser2.broker_configs, parser2.kc_config_file_str, logging.WARNING) + kc2 = OpenMSIStreamKafkaCrypto( + parser2.broker_configs, parser2.kc_config_file_str, logging.WARNING + ) dfcs = DataFileChunkSerializer() dfcds = DataFileChunkDeserializer() comp_ser = CompoundSerializer(dfcs, kc1.value_serializer) From d1b574461dbdaa5d29be7700fa860e45a6edcfa1 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Fri, 3 Jan 2025 08:12:53 -0500 Subject: [PATCH 13/26] retrigger checks From 3bfa53788be15410f6c2cfa87c774bb574f16fde Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:05:53 -0500 Subject: [PATCH 14/26] Update version and allow CI tests to rerun. --- openmsistream/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmsistream/version.py b/openmsistream/version.py index 320141d8..ef4e23e1 100644 --- a/openmsistream/version.py +++ b/openmsistream/version.py @@ -1 +1 @@ -__version__ = "1.8.2" +__version__ = "1.8.3.dev1" From a3081f622bc3108dec7f17b8bc6a222d76196f05 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:09:57 -0500 Subject: [PATCH 15/26] Nit fixup --- openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index f7760322..755231df 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -96,7 +96,7 @@ def __get_configs_from_file(self, broker_configs, config_file, default_log_level config = configparser.ConfigParser(delimiters=":") config.read(config_file) # Unilaterally pdate default log_level (can be overridden in -crypto subsection by user) - config.set("{config_file.stem}", "log_level", default_log_level) + config.set("{config_file.stem}", "log_level", str(default_log_level)) # If ssl.ca.location is set in the broker configs, make sure it's written to the # KafkaCrypto config file as well in the right place if "ssl.ca.location" in broker_configs: From 2308d676a81dffe393880d877776b40192f4f8c5 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:12:34 -0500 Subject: [PATCH 16/26] Nit fixup --- openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index 755231df..9732b69c 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -96,7 +96,7 @@ def __get_configs_from_file(self, broker_configs, config_file, default_log_level config = configparser.ConfigParser(delimiters=":") config.read(config_file) # Unilaterally pdate default log_level (can be overridden in -crypto subsection by user) - config.set("{config_file.stem}", "log_level", str(default_log_level)) + config.set(f"{config_file.stem}", "log_level", str(default_log_level)) # If ssl.ca.location is set in the broker configs, make sure it's written to the # KafkaCrypto config file as well in the right place if "ssl.ca.location" in broker_configs: From 0c2c5f03b61cf434862269df7e82255e58f473e6 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:17:21 -0500 Subject: [PATCH 17/26] Nit fixup --- openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index 9732b69c..9bc9fb0d 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -96,12 +96,11 @@ def __get_configs_from_file(self, broker_configs, config_file, default_log_level config = configparser.ConfigParser(delimiters=":") config.read(config_file) # Unilaterally pdate default log_level (can be overridden in -crypto subsection by user) - config.set(f"{config_file.stem}", "log_level", str(default_log_level)) + section_name = f"{config_file.stem}" + config.set(section_name, "log_level", str(default_log_level)) # If ssl.ca.location is set in the broker configs, make sure it's written to the # KafkaCrypto config file as well in the right place if "ssl.ca.location" in broker_configs: - config = configparser.ConfigParser(delimiters=":") - config.read(config_file) section_name = f"{config_file.stem}-kafka" option_name = "ssl_cafile" if config.has_option(section_name, option_name): From 1163b80aee3ce56735acb4bb285e21e295a1a71f Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:21:27 -0500 Subject: [PATCH 18/26] Fix longstanding nit --- openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index 9bc9fb0d..6a7d880d 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -96,12 +96,12 @@ def __get_configs_from_file(self, broker_configs, config_file, default_log_level config = configparser.ConfigParser(delimiters=":") config.read(config_file) # Unilaterally pdate default log_level (can be overridden in -crypto subsection by user) - section_name = f"{config_file.stem}" + section_name = f"{config.stem}" config.set(section_name, "log_level", str(default_log_level)) # If ssl.ca.location is set in the broker configs, make sure it's written to the # KafkaCrypto config file as well in the right place if "ssl.ca.location" in broker_configs: - section_name = f"{config_file.stem}-kafka" + section_name = f"{config.stem}-kafka" option_name = "ssl_cafile" if config.has_option(section_name, option_name): config.set(section_name, option_name, broker_configs["ssl.ca.location"]) From acec86780825fd4d1b4c2cae850022f94fb1d8f0 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:35:48 -0500 Subject: [PATCH 19/26] Fix expected config_file parameter type --- openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index 6a7d880d..1f502ee8 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -18,7 +18,7 @@ class OpenMSIStreamKafkaCrypto: to the broker :type broker_configs: dict :param config_file: the path to the KafkaCrypto config file for the node being used - :type config_file: :class:`pathlib.Path` + :type config_file: str """ @property @@ -62,7 +62,7 @@ def __init__(self, broker_configs, config_file, log_level): """ # get kafka crypto configs, and set logging level for kafkacrypto loggers kcp_cfgs, kcc_cfgs = self.__get_configs_from_file( - broker_configs, config_file, log_level + broker_configs, pathlib.Path(config_file), log_level ) with change_dir(pathlib.Path(config_file).parent): # start up the producer and consumer @@ -96,12 +96,12 @@ def __get_configs_from_file(self, broker_configs, config_file, default_log_level config = configparser.ConfigParser(delimiters=":") config.read(config_file) # Unilaterally pdate default log_level (can be overridden in -crypto subsection by user) - section_name = f"{config.stem}" + section_name = f"{config_file.stem}" config.set(section_name, "log_level", str(default_log_level)) # If ssl.ca.location is set in the broker configs, make sure it's written to the # KafkaCrypto config file as well in the right place if "ssl.ca.location" in broker_configs: - section_name = f"{config.stem}-kafka" + section_name = f"{config_file.stem}-kafka" option_name = "ssl_cafile" if config.has_option(section_name, option_name): config.set(section_name, option_name, broker_configs["ssl.ca.location"]) From 49fdbfd4712a4216a5f55937db2baeed0ed5e8b1 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:38:29 -0500 Subject: [PATCH 20/26] Nit --- openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py index 1f502ee8..8046ca4e 100644 --- a/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py +++ b/openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py @@ -110,7 +110,7 @@ def __get_configs_from_file(self, broker_configs, config_file, default_log_level config.write(cfg_fp) # Parse the config file and get consumer and producer configs cfg_parser = KafkaCryptoStore( - config_file, conf_global_logger=False + str(config_file), conf_global_logger=False ) # this sets logging levels for kafkacrypto loggers kcc_cfgs = cfg_parser.get_kafka_config("consumer", extra="crypto") kcp_cfgs = cfg_parser.get_kafka_config("producer", extra="crypto") From a231bec53003838b5c8541cff91ea14796f6f85c Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:52:40 -0500 Subject: [PATCH 21/26] Nit --- openmsistream/utilities/messages.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/openmsistream/utilities/messages.py b/openmsistream/utilities/messages.py index b8b18c95..15a1be99 100644 --- a/openmsistream/utilities/messages.py +++ b/openmsistream/utilities/messages.py @@ -14,12 +14,18 @@ def get_message_length(msg): try: keylen = len(bytes(msg.key)) except TypeError: - keylen = len(msg.key) + try: + keylen = len(msg.key) + except TypeError: + pass if hasattr(msg, "value"): try: vallen = len(bytes(msg.value)) except TypeError: - vallen = len(msg.value) + try: + vallen = len(msg.value) + except: + pass if keylen == 0 and vallen == 0: return len(msg) return keylen + vallen From 4e5c18cf4555f6bc69cfeaa9a7cbb97f2ac38b6f Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 16:54:17 -0500 Subject: [PATCH 22/26] Add missing type --- openmsistream/utilities/messages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmsistream/utilities/messages.py b/openmsistream/utilities/messages.py index 15a1be99..d971689b 100644 --- a/openmsistream/utilities/messages.py +++ b/openmsistream/utilities/messages.py @@ -24,7 +24,7 @@ def get_message_length(msg): except TypeError: try: vallen = len(msg.value) - except: + except TypeError: pass if keylen == 0 and vallen == 0: return len(msg) From 726111c8ebfee17afbba0831187f3d1f0691645a Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 17:21:55 -0500 Subject: [PATCH 23/26] Update message length calc to match old (incorrect) method --- openmsistream/utilities/messages.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/openmsistream/utilities/messages.py b/openmsistream/utilities/messages.py index d971689b..2e0e95e9 100644 --- a/openmsistream/utilities/messages.py +++ b/openmsistream/utilities/messages.py @@ -10,6 +10,7 @@ def get_message_length(msg): # keylen = 0 vallen = 0 + totlen = 0 if hasattr(msg, "key"): try: keylen = len(bytes(msg.key)) @@ -26,6 +27,11 @@ def get_message_length(msg): vallen = len(msg.value) except TypeError: pass - if keylen == 0 and vallen == 0: - return len(msg) - return keylen + vallen + try: + totlen = len(bytes(msg)) + except TypeError: + try: + totlen = len(msg) + except TypeError: + pass + return max(keylen+vallen, totlen) From 902fee4a026be705d0cb2eab8e5afaf4749e44db Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 17:25:23 -0500 Subject: [PATCH 24/26] The CI and I disagree on appropriate styling --- openmsistream/utilities/messages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmsistream/utilities/messages.py b/openmsistream/utilities/messages.py index 2e0e95e9..67e95a3d 100644 --- a/openmsistream/utilities/messages.py +++ b/openmsistream/utilities/messages.py @@ -34,4 +34,4 @@ def get_message_length(msg): totlen = len(msg) except TypeError: pass - return max(keylen+vallen, totlen) + return max(keylen + vallen, totlen) From e733dfa3984851c2f638872e02e4def99c480f46 Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 17:34:57 -0500 Subject: [PATCH 25/26] Fix accounting bug found by CI --- openmsistream/kafka_wrapper/controlled_message_reproducer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmsistream/kafka_wrapper/controlled_message_reproducer.py b/openmsistream/kafka_wrapper/controlled_message_reproducer.py index 6028e0c9..ade5bf69 100644 --- a/openmsistream/kafka_wrapper/controlled_message_reproducer.py +++ b/openmsistream/kafka_wrapper/controlled_message_reproducer.py @@ -208,7 +208,7 @@ def __consume_messages_while_alive(self, consumer): with self.lock: self.n_msgs_processed += 1 self.n_msgs_processed_since_last_heartbeat += 1 - self.n_bytes_read_since_last_heartbeat += get_message_length(msg) + self.n_bytes_processed_since_last_heartbeat += get_message_length(msg) if not consumer.message_consumed_before(msg): tps = consumer.commit(msg) if tps is None: From f00462673bb3960ba166c61c366ae7d33c9e9bdb Mon Sep 17 00:00:00 2001 From: "Tyrel M. McQueen" Date: Mon, 6 Jan 2025 19:44:51 -0500 Subject: [PATCH 26/26] Require compatible openmsitoolbox version. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8c9cf379..3f0c171d 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ "matplotlib", "methodtools", "msgpack", - "openmsitoolbox>=1.2.3", + "openmsitoolbox>=1.2.4", "watchdog>=3.0.0", ], extras_require={