From 7b3fb16b015b0c25fc3cdad5cfdc6941700c6b48 Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Fri, 26 Apr 2024 15:34:19 +0200 Subject: [PATCH] Adding extra safety mechanisms in case of failed jobs --- .osparc/osparc-map/runtime.yml | 2 +- Makefile | 7 + docker_scripts/map.py | 247 +++++++++++++++++++-------------- 3 files changed, 152 insertions(+), 104 deletions(-) diff --git a/.osparc/osparc-map/runtime.yml b/.osparc/osparc-map/runtime.yml index cdef230..180edd3 100755 --- a/.osparc/osparc-map/runtime.yml +++ b/.osparc/osparc-map/runtime.yml @@ -25,7 +25,7 @@ compose-spec: osparc-map: image: $${SIMCORE_REGISTRY}/simcore/services/dynamic/osparc-map:$${SERVICE_VERSION} environment: - - OSPARC_API_HOST=https://api.osparc-master.speag.com + - OSPARC_API_HOST=$${OSPARC_VARIABLE_API_HOST} - OSPARC_API_KEY=$${OSPARC_VARIABLE_API_KEY} - OSPARC_API_SECRET=$${OSPARC_VARIABLE_API_SECRET} container-http-entrypoint: osparc-map diff --git a/Makefile b/Makefile index 120ff74..9e73998 100755 --- a/Makefile +++ b/Makefile @@ -7,6 +7,7 @@ MAKEFLAGS += -j2 export DOCKER_IMAGE_NAME ?= osparc-map export DOCKER_IMAGE_TAG ?= 0.0.7 +export MASTER_AWS_REGISTRY ?= registry.osparc-master-zmt.click export MASTER_REGISTRY ?= registry.osparc-master.speag.com export LOCAL_REGISTRY ?= registry:5000 @@ -66,6 +67,12 @@ publish-master: ## push to local throw away registry to test integration docker push $(MASTER_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) @curl $(MASTER_REGISTRY)/v2/_catalog | jq +.PHONY: publish-master-aws +publish-master-aws: ## push to local throw away registry to test integration + docker tag simcore/services/dynamic/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} $(MASTER_AWS_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) + docker push $(MASTER_AWS_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) + @curl $(MASTER_AWS_REGISTRY)/v2/_catalog | jq + .PHONY: help help: ## this colorful help @echo "Recipes for '$(notdir $(CURDIR))':" diff --git a/docker_scripts/map.py b/docker_scripts/map.py index 9524f00..ccdc7e6 100755 --- a/docker_scripts/map.py +++ b/docker_scripts/map.py @@ -22,6 +22,9 @@ ) logger = logging.getLogger(__name__) +MAX_JOB_CREATE_ATTEMPTS = 5 +MAX_TRIALS = 5 + HTTP_PORT = 8888 POLLING_INTERVAL = 1 # second @@ -65,10 +68,27 @@ def __init__(self, *args, **kwargs): @contextlib.contextmanager def create_study_job(template_id, job_inputs, studies_api): - job = studies_api.create_study_job( - study_id=template_id, - job_inputs=job_inputs, - ) + n_of_create_attempts = 0 + while True: + try: + n_of_create_attempts += 1 + job = studies_api.create_study_job( + study_id=template_id, + job_inputs=job_inputs, + ) + break + except osparc_client.exceptions.ApiException: + if n_of_create_attempts >= MAX_JOB_CREATE_ATTEMPTS: + raise Exception( + f"Tried {n_of_create_attempts} to create a job from " + "the study, but failed" + ) + else: + logger.info( + "Received an API Exception from server " + "when creating job, retrying..." + ) + try: yield job finally: @@ -170,6 +190,7 @@ def start(self): logger.info( f"Waiting for input file at {self.input_tasks_path}..." ) + self.handshaker.retry_last_write() time.sleep(self.polling_interval) waiter += 1 @@ -222,113 +243,133 @@ def start(self): def run_tasks(self, tasks_uuid, input_tasks, n_of_workers): logger.info(f"Evaluating: {input_tasks}") - def map_func(task): - logger.info(f"Running worker for task: {task}") - - input = task["input"] - output = task["output"] - - job_inputs = {"values": {}} - - for param_name, param_input in input.items(): - param_type = param_input["type"] - param_value = param_input["value"] - if param_type == "FileJSON": - param_filename = param_input["filename"] - tmp_dir = tempfile.TemporaryDirectory() - tmp_dir_path = pl.Path(tmp_dir.name) - tmp_input_file_path = tmp_dir_path / param_filename - tmp_input_file_path.write_text(json.dumps(param_value)) - - input_data_file = osparc.FilesApi( - self.api_client - ).upload_file(file=tmp_input_file_path) - job_inputs["values"][param_name] = input_data_file - elif param_type == "file": - file_info = json.loads(param_value) - input_data_file = osparc_client.models.file.File( - id=file_info["id"], - filename=file_info["filename"], - content_type=file_info["content_type"], - checksum=file_info["checksum"], - e_tag=file_info["e_tag"], - ) - job_inputs["values"][param_name] = input_data_file - elif param_type == "integer": - job_inputs["values"][param_name] = int(param_value) - elif param_type == "float": - job_inputs["values"][param_name] = float(param_value) - else: - job_inputs["values"][param_name] = param_value - - logger.debug(f"Sending inputs: {job_inputs}") - - with create_study_job( - self.template_id, job_inputs, self.studies_api - ) as job: - job_status = self.studies_api.start_study_job( - study_id=self.template_id, job_id=job.id - ) - - while ( - job_status.state != "SUCCESS" - and job_status.state != "FAILED" - ): - job_status = self.studies_api.inspect_study_job( + def map_func(task, trial_number=1): + try: + logger.info(f"Running worker for task: {task}") + + input = task["input"] + output = task["output"] + + job_inputs = {"values": {}} + + for param_name, param_input in input.items(): + param_type = param_input["type"] + param_value = param_input["value"] + if param_type == "FileJSON": + param_filename = param_input["filename"] + tmp_dir = tempfile.TemporaryDirectory() + tmp_dir_path = pl.Path(tmp_dir.name) + tmp_input_file_path = tmp_dir_path / param_filename + tmp_input_file_path.write_text(json.dumps(param_value)) + + input_data_file = osparc.FilesApi( + self.api_client + ).upload_file(file=tmp_input_file_path) + job_inputs["values"][param_name] = input_data_file + elif param_type == "file": + file_info = json.loads(param_value) + input_data_file = osparc_client.models.file.File( + id=file_info["id"], + filename=file_info["filename"], + content_type=file_info["content_type"], + checksum=file_info["checksum"], + e_tag=file_info["e_tag"], + ) + job_inputs["values"][param_name] = input_data_file + elif param_type == "integer": + job_inputs["values"][param_name] = int(param_value) + elif param_type == "float": + job_inputs["values"][param_name] = float(param_value) + else: + job_inputs["values"][param_name] = param_value + + logger.debug(f"Sending inputs: {job_inputs}") + + with create_study_job( + self.template_id, job_inputs, self.studies_api + ) as job: + job_status = self.studies_api.start_study_job( study_id=self.template_id, job_id=job.id ) - time.sleep(1) - task["status"] = job_status.state + while ( + job_status.state != "SUCCESS" + and job_status.state != "FAILED" + ): + job_status = self.studies_api.inspect_study_job( + study_id=self.template_id, job_id=job.id + ) + time.sleep(1) + + task["status"] = job_status.state + + if job_status.state == "FAILED": + logger.error(f"Task failed: {task}") + raise Exception("Job returned a failed status") + else: + results = self.studies_api.get_study_job_outputs( + study_id=self.template_id, job_id=job.id + ).results + + for probe_name, probe_output in results.items(): + if probe_name not in output: + raise ValueError( + f"Unknown probe in output: {probe_name}" + ) + probe_type = output[probe_name]["type"] - if job_status.state == "FAILED": - logger.error(f"Task failed: {task}") - else: - results = self.studies_api.get_study_job_outputs( - study_id=self.template_id, job_id=job.id - ).results - - for probe_name, probe_output in results.items(): - if probe_name not in output: - raise ValueError( - f"Unknown probe in output: {probe_name}" - ) - probe_type = output[probe_name]["type"] - - if probe_type == "FileJSON": - output_file = pl.Path( - osparc.FilesApi(self.api_client).download_file( - probe_output.id + if probe_type == "FileJSON": + output_file = pl.Path( + osparc.FilesApi( + self.api_client + ).download_file(probe_output.id) ) - ) - with zipfile.ZipFile(output_file, "r") as zip_file: - file_results_path = zipfile.Path( - zip_file, at=output[probe_name]["filename"] + with zipfile.ZipFile( + output_file, "r" + ) as zip_file: + file_results_path = zipfile.Path( + zip_file, + at=output[probe_name]["filename"], + ) + file_results = json.loads( + file_results_path.read_text() + ) + + output[probe_name]["value"] = file_results + elif probe_type == "file": + tmp_output_data_file = osparc.FilesApi( + self.api_client + ).download_file(probe_output.id) + output_data_file = osparc.FilesApi( + self.api_client + ).upload_file(tmp_output_data_file) + + output[probe_name]["value"] = json.dumps( + output_data_file.to_dict() ) - file_results = json.loads( - file_results_path.read_text() + elif probe_type == "integer": + output[probe_name]["value"] = int(probe_output) + elif probe_type == "float": + output[probe_name]["value"] = float( + probe_output ) + else: + output[probe_name]["value"] = probe_output - output[probe_name]["value"] = file_results - elif probe_type == "file": - tmp_output_data_file = osparc.FilesApi( - self.api_client - ).download_file(probe_output.id) - output_data_file = osparc.FilesApi( - self.api_client - ).upload_file(tmp_output_data_file) - - output[probe_name]["value"] = json.dumps( - output_data_file.to_dict() - ) - elif probe_type == "integer": - output[probe_name]["value"] = int(probe_output) - elif probe_type == "float": - output[probe_name]["value"] = float(probe_output) - else: - output[probe_name]["value"] = probe_output - - logger.info(f"Worker has finished task: {task}") + logger.info(f"Worker has finished task: {task}") + except Exception as error: + if trial_number >= MAX_TRIALS: + logger.info( + f"Task {task} failed with error {error} in " + f"trial {trial_number}, not retrying, raising error" + ) + raise error + else: + logger.info( + f"Task {task} failed with error {error} in " + f"trial {trial_number}, retrying " + ) + task = map_func(task, trial_number=trial_number + 1) return task