Skip to content

Commit

Permalink
Merge pull request #24 from SciCatProject/integration-test
Browse files Browse the repository at this point in the history
Add more integration helpers.
  • Loading branch information
nitrosx authored May 31, 2024
2 parents eecc1ad + 89d1d88 commit 87c2061
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ jobs:
- run: python -m pip install -e .
- run: docker-compose version
- run: docker-compose -f tests/docker-compose-file-writer.yml up -d
- run: scicat_ingestor -c resources/config.sample.json --verbose
- run: python tests/_scicat_ingestor.py -c resources/config.sample.json --verbose
- run: docker-compose -f tests/docker-compose-file-writer.yml down
31 changes: 27 additions & 4 deletions src/scicat_ingestor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import logging
from collections.abc import Generator
from contextlib import contextmanager

from scicat_configuration import build_main_arg_parser, build_scicat_config
from scicat_kafka import build_consumer
from scicat_kafka import build_consumer, wrdn_messages
from scicat_logging import build_logger


Expand All @@ -15,6 +17,22 @@ def quit(logger: logging.Logger, unexpected: bool = True) -> None:
sys.exit(1 if unexpected else 0)


@contextmanager
def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
"""Exit the program if an exception is raised."""
try:
yield
except KeyboardInterrupt:
logger.info("Received keyboard interrupt.")
quit(logger, unexpected=False)
except Exception as e:
logger.error("An exception occurred: %s", e)
quit(logger, unexpected=True)
else:
logger.error("Loop finished unexpectedly.")
quit(logger, unexpected=True)


def main() -> None:
"""Main entry point of the app."""
arg_parser = build_main_arg_parser()
Expand All @@ -26,6 +44,11 @@ def main() -> None:
logger.info('Starting the Scicat Ingestor with the following configuration:')
logger.info(config.to_dict())

# Kafka consumer
if build_consumer(config.kafka_options, logger) is None:
quit(logger)
with exit_at_exceptions(logger):
# Kafka consumer
if (consumer := build_consumer(config.kafka_options, logger)) is None:
raise RuntimeError("Failed to build the Kafka consumer")

# Receive messages
for message in wrdn_messages(consumer, logger):
logger.info("Processing message: %s", message)
61 changes: 61 additions & 0 deletions src/scicat_kafka.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import logging
from collections.abc import Generator

from confluent_kafka import Consumer
from streaming_data_types import deserialise_wrdn
from streaming_data_types.finished_writing_wrdn import (
FILE_IDENTIFIER as WRDN_FILE_IDENTIFIER,
)
from streaming_data_types.finished_writing_wrdn import WritingFinished

from scicat_configuration import kafkaOptions

Expand Down Expand Up @@ -66,3 +72,58 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
else:
logger.info("Kafka consumer successfully instantiated")
return True


def _validate_data_type(message_content: bytes, logger: logging.Logger) -> bool:
logger.info("Data type: %s", (data_type := message_content[4:8]))
if data_type == WRDN_FILE_IDENTIFIER:
logger.info("WRDN message received.")
return True
else:
logger.error("Unexpected data type: %s", data_type)
return False


def _filter_error_encountered(
wrdn_content: WritingFinished, logger: logging.Logger
) -> WritingFinished | None:
"""Filter out messages with the ``error_encountered`` flag set to True."""
if wrdn_content.error_encountered:
logger.error(
"``error_encountered`` flag True. "
"Unable to deserialize message. Skipping the message."
)
return wrdn_content
else:
return None


def _deserialise_wrdn(
message_content: bytes, logger: logging.Logger
) -> WritingFinished | None:
if _validate_data_type(message_content, logger):
logger.info("Deserialising WRDN message")
wrdn_content: WritingFinished = deserialise_wrdn(message_content)
logger.info("Deserialised WRDN message: %.5000s", wrdn_content)
return _filter_error_encountered(wrdn_content, logger)


def wrdn_messages(
consumer: Consumer, logger: logging.Logger
) -> Generator[WritingFinished | None, None, None]:
"""Wait for a WRDN message and yield it.
Yield ``None`` if no message is received or an error is encountered.
"""
while True:
# The decision to proceed or stop will be done by the caller.
message = consumer.poll(timeout=1.0)
if message is None:
logger.info("Received no messages")
yield None
elif message.error():
logger.error("Consumer error: %s", message.error())
yield None
else:
logger.info("Received message.")
yield _deserialise_wrdn(message.value(), logger)
24 changes: 24 additions & 0 deletions tests/_scicat_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Entry point for integration test.
# All system arguments are passed to the ``scicat_ingestor``.


if __name__ == "__main__":
import signal
import subprocess
import sys
from time import sleep

# Run the main function in a subprocess
process = subprocess.Popen(
[
"scicat_ingestor",
*(sys.argv[1:] or ["--verbose", "-c", "resources/config.sample.json"]),
]
)

# Send a SIGINT signal to the process after 5 seconds
sleep(5)
process.send_signal(signal.SIGINT)

# Kill the process after 5 more seconds
sleep(5)

0 comments on commit 87c2061

Please sign in to comment.