From fdbe9c00f3355242fbb970bbfcac715244ce6e8e Mon Sep 17 00:00:00 2001 From: Max Novelli Date: Thu, 9 Jan 2025 17:11:06 +0100 Subject: [PATCH] added check on number of offline ingestors and relevant configuration --- resources/config.sample.json | 6 ++++- src/scicat_configuration.py | 2 ++ src/scicat_online_ingestor.py | 46 +++++++++++++++++++++++++++++------ 3 files changed, 45 insertions(+), 9 deletions(-) diff --git a/resources/config.sample.json b/resources/config.sample.json index ff2bb55..fb40ea3 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -1,4 +1,6 @@ { + "nexus_file" : "", + "done_writing_message_file": "", "config_file": "", "id": "", "dataset": { @@ -15,6 +17,8 @@ "ingestion": { "dry_run": false, "offline_ingestor_executable": "background_ingestor", + "max_offline_ingestors": 10, + "offline_ingestors_wait_time": 10, "schemas_directory": "schemas", "check_if_dataset_exists_by_pid": true, "check_if_dataset_exists_by_metadata": true, @@ -32,7 +36,7 @@ } }, "kafka": { - "topics": "KAFKA_TOPIC_1,KAFKA_TOPIC_2", + "topics": ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"], "group_id": "GROUP_ID", "bootstrap_servers": "localhost:9093", "security_protocol": "sasl_ssl", diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index 9ae9381..39bf726 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -226,6 +226,8 @@ class FileHandlingOptions: class IngestionOptions: dry_run: bool = False offline_ingestor_executable: str | list[str]= "background_ingestor" + max_offline_ingestors: int = 10 + offline_ingestors_wait_time: int = 10 schemas_directory: str = "schemas" check_if_dataset_exists_by_pid: bool = True check_if_dataset_exists_by_metadata: bool = True diff --git a/src/scicat_online_ingestor.py b/src/scicat_online_ingestor.py index c6ea099..230d74e 100644 --- a/src/scicat_online_ingestor.py +++ b/src/scicat_online_ingestor.py @@ -6,6 +6,7 @@ import logging import pathlib import subprocess +from time import sleep try: __version__ = importlib.metadata.version(__package__ or __name__) @@ -59,7 +60,22 @@ def dump_message_to_file_if_needed( logger.info("Message file saved") -def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logger): +def _individual_message_commit( + job_id, + message, + consumer, + logger: logging.Logger +): + logger.info("Executing commit for message with job id %s", job_id) + consumer.commit(message=message) + + +def _check_offline_ingestors( + offline_ingestors, + consumer, + config, + logger: logging.Logger +) -> int: logger.info("%s offline ingestors running", len(offline_ingestors)) for job_id, job_item in offline_ingestors.items(): result = job_item["proc"].poll() @@ -69,8 +85,10 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg ) if result == 0: logger.info("Offline ingestor successful for job id %s", job_id) - logger.info("Executing commit for message with job id %s", job_id) - consumer.commit(message=job_item["message"]) + # if background process is successful + # check if we need to commit the individual message + if config.kafka.individual_message_commit: + _individual_message_commit(job_id,job_item["message"], consumer, logger) else: logger.error("Offline ingestor error for job id %s", job_id) logger.info( @@ -78,6 +96,8 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg ) offline_ingestors.pop(job_id) + return len(offline_ingestors) + def build_online_config(logger: logging.Logger | None = None) -> OnlineIngestorConfig: arg_parser = build_arg_parser( @@ -169,16 +189,26 @@ def main() -> None: if config.ingestion.dry_run: logger.info("Dry run mode enabled. Skipping background ingestor.") else: + logger.info("Checking number of offline ingestor") + offline_ingestor_runnings: int = _check_offline_ingestors( + offline_ingestors, + consumer, + config, + logger) + while offline_ingestor_runnings >= config.ingestion.max_offline_ingestors: + sleep(config.ingestion.offline_ingestors_wait_time) + offline_ingestor_runnings = _check_offline_ingestors( + offline_ingestors, + consumer, + config, + logger) + + logger.info("Offline ingestors currently running {}".format(offline_ingestor_runnings)) logger.info("Running background ingestor with command above") proc = subprocess.Popen(cmd) # noqa: S603 # save info about the background process offline_ingestors[job_id] = {"proc": proc, "message": message} - # if background process is successful - # check if we need to commit the individual message - if config.kafka.individual_message_commit: - _individual_message_commit(offline_ingestors, consumer, logger) - if __name__ == "__main__": main()