Skip to content

Commit

Permalink
Ensure shutdown with atexit
Browse files Browse the repository at this point in the history
Multiprocessing tries to join any remaining processes with an atexit
handler. Due to with/yield issues, this can run before the context
handler for the pool. Ensure that we shutdown before this happens by
registering our own atexit handler. Since we register it after
multiprocessing, it will run before. On normal shutdown it is
unregistered.
  • Loading branch information
ali1234 committed Nov 25, 2019
1 parent 276683c commit 8eff82b
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions teletext/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def __init__(self, function, processes=1, *args, **kwargs):
self._pipes.append(local)

def __enter__(self):
atexit.register(self.shutdown)
for p in self._procs:
p.start()
return self
Expand Down Expand Up @@ -170,12 +171,21 @@ def apply(self, iterable):
except (BrokenPipeError, ConnectionResetError, EOFError):
raise ChildProcessError('A worker process stopped unexpectedly.')

def __exit__(self, *args):
for p in self._pipes:
p.send((-1, None))
for p in self._procs:
p.join()
def shutdown(self):
for proc, pipe in zip(self._procs, self._pipes):
if proc.is_alive():
try:
pipe.send((-1, None))
while pipe.poll(timeout=0):
pipe.recv()
except (BrokenPipeError, ConnectionResetError, EOFError):
continue
finally:
proc.join()

def __exit__(self, *args):
self.shutdown()
atexit.unregister(self.shutdown)

class _PureGeneratorPoolSingle(object):

Expand Down

0 comments on commit 8eff82b

Please sign in to comment.