Skip to content

Commit

Permalink
Adding extra safety mechanisms in case of failed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
wvangeit committed Apr 26, 2024
1 parent fe8bad6 commit 7b3fb16
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 104 deletions.
2 changes: 1 addition & 1 deletion .osparc/osparc-map/runtime.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ compose-spec:
osparc-map:
image: $${SIMCORE_REGISTRY}/simcore/services/dynamic/osparc-map:$${SERVICE_VERSION}
environment:
- OSPARC_API_HOST=https://api.osparc-master.speag.com
- OSPARC_API_HOST=$${OSPARC_VARIABLE_API_HOST}
- OSPARC_API_KEY=$${OSPARC_VARIABLE_API_KEY}
- OSPARC_API_SECRET=$${OSPARC_VARIABLE_API_SECRET}
container-http-entrypoint: osparc-map
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ MAKEFLAGS += -j2
export DOCKER_IMAGE_NAME ?= osparc-map
export DOCKER_IMAGE_TAG ?= 0.0.7

export MASTER_AWS_REGISTRY ?= registry.osparc-master-zmt.click
export MASTER_REGISTRY ?= registry.osparc-master.speag.com
export LOCAL_REGISTRY ?= registry:5000

Expand Down Expand Up @@ -66,6 +67,12 @@ publish-master: ## push to local throw away registry to test integration
docker push $(MASTER_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)
@curl $(MASTER_REGISTRY)/v2/_catalog | jq

.PHONY: publish-master-aws
publish-master-aws: ## push to the master AWS registry to test integration
docker tag simcore/services/dynamic/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} $(MASTER_AWS_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)
docker push $(MASTER_AWS_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)
@curl $(MASTER_AWS_REGISTRY)/v2/_catalog | jq

.PHONY: help
help: ## this colorful help
@echo "Recipes for '$(notdir $(CURDIR))':"
Expand Down
247 changes: 144 additions & 103 deletions docker_scripts/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
)
logger = logging.getLogger(__name__)

MAX_JOB_CREATE_ATTEMPTS = 5
MAX_TRIALS = 5

HTTP_PORT = 8888

POLLING_INTERVAL = 1 # second
Expand Down Expand Up @@ -65,10 +68,27 @@ def __init__(self, *args, **kwargs):

@contextlib.contextmanager
def create_study_job(template_id, job_inputs, studies_api):
job = studies_api.create_study_job(
study_id=template_id,
job_inputs=job_inputs,
)
n_of_create_attempts = 0
while True:
try:
n_of_create_attempts += 1
job = studies_api.create_study_job(
study_id=template_id,
job_inputs=job_inputs,
)
break
except osparc_client.exceptions.ApiException:
if n_of_create_attempts >= MAX_JOB_CREATE_ATTEMPTS:
raise Exception(
f"Tried {n_of_create_attempts} to create a job from "
"the study, but failed"
)
else:
logger.info(
"Received an API Exception from server "
"when creating job, retrying..."
)

try:
yield job
finally:
Expand Down Expand Up @@ -170,6 +190,7 @@ def start(self):
logger.info(
f"Waiting for input file at {self.input_tasks_path}..."
)
self.handshaker.retry_last_write()
time.sleep(self.polling_interval)
waiter += 1

Expand Down Expand Up @@ -222,113 +243,133 @@ def start(self):
def run_tasks(self, tasks_uuid, input_tasks, n_of_workers):
logger.info(f"Evaluating: {input_tasks}")

def map_func(task):
logger.info(f"Running worker for task: {task}")

input = task["input"]
output = task["output"]

job_inputs = {"values": {}}

for param_name, param_input in input.items():
param_type = param_input["type"]
param_value = param_input["value"]
if param_type == "FileJSON":
param_filename = param_input["filename"]
tmp_dir = tempfile.TemporaryDirectory()
tmp_dir_path = pl.Path(tmp_dir.name)
tmp_input_file_path = tmp_dir_path / param_filename
tmp_input_file_path.write_text(json.dumps(param_value))

input_data_file = osparc.FilesApi(
self.api_client
).upload_file(file=tmp_input_file_path)
job_inputs["values"][param_name] = input_data_file
elif param_type == "file":
file_info = json.loads(param_value)
input_data_file = osparc_client.models.file.File(
id=file_info["id"],
filename=file_info["filename"],
content_type=file_info["content_type"],
checksum=file_info["checksum"],
e_tag=file_info["e_tag"],
)
job_inputs["values"][param_name] = input_data_file
elif param_type == "integer":
job_inputs["values"][param_name] = int(param_value)
elif param_type == "float":
job_inputs["values"][param_name] = float(param_value)
else:
job_inputs["values"][param_name] = param_value

logger.debug(f"Sending inputs: {job_inputs}")

with create_study_job(
self.template_id, job_inputs, self.studies_api
) as job:
job_status = self.studies_api.start_study_job(
study_id=self.template_id, job_id=job.id
)

while (
job_status.state != "SUCCESS"
and job_status.state != "FAILED"
):
job_status = self.studies_api.inspect_study_job(
def map_func(task, trial_number=1):
try:
logger.info(f"Running worker for task: {task}")

input = task["input"]
output = task["output"]

job_inputs = {"values": {}}

for param_name, param_input in input.items():
param_type = param_input["type"]
param_value = param_input["value"]
if param_type == "FileJSON":
param_filename = param_input["filename"]
tmp_dir = tempfile.TemporaryDirectory()
tmp_dir_path = pl.Path(tmp_dir.name)
tmp_input_file_path = tmp_dir_path / param_filename
tmp_input_file_path.write_text(json.dumps(param_value))

input_data_file = osparc.FilesApi(
self.api_client
).upload_file(file=tmp_input_file_path)
job_inputs["values"][param_name] = input_data_file
elif param_type == "file":
file_info = json.loads(param_value)
input_data_file = osparc_client.models.file.File(
id=file_info["id"],
filename=file_info["filename"],
content_type=file_info["content_type"],
checksum=file_info["checksum"],
e_tag=file_info["e_tag"],
)
job_inputs["values"][param_name] = input_data_file
elif param_type == "integer":
job_inputs["values"][param_name] = int(param_value)
elif param_type == "float":
job_inputs["values"][param_name] = float(param_value)
else:
job_inputs["values"][param_name] = param_value

logger.debug(f"Sending inputs: {job_inputs}")

with create_study_job(
self.template_id, job_inputs, self.studies_api
) as job:
job_status = self.studies_api.start_study_job(
study_id=self.template_id, job_id=job.id
)
time.sleep(1)

task["status"] = job_status.state
while (
job_status.state != "SUCCESS"
and job_status.state != "FAILED"
):
job_status = self.studies_api.inspect_study_job(
study_id=self.template_id, job_id=job.id
)
time.sleep(1)

task["status"] = job_status.state

if job_status.state == "FAILED":
logger.error(f"Task failed: {task}")
raise Exception("Job returned a failed status")
else:
results = self.studies_api.get_study_job_outputs(
study_id=self.template_id, job_id=job.id
).results

for probe_name, probe_output in results.items():
if probe_name not in output:
raise ValueError(
f"Unknown probe in output: {probe_name}"
)
probe_type = output[probe_name]["type"]

if job_status.state == "FAILED":
logger.error(f"Task failed: {task}")
else:
results = self.studies_api.get_study_job_outputs(
study_id=self.template_id, job_id=job.id
).results

for probe_name, probe_output in results.items():
if probe_name not in output:
raise ValueError(
f"Unknown probe in output: {probe_name}"
)
probe_type = output[probe_name]["type"]

if probe_type == "FileJSON":
output_file = pl.Path(
osparc.FilesApi(self.api_client).download_file(
probe_output.id
if probe_type == "FileJSON":
output_file = pl.Path(
osparc.FilesApi(
self.api_client
).download_file(probe_output.id)
)
)
with zipfile.ZipFile(output_file, "r") as zip_file:
file_results_path = zipfile.Path(
zip_file, at=output[probe_name]["filename"]
with zipfile.ZipFile(
output_file, "r"
) as zip_file:
file_results_path = zipfile.Path(
zip_file,
at=output[probe_name]["filename"],
)
file_results = json.loads(
file_results_path.read_text()
)

output[probe_name]["value"] = file_results
elif probe_type == "file":
tmp_output_data_file = osparc.FilesApi(
self.api_client
).download_file(probe_output.id)
output_data_file = osparc.FilesApi(
self.api_client
).upload_file(tmp_output_data_file)

output[probe_name]["value"] = json.dumps(
output_data_file.to_dict()
)
file_results = json.loads(
file_results_path.read_text()
elif probe_type == "integer":
output[probe_name]["value"] = int(probe_output)
elif probe_type == "float":
output[probe_name]["value"] = float(
probe_output
)
else:
output[probe_name]["value"] = probe_output

output[probe_name]["value"] = file_results
elif probe_type == "file":
tmp_output_data_file = osparc.FilesApi(
self.api_client
).download_file(probe_output.id)
output_data_file = osparc.FilesApi(
self.api_client
).upload_file(tmp_output_data_file)

output[probe_name]["value"] = json.dumps(
output_data_file.to_dict()
)
elif probe_type == "integer":
output[probe_name]["value"] = int(probe_output)
elif probe_type == "float":
output[probe_name]["value"] = float(probe_output)
else:
output[probe_name]["value"] = probe_output

logger.info(f"Worker has finished task: {task}")
logger.info(f"Worker has finished task: {task}")
except Exception as error:
if trial_number >= MAX_TRIALS:
logger.info(
f"Task {task} failed with error {error} in "
f"trial {trial_number}, not retrying, raising error"
)
raise error
else:
logger.info(
f"Task {task} failed with error {error} in "
f"trial {trial_number}, retrying "
)
task = map_func(task, trial_number=trial_number + 1)

return task

Expand Down

0 comments on commit 7b3fb16

Please sign in to comment.