Skip to content

Commit

Permalink
Merge pull request #69 from tmcqueen-materials/logging-rework
Browse files Browse the repository at this point in the history
Logging rework
  • Loading branch information
davidelbert authored Jan 7, 2025
2 parents 9df39e5 + f004626 commit 8312629
Show file tree
Hide file tree
Showing 15 changed files with 97 additions and 119 deletions.
5 changes: 1 addition & 4 deletions openmsistream/data_file_io/actor/data_file_chunk_handlers.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
"""Anything that receives DataFileChunk messages from a topic and does something with them"""

# imports
import warnings
from abc import ABC, abstractmethod
from threading import Lock

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto import KafkaCryptoMessage
from kafkacrypto import KafkaCryptoMessage
from openmsitoolbox import LogOwner
from ...kafka_wrapper.controlled_message_processor import ControlledMessageProcessor
from ...kafka_wrapper.controlled_message_reproducer import ControlledMessageReproducer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
"""

# imports
import datetime, warnings
import datetime

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto.message import KafkaCryptoMessage
from kafkacrypto.message import KafkaCryptoMessage
from openmsitoolbox import Runnable
from ...utilities import OpenMSIStreamArgumentParser
from ...utilities.config import RUN_CONST
Expand Down
6 changes: 2 additions & 4 deletions openmsistream/data_file_io/actor/data_file_stream_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
"""

# imports
import pathlib, warnings
import pathlib
from abc import ABC

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto.message import KafkaCryptoMessage
from kafkacrypto.message import KafkaCryptoMessage
from openmsitoolbox import Runnable
from openmsitoolbox.utilities.misc import populated_kwargs
from ...utilities import OpenMSIStreamArgumentParser
Expand Down
34 changes: 3 additions & 31 deletions openmsistream/kafka_wrapper/controlled_message_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@
"""

# imports
import warnings
from abc import ABC, abstractmethod

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto import KafkaCryptoMessage
from kafkacrypto.confluent_kafka_wrapper import Message
from ..utilities.heartbeat_producibles import MessageProcessorHeartbeatProducible
from ..utilities.messages import get_message_length
from ..utilities.controlled_processes_heartbeats import (
ControlledProcessMultiThreadedHeartbeats,
)
Expand Down Expand Up @@ -127,19 +123,7 @@ def __do_alive_loop_iteration(self, consumer):
with self.lock:
self.n_msgs_read += 1
self.n_msgs_read_since_last_heartbeat += 1
if (
hasattr(msg, "key")
and hasattr(msg, "value")
and (
isinstance(msg.key, KafkaCryptoMessage)
or isinstance(msg.value, KafkaCryptoMessage)
)
):
self.n_bytes_read_since_last_heartbeat += len(bytes(msg))
elif isinstance(msg, Message):
self.n_bytes_read_since_last_heartbeat += len(msg.value)
else:
self.n_bytes_read_since_last_heartbeat += len(msg)
self.n_bytes_read_since_last_heartbeat += get_message_length(msg)
self.last_message = msg
# send the message to the _process_message function
retval = self._process_message(self.lock, msg)
Expand All @@ -148,19 +132,7 @@ def __do_alive_loop_iteration(self, consumer):
with self.lock:
self.n_msgs_processed += 1
self.n_msgs_processed_since_last_heartbeat += 1
if (
hasattr(msg, "key")
and hasattr(msg, "value")
and (
isinstance(msg.key, KafkaCryptoMessage)
or isinstance(msg.value, KafkaCryptoMessage)
)
):
self.n_bytes_processed_since_last_heartbeat += len(bytes(msg))
elif isinstance(msg, Message):
self.n_bytes_processed_since_last_heartbeat += len(msg.value)
else:
self.n_bytes_processed_since_last_heartbeat += len(msg)
self.n_bytes_read_since_last_heartbeat += get_message_length(msg)
if not consumer.message_consumed_before(msg):
tps = consumer.commit(msg)
if tps is None:
Expand Down
35 changes: 4 additions & 31 deletions openmsistream/kafka_wrapper/controlled_message_reproducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
"""

# imports
import time, warnings
import time
from abc import ABC, abstractmethod
from queue import Queue

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto import KafkaCryptoMessage
from kafkacrypto.confluent_kafka_wrapper import Message
from ..utilities.config import RUN_CONST
from ..utilities.heartbeat_producibles import MessageReproducerHeartbeatProducible
from ..utilities.messages import get_message_length
from ..utilities.controlled_processes_heartbeats import (
ControlledProcessMultiThreadedHeartbeats,
)
Expand Down Expand Up @@ -202,19 +199,7 @@ def __consume_messages_while_alive(self, consumer):
with self.lock:
self.n_msgs_read += 1
self.n_msgs_read_since_last_heartbeat += 1
if (
hasattr(msg, "key")
and hasattr(msg, "value")
and (
isinstance(msg.key, KafkaCryptoMessage)
or isinstance(msg.value, KafkaCryptoMessage)
)
):
self.n_bytes_read_since_last_heartbeat += len(bytes(msg))
elif isinstance(msg, Message):
self.n_bytes_read_since_last_heartbeat += len(msg.value)
else:
self.n_bytes_read_since_last_heartbeat += len(msg)
self.n_bytes_read_since_last_heartbeat += get_message_length(msg)
self.last_message = msg
# send the message to the _process_message function
retval = self._process_message(self.lock, msg)
Expand All @@ -223,19 +208,7 @@ def __consume_messages_while_alive(self, consumer):
with self.lock:
self.n_msgs_processed += 1
self.n_msgs_processed_since_last_heartbeat += 1
if (
hasattr(msg, "key")
and hasattr(msg, "value")
and (
isinstance(msg.key, KafkaCryptoMessage)
or isinstance(msg.value, KafkaCryptoMessage)
)
):
self.n_bytes_processed_since_last_heartbeat += len(bytes(msg))
elif isinstance(msg, Message):
self.n_bytes_processed_since_last_heartbeat += len(msg.value)
else:
self.n_bytes_processed_since_last_heartbeat += len(msg)
self.n_bytes_processed_since_last_heartbeat += get_message_length(msg)
if not consumer.message_consumed_before(msg):
tps = consumer.commit(msg)
if tps is None:
Expand Down
10 changes: 5 additions & 5 deletions openmsistream/kafka_wrapper/openmsistream_consumer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""A wrapped Kafka Consumer. May consume encrypted messages."""

# imports
import uuid, warnings, gc
import uuid, gc, logging
import methodtools
from confluent_kafka import DeserializingConsumer, Message

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto import KafkaConsumer
from kafkacrypto import KafkaConsumer
from openmsitoolbox import LogOwner
from openmsitoolbox.utilities.misc import (
raise_err_with_optional_logger,
Expand Down Expand Up @@ -177,7 +175,9 @@ def get_consumer_args_kwargs(
)
debug_msg_with_optional_logger(logger, debugmsg)
k_c = OpenMSIStreamKafkaCrypto(
parser.broker_configs, parser.kc_config_file_str
parser.broker_configs,
parser.kc_config_file_str,
logging.WARNING if (logger.level is None) else logger.level,
)
if "key.deserializer" in all_configs:
keydes = CompoundDeserializer(
Expand Down
37 changes: 21 additions & 16 deletions openmsistream/kafka_wrapper/openmsistream_kafka_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
"""

# imports
import pathlib, warnings, logging, configparser
import pathlib, configparser

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto import KafkaProducer, KafkaConsumer, KafkaCrypto, KafkaCryptoStore
from kafkacrypto import KafkaProducer, KafkaConsumer, KafkaCrypto, KafkaCryptoStore
from openmsitoolbox.utilities.misc import change_dir


Expand All @@ -20,7 +18,7 @@ class OpenMSIStreamKafkaCrypto:
to the broker
:type broker_configs: dict
:param config_file: the path to the KafkaCrypto config file for the node being used
:type config_file: :class:`pathlib.Path`
:type config_file: str
"""

@property
Expand Down Expand Up @@ -58,20 +56,20 @@ def value_deserializer(self):
"""
return self._kc.getValueDeserializer()

def __init__(self, broker_configs, config_file):
def __init__(self, broker_configs, config_file, log_level):
"""
Constructor method
"""
kcp_cfgs, kcc_cfgs = self.__get_configs_from_file(broker_configs, config_file)
# get kafka crypto configs, and set logging level for kafkacrypto loggers
kcp_cfgs, kcc_cfgs = self.__get_configs_from_file(
broker_configs, pathlib.Path(config_file), log_level
)
with change_dir(pathlib.Path(config_file).parent):
kc_logger = logging.getLogger("kafkacrypto")
kc_logger.setLevel(logging.ERROR)
# start up the producer and consumer
self._kcp = KafkaProducer(**kcp_cfgs)
self._kcc = KafkaConsumer(**kcc_cfgs)
# initialize the KafkaCrypto object
self._kc = KafkaCrypto(None, self._kcp, self._kcc, config_file)
kc_logger.setLevel(logging.WARNING)
self.__config_file = config_file

def close(self):
Expand All @@ -87,26 +85,33 @@ def close(self):
self._kcp = None
self._kcc = None

def __get_configs_from_file(self, broker_configs, config_file):
def __get_configs_from_file(self, broker_configs, config_file, default_log_level):
"""Return the dictionaries of crypto producer and consumer configs determined
from the KafkaCrypto config file and overwritten with the given broker configs
from the OpenMSIStream config file. KafkaCryptoStore must be used when parsing
the crypto config file to ensure options (and clearing of options) is properly
handled.
"""
# Make updates to kafkaconfig file according to what we are passed
config = configparser.ConfigParser(delimiters=":")
config.read(config_file)
# Unilaterally pdate default log_level (can be overridden in -crypto subsection by user)
section_name = f"{config_file.stem}"
config.set(section_name, "log_level", str(default_log_level))
# If ssl.ca.location is set in the broker configs, make sure it's written to the
# KafkaCrypto config file as well in the right place
if "ssl.ca.location" in broker_configs:
config = configparser.ConfigParser(delimiters=":")
config.read(config_file)
section_name = f"{config_file.stem}-kafka"
option_name = "ssl_cafile"
if config.has_option(section_name, option_name):
config.set(section_name, option_name, broker_configs["ssl.ca.location"])
with open(config_file, "w") as cfg_fp:
config.write(cfg_fp)
# Save changes to config_file
with open(config_file, "w") as cfg_fp:
config.write(cfg_fp)
# Parse the config file and get consumer and producer configs
cfg_parser = KafkaCryptoStore(config_file, conf_global_logger=False)
cfg_parser = KafkaCryptoStore(
str(config_file), conf_global_logger=False
) # this sets logging levels for kafkacrypto loggers
kcc_cfgs = cfg_parser.get_kafka_config("consumer", extra="crypto")
kcp_cfgs = cfg_parser.get_kafka_config("producer", extra="crypto")
cfg_parser.close()
Expand Down
10 changes: 5 additions & 5 deletions openmsistream/kafka_wrapper/openmsistream_producer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
"""A wrapped Kafka Producer. May produce encrypted messages."""

# imports
import time, warnings, gc
import time, gc, logging
from confluent_kafka import SerializingProducer

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto import KafkaProducer
from kafkacrypto import KafkaProducer
from openmsitoolbox import LogOwner
from openmsitoolbox.utilities.misc import (
raise_err_with_optional_logger,
Expand Down Expand Up @@ -125,7 +123,9 @@ def get_producer_args_kwargs(
)
debug_msg_with_optional_logger(logger, debugmsg)
k_c = OpenMSIStreamKafkaCrypto(
parser.broker_configs, parser.kc_config_file_str
parser.broker_configs,
parser.kc_config_file_str,
logging.WARNING if (logger is None) else logger.level,
)
if "key.serializer" in all_configs:
keyser = CompoundSerializer(
Expand Down
6 changes: 2 additions & 4 deletions openmsistream/kafka_wrapper/serialization.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
"""Classes for serialization and deserialization operations"""

# imports
import pathlib, inspect, time, warnings
import pathlib, inspect, time
from hashlib import sha512
import msgpack
from confluent_kafka.error import SerializationError
from confluent_kafka.serialization import Serializer, Deserializer

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto.message import KafkaCryptoMessage, KafkaCryptoMessageError
from kafkacrypto.message import KafkaCryptoMessage, KafkaCryptoMessageError
from ..data_file_io.entity.data_file_chunk import DataFileChunk

####################### COMPOUND (DE)SERIALIZERS FOR STACKING MULTIPLE STEPS #######################
Expand Down
6 changes: 1 addition & 5 deletions openmsistream/kafka_wrapper/utilities.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
"""Miscellaneous functions/classes used elsewhere in the Kafka wrapper"""

# imports
import warnings
from collections import namedtuple
from confluent_kafka import OFFSET_BEGINNING

with warnings.catch_warnings():
warnings.simplefilter("ignore")
from kafkacrypto import KafkaConsumer
from kafkacrypto import KafkaConsumer


def add_kwargs_to_configs(configs, logger, **kwargs):
Expand Down
10 changes: 5 additions & 5 deletions openmsistream/tools/provision_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
"""

# imports
import pathlib, shutil, logging, warnings
import pathlib, shutil, logging
from argparse import ArgumentParser
import urllib.request

with warnings.catch_warnings():
warnings.simplefilter("ignore")
import kafkacrypto
import kafkacrypto
from openmsitoolbox.logging import OpenMSILogger
from openmsitoolbox.utilities.misc import change_dir
from ..utilities.config import RUN_CONST
from ..utilities.config_file_parser import ConfigFileParser

# constants
LOGGER = OpenMSILogger("ProvisionNode", logging.INFO)
LOGGER = OpenMSILogger(
"ProvisionNode", logging.INFO
) # Instead of INFO, load from a file somewhere?
KC_PATH = kafkacrypto.__path__
SP_NAME = "simple-provision.py"
OP_NAME = "online-provision.py"
Expand Down
Loading

0 comments on commit 8312629

Please sign in to comment.