Skip to content

Commit

Permalink
fix pipeline hang when context is not configured
Browse files — browse the repository at this point in the history
  • Loading branch information
leo-schick committed Jul 11, 2022
1 parent: ca00f30 · commit: 74fa203
Showing 1 changed file with 42 additions and 26 deletions.
68 changes: 42 additions & 26 deletions mara_pipelines/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ def exit_contexts(active_contexts: {str: contexts.ExecutionContext}, exception:
print(f"failed to exit execution context '{context_alias}'. Exception: {e}")
pass

statistics_process: multiprocessing.Process = None
# execution contexts
active_contexts: {str: contexts.ExecutionContext} = {}

try:
# capture output of print statements and other unplanned output
logger.redirect_output(event_queue, pipeline.path())
Expand Down Expand Up @@ -109,8 +113,6 @@ def with_all_upstreams(nodes: {pipelines.Node}):
# queue whole pipeline
queue([pipeline])

# execution contexts
active_contexts: {str: contexts.ExecutionContext} = {}
# book keeping
run_start_time = datetime.datetime.now(tz.utc)
# all nodes that already ran or that won't be run anymore
Expand Down Expand Up @@ -263,34 +265,47 @@ def track_finished_pipelines():
logger.redirect_output(event_queue, pipeline.path())

else:
# initialize context
next_node_context = next_node.context() or config.default_execution_context()
if next_node_context not in active_contexts:
# enter context
new_context = contexts.context(next_node_context)

# TODO add better logging here
print(f"enter execution context '{next_node_context}'")

if not new_context.__enter__() or not new_context.is_active:
raise Exception(f'Could not enter execution context {next_node_context}')

active_contexts[next_node_context] = new_context

# run a task in a subprocess
task_start_time = datetime.datetime.now(tz.utc)
if next_node.parent in running_pipelines:
running_pipelines[next_node.parent][1] += 1
event_queue.put(
pipeline_events.NodeStarted(next_node.path(), datetime.datetime.now(tz.utc), False))
pipeline_events.NodeStarted(next_node.path(), task_start_time, False))
event_queue.put(pipeline_events.Output(
node_path=next_node.path(), format=logger.Format.ITALICS,
message='★ ' + node_cost.format_duration(
node_durations_and_run_times.get(tuple(next_node.path()), [0, 0])[0])))

status_queue = multiprocessing_context.Queue()
process = TaskProcess(next_node, event_queue, status_queue, active_contexts[next_node_context])
process.start()
running_task_processes[next_node] = process
# initialize context
next_node_context = next_node.context() or config.default_execution_context()
if next_node_context not in active_contexts:
# enter context
try:
logger.log(message=f"enter execution context '{next_node_context}'", format=logger.Format.STANDARD)

new_context = contexts.context(next_node_context)

if not new_context.__enter__() or not new_context.is_active:
raise Exception(f'Could not enter execution context {next_node_context}')

active_contexts[next_node_context] = new_context
except Exception as e:
logger.log(message=f"Could not initiate execution context", format=logger.Format.ITALICS,
is_error=True)
logger.log(message=traceback.format_exc(),
format=pipeline_events.Output.Format.VERBATIM, is_error=True)
event_queue.put(pipeline_events.NodeFinished(
node_path=next_node.path(), start_time=task_start_time,
end_time=datetime.datetime.now(tz.utc), is_pipeline=False, succeeded=False))

failed_pipelines.add(next_node.parent)
processed_nodes.add(next_node)

if next_node_context in active_contexts:
status_queue = multiprocessing_context.Queue()
process = TaskProcess(next_node, event_queue, status_queue, active_contexts[next_node_context])
process.start()
running_task_processes[next_node] = process

# check whether some of the running processes finished
for task_process in list(running_task_processes.values()): # type: TaskProcess
Expand Down Expand Up @@ -334,12 +349,13 @@ def track_finished_pipelines():
# exit active contexts
exit_contexts(active_contexts)

# run again because `dequeue` might have moved more nodes to `finished_nodes`
track_finished_pipelines()
# run again because `dequeue` might have moved more nodes to `finished_nodes`
track_finished_pipelines()

# kill the stats process (joining or terminating does not work in gunicorn)
os.kill(statistics_process.pid, signal.SIGKILL)
statistics_process.join()
if statistics_process:
# kill the stats process (joining or terminating does not work in gunicorn)
os.kill(statistics_process.pid, signal.SIGKILL)
statistics_process.join()

# run finished
event_queue.put(pipeline_events.RunFinished(node_path=pipeline.path(), end_time=datetime.datetime.now(tz.utc),
Expand Down

0 comments on commit 74fa203

Please sign in to comment.