diff --git a/docs/source/usage.rst b/docs/source/usage.rst
index 12d25bc2..163d8fa6 100644
--- a/docs/source/usage.rst
+++ b/docs/source/usage.rst
@@ -31,13 +31,6 @@ single experimental *payload*.
 
 For instructions on how to set **tomato** up for a first run, see the :ref:`quickstart`.
 
-.. warning::
-
-    Currently, all *jobs* are executed under the user that started the :mod:`tomato.daemon`.
-    This means that when the :mod:`tomato.daemon` is running under a different user than the
-    current user who submits a *job*, this current user (if unpriviledged) will not be able to
-    cancel their own *job*.
-
 Using :mod:`~tomato.ketchup`
 ````````````````````````````
 
@@ -78,6 +71,7 @@ by loading or ejecting *samples* and marking *pipelines* ready for execution.
     q   Job has entered the queue.
     qw  Job is in the queue, waiting for a pipeline to be ready.
     r   Job is running.
+    rd  Job has been marked for cancellation.
    c   Job has completed successfully.
     ce  Job has completed with an error.
     cd  Job has been cancelled.
@@ -94,6 +88,9 @@ by loading or ejecting *samples* and marking *pipelines* ready for execution.
 
     >>> ketchup cancel
 
+This will mark the *job* for cancellation by setting its status to ``rd``. The
+:mod:`tomato.daemon` will then proceed with cancelling the *job*.
+
 *Jobs* submitted to the *queue* will remain in the *queue* until a *pipeline* meets all
 of the following criteria:
diff --git a/src/tomato/daemon/main.py b/src/tomato/daemon/main.py
index 61272663..40e6c2a8 100644
--- a/src/tomato/daemon/main.py
+++ b/src/tomato/daemon/main.py
@@ -6,6 +6,29 @@ import logging
 
 from .. import dbhandler
 
+log = logging.getLogger(__name__)
+
+
+def _kill_tomato_job(proc):
+    pc = proc.children()
+    log.warning(f"{proc.name()=}, {proc.pid=}, {pc=}")
+    if psutil.WINDOWS:
+        for proc in pc:
+            if proc.name() in {"conhost.exe"}:
+                continue
+            ppc = proc.children()
+            for proc in ppc:
+                log.debug(f"{proc.name()=}, {proc.pid=}, {proc.children()=}")
+                proc.terminate()
+            gone, alive = psutil.wait_procs(ppc, timeout=10)
+    elif psutil.POSIX:
+        for proc in pc:
+            log.debug(f"{proc.name()=}, {proc.pid=}, {proc.children()=}")
+            proc.terminate()
+        gone, alive = psutil.wait_procs(pc, timeout=10)
+    log.debug(f"{gone=}")
+    log.debug(f"{alive=}")
+
 
 def _find_matching_pipelines(pipelines: list, method: list[dict]) -> list[str]:
     req_names = set([item["device"] for item in method])
@@ -40,7 +63,6 @@ def _pipeline_ready_sample(ret: tuple, sample: dict) -> bool:
 
 
 def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None:
-    log = logging.getLogger(__name__)
     qup = settings["queue"]["path"]
     qut = settings["queue"]["type"]
     stp = settings["state"]["path"]
@@ -63,7 +85,7 @@ def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None:
         ret = dbhandler.job_get_all(qup, type=qut)
         for jobid, jobname, strpl, st in ret:
             payload = json.loads(strpl)
-            if st in ["q", "qw"]:
+            if st in {"q", "qw"}:
                 if st == "q":
                     log.info(f"checking whether job '{jobid}' can ever be matched")
                 matched_pips = _find_matching_pipelines(pipelines, payload["method"])
@@ -101,4 +123,12 @@ def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None:
                             ["tomato_job", str(jpath)], start_new_session=sns
                         )
                         break
+            elif st in {"rd"}:
+                log.warning(f"cancelling a running job {jobid} with pid {pid}")
+                proc = psutil.Process(pid=pid)
+                log.debug(f"{proc=}")
+                _kill_tomato_job(proc)
+                log.info(f"setting job {jobid} to status 'cd'")
+                dbhandler.job_set_status(qup, "cd", jobid, type=qut)
+
         time.sleep(settings.get("main loop", 1))
diff --git a/src/tomato/dbhandler/sqlite.py b/src/tomato/dbhandler/sqlite.py
index 211d0c52..0dc021db 100644
--- a/src/tomato/dbhandler/sqlite.py
+++ b/src/tomato/dbhandler/sqlite.py
@@ -14,7 +14,7 @@ def get_db_conn(
         sql = sqlite3
     else:
         raise RuntimeError(f"database type '{type}' unsupported")
-    
+
     head, tail = os.path.split(dbpath)
     if head != "" and not os.path.exists(head):
         log.warning("making local data folder '%s'", head)
diff --git a/src/tomato/drivers/dummy/main.py b/src/tomato/drivers/dummy/main.py
index eaad6635..c784ff50 100644
--- a/src/tomato/drivers/dummy/main.py
+++ b/src/tomato/drivers/dummy/main.py
@@ -139,7 +139,7 @@ def start_job(
 
     jobqueue
         :class:`multiprocessing.Queue` for passing job related data.
-    
+
     logger
         :class:`logging.Logger` instance for writing logs.
 
diff --git a/src/tomato/ketchup/functions.py b/src/tomato/ketchup/functions.py
index 16b3ee55..7c15f108 100644
--- a/src/tomato/ketchup/functions.py
+++ b/src/tomato/ketchup/functions.py
@@ -227,6 +227,12 @@ def cancel(args: Namespace) -> None:
     cancelled. Optional arguments include the verbose/quiet switches (``-v/-q``)
     and the testing switch (``-t``).
 
+    .. note::
+
+        :func:`~ketchup.functions.cancel` only sets the status of the running
+        *job* to ``rd``; the actual cancellation is performed by
+        :func:`tomato.daemon.main.main_loop`.
+
     Examples
     --------
 
@@ -247,27 +253,6 @@ def cancel(args: Namespace) -> None:
     Cancelling a completed job will do nothing.
 
     """
-
-    def kill_tomato_job(proc):
-        pc = proc.children()
-        log.warning(f"{proc.name()=}, {proc.pid=}, {pc=}")
-        if psutil.WINDOWS:
-            for proc in pc:
-                if proc.name() in {"conhost.exe"}:
-                    continue
-                ppc = proc.children()
-                for proc in ppc:
-                    log.debug(f"{proc.name()=}, {proc.pid=}, {proc.children()=}")
-                    proc.terminate()
-                gone, alive = psutil.wait_procs(ppc, timeout=10)
-        elif psutil.POSIX:
-            for proc in pc:
-                log.debug(f"{proc.name()=}, {proc.pid=}, {proc.children()=}")
-                proc.terminate()
-            gone, alive = psutil.wait_procs(pc, timeout=10)
-        log.debug(f"{gone=}")
-        log.debug(f"{alive=}")
-
     dirs = setlib.get_dirs(args.test)
     settings = setlib.get_settings(dirs.user_config_dir, dirs.user_data_dir)
     state = settings["state"]
@@ -286,10 +271,8 @@ def kill_tomato_job(proc):
         running = dbhandler.pipeline_get_running(state["path"], type=state["type"])
         for pip, pjobid, pid in running:
            if pjobid == jobid:
-                log.warning(f"cancelling a running job {jobid} with pid {pid}")
-                proc = psutil.Process(pid=pid)
-                log.debug(f"{proc=}")
-                kill_tomato_job(proc)
+                log.info(f"setting job {jobid} to status 'rd'")
+                dbhandler.job_set_status(queue["path"], "rd", jobid, type=queue["type"])
 
 
 def load(args: Namespace) -> None:
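
The handoff introduced by this patch can be summarised as: ``ketchup cancel`` only flips a
running *job* from ``r`` to ``rd``, and on its next pass :func:`tomato.daemon.main.main_loop`
performs the actual process kill and records ``cd``. The sketch below is purely illustrative
of that state machine; the ``Job`` dataclass and the two helper functions are hypothetical
stand-ins and not part of **tomato** — only the status codes mirror the patch.

.. code-block:: python

    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class Job:
        """Hypothetical stand-in for a row in the job queue."""

        jobid: int
        status: str  # one of: q, qw, r, rd, c, ce, cd
        pid: Optional[int] = None


    def ketchup_cancel(job: Job) -> None:
        # After this patch, ketchup no longer touches the job's process:
        # it only marks a running job for cancellation.
        if job.status == "r":
            job.status = "rd"


    def daemon_pass(job: Job) -> None:
        # The daemon notices the 'rd' status on its next pass, terminates the
        # job's process tree (psutil / _kill_tomato_job in the real code) and
        # records the final status.
        if job.status == "rd":
            job.status = "cd"


    job = Job(jobid=1, status="r", pid=12345)
    ketchup_cancel(job)  # ketchup:       r  -> rd
    daemon_pass(job)     # tomato.daemon: rd -> cd
    assert job.status == "cd"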