What's the proper way to abort multiprocessing when one of the child processes aborts and/or throws an exception?

I found various related questions (generic multiprocessing error handling, how to close a multiprocessing pool on exception, which has no answer, ...), but no clear answer on how to stop multiprocessing when a child raises an exception.

For instance, I expect the following code:

    from multiprocessing import Pool
    from time import sleep


    def f(x):
        sleep(x)
        print(f"f({x})")
        return 1.0 / (x - 2)


    def main():
        with Pool(4) as p:
            try:
                r = p.map(f, range(7))
            except Exception as e:
                print(f"oops: {e}")
                p.close()
                p.terminate()
        print("end")


    if __name__ == '__main__':
        main()

To output:

    f(0)
    f(1)
    f(2)
    oops: float division by zero
    end

Instead, it applies f to all items before detecting/handling the exception:

    f(0)
    f(1)
    f(2)
    f(4)
    f(3)
    f(5)
    f(6)
    oops: float division by zero
    end

Isn't there any way to catch the exception directly?

  • Note that the exception is only re-raised in the main process after map has finished. If you definitely want to use map there is no way to catch it any earlier, because the exception does not exist any earlier. So, the question is whether you want to stick to map or are looking for a custom solution? Commented Sep 11, 2018 at 9:14
  • Which Python version? My Python 3.4.2 behaves as you expected. Commented Sep 11, 2018 at 13:08
  • @stovfl I've edited my post, I'm using Python 3.6.5 Commented Sep 11, 2018 at 13:31
  • @MisterMiyagi I'm looking for the simplest way to do multiprocessing with error handling. I'd like to avoid adding a wrapper class or equivalent to handle it, though. Commented Sep 11, 2018 at 13:31

1 Answer

I think you're going to need apply_async for this, so you can act on each result individually instead of on the cumulative result. pool.apply_async offers an error_callback parameter you can use to register your error handler. apply_async does not block, so you'll need to join() the pool. I'm also using a flag, terminated, to know whether the results can be processed normally, i.e. when no exception occurred.

    from time import sleep
    from multiprocessing import Pool


    def f(x):
        sleep(x)
        print(f"f({x})")
        return 1.0 / (x - 2)


    def on_error(e):
        # Called in the parent process when a task raises an exception.
        global terminated
        terminated = True
        pool.terminate()
        print(f"oops:{e}")


    def main():
        global pool
        global terminated

        terminated = False

        pool = Pool(4)
        # Submit each item separately so a failure is reported as soon as it happens.
        results = [pool.apply_async(f, (x,), error_callback=on_error)
                   for x in range(7)]
        pool.close()
        pool.join()

        # Only read the results if no task failed.
        if not terminated:
            for r in results:
                print(r.get())

        print("end")


    if __name__ == '__main__':
        main()
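If you'd rather avoid the globals, another option that might fit is imap_unordered: it yields results as they complete and re-raises a worker's exception at the point where that result would have been yielded, so the parent can stop early. The following is only a minimal sketch under that assumption. The helper name run_until_first_error and its pool_factory parameter are made up for illustration (pool_factory just lets you swap in multiprocessing.pool.ThreadPool); they are not part of the multiprocessing API.

```python
from multiprocessing import Pool
from time import sleep


def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)


def run_until_first_error(func, items, processes=4, pool_factory=Pool):
    """Collect results in completion order and stop at the first exception.

    Hypothetical helper: returns (results, error), where error is None
    if every task succeeded.
    """
    results = []
    error = None
    # Leaving the `with` block calls pool.terminate(), which stops the workers.
    with pool_factory(processes) as pool:
        try:
            # The worker's exception is re-raised here, when its result is reached.
            for value in pool.imap_unordered(func, items):
                results.append(value)
        except Exception as e:
            error = e
    return results, error


def main():
    results, error = run_until_first_error(f, range(7))
    if error is not None:
        print(f"oops: {error}")
    else:
        print(results)
    print("end")


if __name__ == "__main__":
    main()
```

With the f from the question, this should report the error shortly after f(2) fails rather than waiting for the remaining items, although tasks that are already running may still print before the pool is terminated. Note that the results come back in completion order, not input order.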