Skip to content

Commit

Permalink
Added a check on the number of running offline ingestors and the relevant configuration options
Browse files Browse the repository at this point in the history
  • Loading branch information
nitrosx committed Jan 9, 2025
1 parent a50458e commit fdbe9c0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
6 changes: 5 additions & 1 deletion resources/config.sample.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{
"nexus_file" : "",
"done_writing_message_file": "",
"config_file": "",
"id": "",
"dataset": {
Expand All @@ -15,6 +17,8 @@
"ingestion": {
"dry_run": false,
"offline_ingestor_executable": "background_ingestor",
"max_offline_ingestors": 10,
"offline_ingestors_wait_time": 10,
"schemas_directory": "schemas",
"check_if_dataset_exists_by_pid": true,
"check_if_dataset_exists_by_metadata": true,
Expand All @@ -32,7 +36,7 @@
}
},
"kafka": {
"topics": "KAFKA_TOPIC_1,KAFKA_TOPIC_2",
"topics": ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"],
"group_id": "GROUP_ID",
"bootstrap_servers": "localhost:9093",
"security_protocol": "sasl_ssl",
Expand Down
2 changes: 2 additions & 0 deletions src/scicat_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ class FileHandlingOptions:
class IngestionOptions:
dry_run: bool = False
offline_ingestor_executable: str | list[str]= "background_ingestor"
max_offline_ingestors: int = 10
offline_ingestors_wait_time: int = 10
schemas_directory: str = "schemas"
check_if_dataset_exists_by_pid: bool = True
check_if_dataset_exists_by_metadata: bool = True
Expand Down
46 changes: 38 additions & 8 deletions src/scicat_online_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import pathlib
import subprocess
from time import sleep

try:
__version__ = importlib.metadata.version(__package__ or __name__)
Expand Down Expand Up @@ -59,7 +60,22 @@ def dump_message_to_file_if_needed(
logger.info("Message file saved")


def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logger):
def _individual_message_commit(
job_id,
message,
consumer,
logger: logging.Logger
):
logger.info("Executing commit for message with job id %s", job_id)
consumer.commit(message=message)


def _check_offline_ingestors(
offline_ingestors,
consumer,
config,
logger: logging.Logger
) -> int:
logger.info("%s offline ingestors running", len(offline_ingestors))
for job_id, job_item in offline_ingestors.items():
result = job_item["proc"].poll()
Expand All @@ -69,15 +85,19 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg
)
if result == 0:
logger.info("Offline ingestor successful for job id %s", job_id)
logger.info("Executing commit for message with job id %s", job_id)
consumer.commit(message=job_item["message"])
# if background process is successful
# check if we need to commit the individual message
if config.kafka.individual_message_commit:
_individual_message_commit(job_id,job_item["message"], consumer, logger)
else:
logger.error("Offline ingestor error for job id %s", job_id)
logger.info(
"Removed ingestor for message with job id %s from queue", job_id
)
offline_ingestors.pop(job_id)

return len(offline_ingestors)


def build_online_config(logger: logging.Logger | None = None) -> OnlineIngestorConfig:
arg_parser = build_arg_parser(
Expand Down Expand Up @@ -169,16 +189,26 @@ def main() -> None:
if config.ingestion.dry_run:
logger.info("Dry run mode enabled. Skipping background ingestor.")
else:
logger.info("Checking number of offline ingestor")
offline_ingestor_runnings: int = _check_offline_ingestors(
offline_ingestors,
consumer,
config,
logger)
while offline_ingestor_runnings >= config.ingestion.max_offline_ingestors:
sleep(config.ingestion.offline_ingestors_wait_time)
offline_ingestor_runnings = _check_offline_ingestors(
offline_ingestors,
consumer,
config,
logger)

logger.info("Offline ingestors currently running {}".format(offline_ingestor_runnings))
logger.info("Running background ingestor with command above")
proc = subprocess.Popen(cmd) # noqa: S603
# save info about the background process
offline_ingestors[job_id] = {"proc": proc, "message": message}

# if background process is successful
# check if we need to commit the individual message
if config.kafka.individual_message_commit:
_individual_message_commit(offline_ingestors, consumer, logger)


if __name__ == "__main__":
main()

0 comments on commit fdbe9c0

Please sign in to comment.