Handle SIGTERM #40
If the goal is to gracefully kill all tasks and show the output for all of these, then the interesting parts are:

My current idea would be: in the main process:

In ...

(k8s waits 30 sec between SIGTERM and SIGKILL: https://cloud.google.com/blog/products/gcp/kubernetes-best-practices-terminating-with-grace). Maybe it would also be good to add a ...
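For context, a tiny probe (purely illustrative, not mara-pipelines code) can confirm which signals a container actually receives and when; run it as the container's entrypoint and watch stderr while the pod is deleted:

import signal
import sys
import time

def _log_signal(signum, frame):
    # only log; do not exit, so the time until the final SIGKILL becomes visible
    print(f"{time.strftime('%H:%M:%S')} received {signal.Signals(signum).name}",
          flush=True, file=sys.stderr)

for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
    signal.signal(sig, _log_signal)

while True:   # SIGKILL cannot be caught, so the loop simply ends when it arrives
    time.sleep(1)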
I spent some time on this (see #41 for some changes I needed in data-integration; the signal handler itself can live elsewhere, as signal handlers get inherited on fork).

I have a signal handler which I install in ...
I will test this in our system for a bit and then see how it goes...

import signal
import os
import sys
import time
import atexit

_ALL_SIGNALS = [signal.SIGTERM, signal.SIGINT, signal.SIGQUIT]


def _signal_all_children(signum: int):
    """Signals all children, starting with the youngest ones"""
    import psutil
    parent = psutil.Process(os.getpid())
    processes = parent.children(recursive=True)
    if processes:
        _log(f"Found children, signaling {signum}...")
    for child in reversed(processes):
        try:
            os.kill(child.pid, signum)
        except OSError:
            # e.g. when the process is already gone because we already killed its child
            pass


def _log(msg: str):
    print(msg, flush=True, file=sys.stderr)


def install_gracefully_shutdown_signal_handler():
    # Gracefully shutting down:
    # By installing this signal handler, all children also get it. This goes down to the process which runs
    # run_pipeline, the executor started within that function, and the TaskProcess started by the executor.
    # Installing a signal handler in the webserver turned out to be totally unreliable, which is why we only
    # install this on a best-effort basis.
    main_process_pid = os.getpid()

    def _SIGKILL_all_children_handler(signum, frame):
        _log(f"Signaling SIGKILL to all children (PID: {os.getpid()})")
        _signal_all_children(signal.SIGKILL)
        return

    def _main_signal_handler(signum, frame):
        """Kills children and signals the parent (up to the executor) to shut down"""
        if hasattr(_main_signal_handler, 'shutdown_flag'):
            # already did the work
            return
        _main_signal_handler.shutdown_flag = True
        _log(f"Received shutdown signal, signaling all children to shutdown (PID: {os.getpid()})")
        _signal_all_children(signal.SIGTERM)
        if main_process_pid != os.getpid():
            # we are in a child -> bubble up the signal until we are in the executor itself
            os.kill(os.getppid(), signal.SIGTERM)
            # no other action, to not die in the executor (where events are generated) or the TaskProcess:
            # both will die naturally when all children have died
        else:
            # we are in the main process, usually whatever runs run_pipeline.
            # Send the signal again after 1 second to make sure that any newly started child processes also get
            # killed. This happens because we cannot prevent the executor from starting new ones unless the
            # executor has already seen an error from the same pipeline. It would be nicer if we could do it
            # directly in the executor...
            time.sleep(1)
            _signal_all_children(signal.SIGTERM)
            _log(f"Scheduling SIGKILL to all children in 10 sec (PID: {os.getpid()})")
            signal.signal(signal.SIGALRM, _SIGKILL_all_children_handler)
            signal.alarm(10)
        # return to let the process receive/send events -> if nothing is there anymore, the process dies in the end...
        return

    def cancel_alarm():
        # if we are about to shut down because all processes already exited without a SIGKILL, we have to disable
        # the alarm which would send this SIGKILL, otherwise we get a bad exit
        signal.alarm(0)

    # on fork, the signal handler stays installed in children unless overwritten
    try:
        for sig_no in _ALL_SIGNALS:
            signal.signal(sig_no, _main_signal_handler)
        atexit.register(cancel_alarm)
        _log("Installed signal handler to gracefully shutdown the ETL")
    except ValueError:
        # we are in a webserver, you are on your own...
        _log("Not in main thread, not adding signal handler")
This makes some attempts to clean up during shutdown. The main thing is the atexit function in the run_pipeline function, which simply closes a run/node_run when shutting down. The atexit in run() will shut down any still running processes. It also attempts to kill the run_process when the run_pipeline process is shut down via Ctrl+C; this should trigger the atexit function in run() so at least the children are killed. It does not yet attempt the same for any other signal.

Also included is a fix to actually check all ancestors of a task when checking whether any of them has already failed. The effect is that we do not schedule any tasks from an already queued sub-pipeline (like a parallel task) in case the parent of that sub-pipeline has failed.

With this in place I could successfully add a signal handler (not included yet, needs some more testing) which kills the running processes and closes the runs. It also handles Ctrl+C in flask better; at least I didn't see leftover processes anymore.

Partly covers: #40
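As a rough sketch of that idea (not the actual PR code: the table and column names are the ones mentioned in this issue, while the run_id column and the DB-API connection are assumptions for illustration), the atexit cleanup boils down to closing whatever rows are still open:

import atexit
import datetime

def _close_open_runs(run_id, connection):
    """Sets end_time on the run and its node_runs if they were left open (end_time IS NULL)."""
    now = datetime.datetime.utcnow()
    with connection.cursor() as cursor:
        cursor.execute('UPDATE data_integration_node_run SET end_time = %s '
                       'WHERE run_id = %s AND end_time IS NULL', (now, run_id))
        cursor.execute('UPDATE data_integration_run SET end_time = %s '
                       'WHERE run_id = %s AND end_time IS NULL', (now, run_id))
    connection.commit()

# registered once the run row exists, e.g. at the start of run_pipeline:
# atexit.register(_close_open_runs, run_id, connection)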
@jankatins is this still a problem that justifies the additional complexity?
We currently run with this (see below for the current version) in prod, so no real clue what would happen if I removed it again. It is possible to add such a handler from "outside" of mara-pipelines, but doing it inside mara-pipelines' run_pipeline would give this to everyone who runs mara pipelines via a k8s cron (which can kill and move pods under some circumstances, e.g. out of RAM, or simply because the pod should be moved because the instance is being taken down for some reason).

https://github.com/mara/mara-pipelines/pull/43/files should IMO also still be merged to get rid of some log spam.

Current version of the signal handler installation when the root pipeline is called:

@patch(mara_pipelines.config.root_pipeline)
@functools.lru_cache(maxsize=None)
def root_pipeline():
    ...
    # install a signal handler to gracefully shut down. Would probably only run once, even without the guard,
    # thanks to the LRU cache.
    # DOES NOT INSTALL when run via the flask web UI!
    from app.signals import install_gracefully_shutdown_signal_handler
    if not hasattr(install_gracefully_shutdown_signal_handler, 'already_installed'):
        setattr(install_gracefully_shutdown_signal_handler, 'already_installed', True)
        install_gracefully_shutdown_signal_handler()

And this is the app.signals module itself:

import signal
import os
import sys
import time
import atexit
import psutil
_ALL_SIGNALS = [signal.SIGTERM, signal.SIGINT, signal.SIGQUIT]

def _signal_children(signum: int, parent_pid: int = None, only_tasks=False):
    """Signals all children, starting with the youngest ones"""
    if parent_pid is None:
        parent_pid = os.getpid()
    parent = psutil.Process(parent_pid)
    processes = parent.children(recursive=True)
    for child in reversed(processes):
        if only_tasks:
            if _name_process(child.pid) in ("Main", "Executor"):
                # protect these two so events are handled
                continue
        try:
            os.kill(child.pid, signum)
        except OSError:
            # e.g. when the process is already gone because we already killed its child
            pass


def _log(msg: str):
    print(msg, flush=True, file=sys.stderr)


def _name_process(pid: int):
    """Names the process of the pid"""
    # The idea is that we start with a main process (run_pipeline), which starts the Executor (run()),
    # which starts the system stats process and the individual tasks. All of these python processes share
    # the same "name" (the commandline). Within the tasks it's different, and the process parent of the
    # main process is different.
    try:
        p = psutil.Process(pid)
        p_name = p.name()
    except psutil.NoSuchProcess:
        # process died already...
        return
    python_name = psutil.Process(os.getpid()).name()
    name_for_level = {
        1: "Main",
        2: "Executor",
        3: "Task/SystemStats",
        4: "Task Child"
    }
    if p_name != python_name:
        # this assumes we only get children as pid arg!
        return name_for_level[4]
    for level in [1, 2, 3]:
        if p.parent().name() != p_name:
            return name_for_level[level]
        p = p.parent()
    # this can happen for python function tasks which open a new process
    return name_for_level[4]


def install_gracefully_shutdown_signal_handler():
    # Gracefully shutting down:
    # By installing this signal handler, all children also get it. This goes down to the process which runs
    # run_pipeline, the executor started within that function, and the TaskProcess started by the executor.
    # Installing a signal handler in the webserver turned out to be totally unreliable, which is why we only
    # install this on a best-effort basis.
    #
    # If the "highest parent" (which in a normal exec is the process which runs run_pipeline) gets a SIGTERM/SIGINT:
    # - set 'shutdown_flag' (and return if it is already set, to not do the work twice)
    # - signal SIGINT to all tasks
    # - schedule a SIGINT in 2 sec for all children (so including the executor)
    # - schedule a SIGKILL in 10 sec for all children and raise a KeyboardInterrupt() to shut down run_pipeline()
    # - return to let run_pipeline receive the events for all the killed children
    # If any child TaskProcess or the stats process gets a SIGTERM/SIGINT:
    # - set a 'shutdown_flag' (and return if it is already set, to not do the work twice)
    # - signal SIGTERM to the main process (which should be the executor)
    # - return to let the TaskProcess and the executor send the right events
    installing_process_pid = os.getpid()

    def _SIGKILL_all_children_handler(signum, frame):
        _log(f"Signaling SIGKILL to all children (PID: {os.getpid()})")
        _signal_children(signal.SIGKILL)
        # Go out via the SIGINT/KeyboardInterrupt as mara-pipeline handles that in several places
        raise KeyboardInterrupt()

    def _SIGINT_all_children_handler(signum, frame):
        _log(f"Signaling SIGINT to all children (PID: {os.getpid()})")
        _signal_children(signal.SIGINT)
        _log(f"Scheduling SIGKILL to all children in 10 sec (PID: {os.getpid()})")
        signal.signal(signal.SIGALRM, _SIGKILL_all_children_handler)
        signal.alarm(10)

    def _main_signal_handler(signum, frame):
        """Kills children and signals the parent (up to the executor) to shut down"""
        if hasattr(_main_signal_handler, 'shutdown_flag'):
            # already did the work
            return
        _main_signal_handler.shutdown_flag = True
        _sig_name = signal.Signals(signum).name
        _p_name = _name_process(os.getpid())
        _log(f"Received shutdown signal {_sig_name} in {_p_name} process (PID: {os.getpid()})")
        if installing_process_pid != os.getpid():
            # we are in a child -> bubble up the signal to the parent and the
            # main pid (which should be the executor itself)
            os.kill(installing_process_pid, signal.SIGINT)
            # no other action, to not die in the executor (where events are generated) or the TaskProcess:
            # both will die naturally when all children have died
        else:
            # we are in the main process, usually whatever runs run_pipeline
            _log(f"Shutdown all tasks (PID: {os.getpid()})")
            _signal_children(signal.SIGINT, installing_process_pid, only_tasks=True)
            # Send another shutdown signal after 1 second to make sure that any newly started child processes
            # also get killed. This happens because we cannot prevent the executor from starting new ones unless
            # the executor has already seen an error from the same pipeline. It would be nicer if we could do it
            # directly in the executor...
            # While we sleep, no event handling happens!
            time.sleep(1)
            _signal_children(signal.SIGINT, installing_process_pid, only_tasks=True)
            _log(f"Scheduling SIGINT to all children in 3 sec (PID: {os.getpid()})")
            signal.signal(signal.SIGALRM, _SIGINT_all_children_handler)
            signal.alarm(3)
        # return to let the process receive/send events -> if nothing is there anymore, the process dies in the end...
        return

    def cancel_alarm():
        # if we are about to shut down because all processes already exited without a SIGKILL, we have to disable
        # the alarm which would send this SIGKILL, otherwise we get a bad exit
        signal.alarm(0)

    # on fork, the signal handler stays installed in children unless overwritten
    try:
        for sig_no in _ALL_SIGNALS:
            signal.signal(sig_no, _main_signal_handler)
        atexit.register(cancel_alarm)
        _log("Installed signal handler to gracefully shutdown the ETL")
    except ValueError:
        # we are in a webserver, you are on your own...
        _log("Not in main thread, not adding signal handler")
I see this as an important topic and think it should be part of the mara-pipelines package. I have now started to use Docker projects and see that Docker uses the same approach of sending SIGTERM messages. In addition, on my current Debian mara server I often have mara tasks which run indefinitely because they got stuck somehow, and which I then kill from time to time. These jobs are executed via ...

I think we should write unit tests for these problems and then build a solution from that (test-driven development); a sketch of such a test follows below. When new changes are made to the pipeline execution (see #74, #69), they can easily break something that was working before.
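A hedged sketch of such a test (the pipeline module and the open_runs_left() helper are illustrative placeholders, not existing mara-pipelines test utilities): start a run in a subprocess, send it the SIGTERM that k8s would send, and assert that it exits within the grace period and leaves no open runs behind.

import os
import signal
import subprocess
import sys
import time

def test_sigterm_closes_runs():
    # start a pipeline run in its own session so the whole process tree can be signaled
    proc = subprocess.Popen([sys.executable, '-m', 'my_project.run_slow_pipeline'],
                            start_new_session=True)
    time.sleep(5)                         # let some tasks start
    os.killpg(proc.pid, signal.SIGTERM)   # what k8s sends first
    proc.wait(timeout=30)                 # must shut down within the grace period
    assert open_runs_left() == 0          # hypothetical helper: rows with end_time IS NULL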
I investigated this a bit now and I personally think we need a different solution here, since you can define a signal handler only once in your app code. What we need is a way to communicate with the running pipeline, e.g. via a multiprocessing Queue or Pipe instance: we add a global list somewhere holding all currently running pipeline processes (including e.g. the statistics process).

A public method should be available to send these signals to all running pipelines or to a specific one. Maybe this could even be implemented in a REST API.
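A possible shape for that proposal (purely a sketch; none of these names exist in mara-pipelines today): a module-level registry of running pipeline processes plus one public function that signals all of them, or only the processes of one run, which a CLI command or a REST endpoint could then call.

import os
import signal
import threading

_running_pipelines = {}          # run_id -> list of PIDs (executor, task processes, stats process, ...)
_registry_lock = threading.Lock()

def register_pipeline_process(run_id, pid):
    with _registry_lock:
        _running_pipelines.setdefault(run_id, []).append(pid)

def shutdown_pipelines(run_id=None, signum=signal.SIGTERM):
    """Signals every process of one run (or of all runs) to shut down gracefully."""
    with _registry_lock:
        run_ids = [run_id] if run_id is not None else list(_running_pipelines)
        for rid in run_ids:
            for pid in _running_pipelines.get(rid, []):
                try:
                    os.kill(pid, signum)
                except ProcessLookupError:
                    pass   # the process already exited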
K8s sends a SIGTERM, followed by a waiting period, and then a SIGKILL. The pipeline runner currently only handles SIGINT (Ctrl+C, via the exception Python raises for this key combo, which triggers the generic except: block in the main loop). Python by default reacts to SIGTERM by terminating. This results in situations where the pipeline run is abruptly killed, which leaves open runs around.

How to reproduce: kill <list of pids> (or close the docker container) -> the runs (data_integration_run) and node_runs (data_integration_node_run) are left open (end_time is NULL).

We use the runs to not start a second run and the node_runs to display prometheus metrics per main pipeline (last run, currently running, ...). Leaving them open means someone has to close them manually :-(
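Since the run loop already handles KeyboardInterrupt/SIGINT, one minimal mitigation (a sketch, not what the changes above implement) is to translate SIGTERM into that same code path so the existing cleanup gets a chance to close the runs:

import signal

def _sigterm_to_keyboard_interrupt(signum, frame):
    # let the existing except block in the run loop perform the shutdown and close the runs
    raise KeyboardInterrupt()

signal.signal(signal.SIGTERM, _sigterm_to_keyboard_interrupt)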