From 8eff82ba57cc8e7c2798cd7f9730456d61e60356 Mon Sep 17 00:00:00 2001 From: Alistair Buxton Date: Mon, 25 Nov 2019 17:21:08 +0000 Subject: [PATCH] Ensure shutdown with atexit Multiprocessing tries to join any remaining processes with an atexit handler. Due to with/yield issues, this can run before the context handler for the pool. Ensure that we shutdown before this happens by registering our own atexit handler. Since we register it after multiprocessing, it will run before. On normal shutdown it is unregistered. --- teletext/mp.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/teletext/mp.py b/teletext/mp.py index aae4577..90a29b5 100644 --- a/teletext/mp.py +++ b/teletext/mp.py @@ -126,6 +126,7 @@ def __init__(self, function, processes=1, *args, **kwargs): self._pipes.append(local) def __enter__(self): + atexit.register(self.shutdown) for p in self._procs: p.start() return self @@ -170,12 +171,21 @@ def apply(self, iterable): except (BrokenPipeError, ConnectionResetError, EOFError): raise ChildProcessError('A worker process stopped unexpectedly.') - def __exit__(self, *args): - for p in self._pipes: - p.send((-1, None)) - for p in self._procs: - p.join() + def shutdown(self): + for proc, pipe in zip(self._procs, self._pipes): + if proc.is_alive(): + try: + pipe.send((-1, None)) + while pipe.poll(timeout=0): + pipe.recv() + except (BrokenPipeError, ConnectionResetError, EOFError): + continue + finally: + proc.join() + def __exit__(self, *args): + self.shutdown() + atexit.unregister(self.shutdown) class _PureGeneratorPoolSingle(object):