fix pipeline hang when context is not configured
leo-schick committed Oct 10, 2022
1 parent 99e11d8 commit b926b32
Showing 1 changed file with 36 additions and 23 deletions.
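For orientation: `run()` schedules nodes in a loop that only exits once every queued node has either finished or been marked as processed. Before this commit, a failure while entering an execution context left the node in limbo (never started, never marked processed), which is a plausible reading of the reported hang. A runnable sketch of that failure mode, with hypothetical names (`pending`, `processed`, `enter_context`) standing in for the scheduler internals:

def enter_context(node):
    # stand-in for contexts.context(...).__enter__() failing because no
    # execution context is configured
    raise Exception(f"could not enter execution context for {node}")

pending = {"load_customers"}
processed = set()

while pending - processed:  # the real loop also polls running subprocesses
    node = next(iter(pending - processed))
    try:
        enter_context(node)
    except Exception:
        # before the fix, a failed context entry left the node neither
        # running nor processed, so the exit condition never became true;
        # marking it processed lets the loop drain
        processed.add(node)
        continue
    # ... in the real code, a TaskProcess would be started here ...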
59 changes: 36 additions & 23 deletions mara_pipelines/execution.py
@@ -67,6 +67,8 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None,
     def run():

         statistics_process: multiprocessing.Process = None
+        # execution contexts
+        active_contexts: {str: contexts.ExecutionContext} = {}

         def exit_contexts(active_contexts: {str: contexts.ExecutionContext}, exception: Exception = None):
             for context_alias, context in active_contexts.items():
@@ -120,8 +122,6 @@ def with_all_upstreams(nodes: {pipelines.Node}):
         # queue whole pipeline
         queue([pipeline])

-        # execution contexts
-        active_contexts: {str: contexts.ExecutionContext} = {}
         # book keeping
         run_start_time = datetime.datetime.now(tz.utc)
         # all nodes that already ran or that won't be run anymore
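The two hunks above move the `active_contexts` dict from the bookkeeping section to the very top of `run()`. A likely rationale (my reading, not stated in the commit): cleanup code such as `exit_contexts(active_contexts)` also runs on error paths, and if an exception fires before the old assignment point, the name is still unbound. A minimal sketch of that pitfall, with simplified function bodies:

def exit_contexts(active_contexts, exception=None):
    # simplified stand-in for the real cleanup helper
    for alias, context in active_contexts.items():
        print(f"exiting execution context {alias}")

def run():
    active_contexts = {}  # new position: assigned before anything can fail
    try:
        raise Exception("queueing failed")  # stand-in for an early error
    except Exception as e:
        # safe now; with the old, later assignment this call would hit an
        # unassigned local name (UnboundLocalError) instead of exiting contexts
        exit_contexts(active_contexts, e)

run()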
@@ -277,34 +277,47 @@ def track_finished_pipelines():
                             logger.redirect_output(event_queue, pipeline.path())

                         else:
-                            # initialize context
-                            next_node_context = next_node.context() or config.default_execution_context()
-                            if next_node_context not in active_contexts:
-                                # enter context
-                                new_context = contexts.context(next_node_context)
-
-                                # TODO add better logging here
-                                print(f"enter execution context '{next_node_context}'")
-
-                                if not new_context.__enter__() or not new_context.is_active:
-                                    raise Exception(f'Could not enter execution context {next_node_context}')
-
-                                active_contexts[next_node_context] = new_context
-
                             # run a task in a subprocess
+                            task_start_time = datetime.datetime.now(tz.utc)
                             if next_node.parent in running_pipelines:
                                 running_pipelines[next_node.parent][1] += 1
                             event_queue.put(
-                                pipeline_events.NodeStarted(next_node.path(), datetime.datetime.now(tz.utc), False))
+                                pipeline_events.NodeStarted(next_node.path(), task_start_time, False))
                             event_queue.put(pipeline_events.Output(
                                 node_path=next_node.path(), format=logger.Format.ITALICS,
                                 message='★ ' + node_cost.format_duration(
                                     node_durations_and_run_times.get(tuple(next_node.path()), [0, 0])[0])))

-                            status_queue = multiprocessing_context.Queue()
-                            process = TaskProcess(next_node, event_queue, status_queue, active_contexts[next_node_context])
-                            process.start()
-                            running_task_processes[next_node] = process
+                            # initialize context
+                            next_node_context = next_node.context() or config.default_execution_context()
+                            if next_node_context not in active_contexts:
+                                # enter context
+                                try:
+                                    logger.log(message=f"enter execution context '{next_node_context}'", format=logger.Format.STANDARD)
+
+                                    new_context = contexts.context(next_node_context)
+
+                                    if not new_context.__enter__() or not new_context.is_active:
+                                        raise Exception(f'Could not enter execution context {next_node_context}')
+
+                                    active_contexts[next_node_context] = new_context
+                                except Exception as e:
+                                    logger.log(message=f"Could not initiate execution context", format=logger.Format.ITALICS,
+                                               is_error=True)
+                                    logger.log(message=traceback.format_exc(),
+                                               format=pipeline_events.Output.Format.VERBATIM, is_error=True)
+                                    event_queue.put(pipeline_events.NodeFinished(
+                                        node_path=next_node.path(), start_time=task_start_time,
+                                        end_time=datetime.datetime.now(tz.utc), is_pipeline=False, succeeded=False))
+
+                                    failed_pipelines.add(next_node.parent)
+                                    processed_nodes.add(next_node)
+
+                            if next_node_context in active_contexts:
+                                status_queue = multiprocessing_context.Queue()
+                                process = TaskProcess(next_node, event_queue, status_queue, active_contexts[next_node_context])
+                                process.start()
+                                running_task_processes[next_node] = process

                 # check whether some of the running processes finished
                 for task_process in list(running_task_processes.values()): # type: TaskProcess
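The hunk above is the core of the fix: context entry moves after the NodeStarted event, gets wrapped in try/except, and on failure the node is reported as finished-but-failed and marked processed, while the TaskProcess is only started when the context is actually active. A self-contained sketch of the same pattern, with a stand-in ExecutionContext class rather than the mara_pipelines one:

import traceback

class ExecutionContext:
    # stand-in: entering always fails, like an unconfigured context
    is_active = False

    def __enter__(self):
        raise RuntimeError("context is not configured")

    def __exit__(self, *exc):
        self.is_active = False

active_contexts = {}
failed_nodes = set()
processed_nodes = set()

def start_node(node, context_alias):
    if context_alias not in active_contexts:
        try:
            new_context = ExecutionContext()
            if not new_context.__enter__() or not new_context.is_active:
                raise Exception(f"Could not enter execution context {context_alias}")
            active_contexts[context_alias] = new_context
        except Exception:
            traceback.print_exc()
            failed_nodes.add(node)     # report the failure ...
            processed_nodes.add(node)  # ... and count the node as done, so the run can finish
    if context_alias in active_contexts:  # start the task only with a live context
        print(f"would start TaskProcess for {node}")

start_node("load_customers", "default")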
@@ -347,8 +360,8 @@ def track_finished_pipelines():
             # exit active contexts
             exit_contexts(active_contexts)

-            # run again because `dequeue` might have moved more nodes to `finished_nodes`
-            track_finished_pipelines()
+        # run again because `dequeue` might have moved more nodes to `finished_nodes`
+        track_finished_pipelines()

         if statistics_process:
             # kill the stats process (joining or terminating does not work in gunicorn)
