From 099d98fefa3a82b08ef4506b6dc99bdef8296764 Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 28 May 2024 11:11:11 +0200 Subject: [PATCH 1/5] Add process exit handling context manager, wrdn message consuming generator, and integration test entry script. --- .github/workflows/integration.yml | 2 +- src/scicat_ingestor.py | 30 ++++++++++++++++++++++++++---- src/scicat_kafka.py | 31 +++++++++++++++++++++++++++++++ tests/_scicat_ingestor.py | 24 ++++++++++++++++++++++++ 4 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 tests/_scicat_ingestor.py diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index dfeb8b2..c226d41 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -27,5 +27,5 @@ jobs: - run: python -m pip install -e . - run: docker-compose version - run: docker-compose -f tests/docker-compose-file-writer.yml up -d - - run: scicat_ingestor -c resources/config.sample.json --verbose + - run: python tests/_scicat_ingestor.py -c resources/config.sample.json --verbose - run: docker-compose -f tests/docker-compose-file-writer.yml down diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py index 879de0c..eed629f 100644 --- a/src/scicat_ingestor.py +++ b/src/scicat_ingestor.py @@ -1,9 +1,11 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) import logging +from collections.abc import Generator +from contextlib import contextmanager from scicat_configuration import build_main_arg_parser, build_scicat_config -from scicat_kafka import build_consumer +from scicat_kafka import build_consumer, wrdn_messages from scicat_logging import build_logger @@ -15,6 +17,22 @@ def quit(logger: logging.Logger, unexpected: bool = True) -> None: sys.exit(1 if unexpected else 0) +@contextmanager +def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]: + """Exit the program if an exception is raised.""" + try: + yield + except KeyboardInterrupt: + logger.info("Received keyboard interrupt.") + quit(logger, unexpected=False) + except Exception as e: + logger.error("An exception occurred: %s", e) + quit(logger, unexpected=True) + else: + logger.error("Loop finished unexpectedly.") + quit(logger, unexpected=True) + + def main() -> None: """Main entry point of the app.""" arg_parser = build_main_arg_parser() @@ -26,6 +44,10 @@ def main() -> None: logger.info('Starting the Scicat Ingestor with the following configuration:') logger.info(config.to_dict()) - # Kafka consumer - if build_consumer(config.kafka_options, logger) is None: - quit(logger) + with exit_at_exceptions(logger): + # Kafka consumer + if (consumer := build_consumer(config.kafka_options, logger)) is None: + raise RuntimeError("Failed to build the Kafka consumer") + + for message in wrdn_messages(consumer, logger): + logger.info(f"Received message: {message}") diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py index a832867..72a228a 100644 --- a/src/scicat_kafka.py +++ b/src/scicat_kafka.py @@ -1,8 +1,14 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) import logging +from collections.abc import Generator from confluent_kafka import Consumer +from streaming_data_types import deserialise_wrdn +from streaming_data_types.finished_writing_wrdn import ( + FILE_IDENTIFIER as WRDN_FILE_IDENTIFIER, +) +from streaming_data_types.finished_writing_wrdn import WritingFinished from scicat_configuration import kafkaOptions @@ -66,3 +72,28 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool: else: logger.info("Kafka consumer successfully instantiated") return True + + +def wrdn_messages( + consumer: Consumer, logger: logging.Logger +) -> Generator[WritingFinished, None, None]: + """Wait for a WRDN message and yield it.""" + while True: + message = consumer.poll(timeout=1.0) + if message is None: + logger.info("Received no messages") + continue + elif message.error(): + logger.error("Consumer error: %s", message.error()) + continue + else: + logger.info("Received message.") + + message_content: str = message.value() + data_type = message_content[4:8] + logger.info("Data type: %s", data_type) + if data_type == WRDN_FILE_IDENTIFIER: + logger.info("Deserialising WRDN message") + yield deserialise_wrdn(message_content) + else: + logger.error("Unexpected data type: %s", data_type) diff --git a/tests/_scicat_ingestor.py b/tests/_scicat_ingestor.py new file mode 100644 index 0000000..abebc25 --- /dev/null +++ b/tests/_scicat_ingestor.py @@ -0,0 +1,24 @@ +# Entry point for integration test. +# All system arguments are passed to the ``scicat_ingestor``. + + +if __name__ == "__main__": + import signal + import subprocess + import sys + from time import sleep + + # Run the main function in a subprocess + process = subprocess.Popen( + [ + "scicat_ingestor", + *(sys.argv[1:] or ["--verbose", "-c", "resources/config.sample.json"]), + ] + ) + + # Send a SIGINT signal to the process after 5 seconds + sleep(5) + process.send_signal(signal.SIGINT) + + # Kill the process after 5 more seconds + sleep(5) From 605cfc496a65f92d63466d6abd789c40b2f68264 Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 28 May 2024 11:35:08 +0200 Subject: [PATCH 2/5] Skip the error-encountered file. --- src/scicat_kafka.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py index 72a228a..4d8cadf 100644 --- a/src/scicat_kafka.py +++ b/src/scicat_kafka.py @@ -94,6 +94,14 @@ def wrdn_messages( logger.info("Data type: %s", data_type) if data_type == WRDN_FILE_IDENTIFIER: logger.info("Deserialising WRDN message") - yield deserialise_wrdn(message_content) + wrdn_content: WritingFinished = deserialise_wrdn(message_content) + if wrdn_content.error_encountered: + logger.error( + "``error_encountered`` flag True. " + "Unable to deserialize message. Skipping message." + ) + continue + else: + yield wrdn_content else: logger.error("Unexpected data type: %s", data_type) From 85329cbf16fa585529441ef63ab56e9beb366836 Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 28 May 2024 11:36:25 +0200 Subject: [PATCH 3/5] Add more comments and log message. --- src/scicat_ingestor.py | 1 + src/scicat_kafka.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py index eed629f..b399f6b 100644 --- a/src/scicat_ingestor.py +++ b/src/scicat_ingestor.py @@ -49,5 +49,6 @@ def main() -> None: if (consumer := build_consumer(config.kafka_options, logger)) is None: raise RuntimeError("Failed to build the Kafka consumer") + # Receive messages for message in wrdn_messages(consumer, logger): logger.info(f"Received message: {message}") diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py index 4d8cadf..ea2bb7b 100644 --- a/src/scicat_kafka.py +++ b/src/scicat_kafka.py @@ -98,10 +98,11 @@ def wrdn_messages( if wrdn_content.error_encountered: logger.error( "``error_encountered`` flag True. " - "Unable to deserialize message. Skipping message." + "Unable to deserialize message. Skipping the message." ) continue else: + logger.info("Successfully desrialized a WRDN message.") yield wrdn_content else: logger.error("Unexpected data type: %s", data_type) From e7a599bd7faffe272b6374aa728ec34e332bd696 Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 28 May 2024 11:41:27 +0200 Subject: [PATCH 4/5] Preview the wrdn message. --- src/scicat_kafka.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py index ea2bb7b..90bdf7b 100644 --- a/src/scicat_kafka.py +++ b/src/scicat_kafka.py @@ -95,6 +95,7 @@ def wrdn_messages( if data_type == WRDN_FILE_IDENTIFIER: logger.info("Deserialising WRDN message") wrdn_content: WritingFinished = deserialise_wrdn(message_content) + logger.info("Deserialised WRDN message: %.5000s", wrdn_content) if wrdn_content.error_encountered: logger.error( "``error_encountered`` flag True. " From 89d1d888fa0d8d707fd2570b46f8873518f76d1e Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 28 May 2024 15:34:50 +0200 Subject: [PATCH 5/5] Split long function into smaller functions. --- src/scicat_ingestor.py | 2 +- src/scicat_kafka.py | 66 +++++++++++++++++++++++++++--------------- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py index b399f6b..72ff38f 100644 --- a/src/scicat_ingestor.py +++ b/src/scicat_ingestor.py @@ -51,4 +51,4 @@ def main() -> None: # Receive messages for message in wrdn_messages(consumer, logger): - logger.info(f"Received message: {message}") + logger.info("Processing message: %s", message) diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py index 90bdf7b..26535aa 100644 --- a/src/scicat_kafka.py +++ b/src/scicat_kafka.py @@ -74,36 +74,56 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool: return True +def _validate_data_type(message_content: bytes, logger: logging.Logger) -> bool: + logger.info("Data type: %s", (data_type := message_content[4:8])) + if data_type == WRDN_FILE_IDENTIFIER: + logger.info("WRDN message received.") + return True + else: + logger.error("Unexpected data type: %s", data_type) + return False + + +def _filter_error_encountered( + wrdn_content: WritingFinished, logger: logging.Logger +) -> WritingFinished | None: + """Filter out messages with the ``error_encountered`` flag set to True.""" + if wrdn_content.error_encountered: + logger.error( + "``error_encountered`` flag True. " + "Unable to deserialize message. Skipping the message." + ) + return wrdn_content + else: + return None + + +def _deserialise_wrdn( + message_content: bytes, logger: logging.Logger +) -> WritingFinished | None: + if _validate_data_type(message_content, logger): + logger.info("Deserialising WRDN message") + wrdn_content: WritingFinished = deserialise_wrdn(message_content) + logger.info("Deserialised WRDN message: %.5000s", wrdn_content) + return _filter_error_encountered(wrdn_content, logger) + + def wrdn_messages( consumer: Consumer, logger: logging.Logger -) -> Generator[WritingFinished, None, None]: - """Wait for a WRDN message and yield it.""" +) -> Generator[WritingFinished | None, None, None]: + """Wait for a WRDN message and yield it. + + Yield ``None`` if no message is received or an error is encountered. + """ while True: + # The decision to proceed or stop will be done by the caller. message = consumer.poll(timeout=1.0) if message is None: logger.info("Received no messages") - continue + yield None elif message.error(): logger.error("Consumer error: %s", message.error()) - continue + yield None else: logger.info("Received message.") - - message_content: str = message.value() - data_type = message_content[4:8] - logger.info("Data type: %s", data_type) - if data_type == WRDN_FILE_IDENTIFIER: - logger.info("Deserialising WRDN message") - wrdn_content: WritingFinished = deserialise_wrdn(message_content) - logger.info("Deserialised WRDN message: %.5000s", wrdn_content) - if wrdn_content.error_encountered: - logger.error( - "``error_encountered`` flag True. " - "Unable to deserialize message. Skipping the message." - ) - continue - else: - logger.info("Successfully desrialized a WRDN message.") - yield wrdn_content - else: - logger.error("Unexpected data type: %s", data_type) + yield _deserialise_wrdn(message.value(), logger)