
My multiprocessing.Pool(5).map(func, iterable) does not return because one process is raising an exception. How can I continue execution after this exception is raised?

I hoped moving to map_async would solve this (maybe with the error_callback), but I had the same issue there.

Example code:

x.py

    #!/usr/bin/env python3
    from os import getpid
    import multiprocessing as mp

    def f(x):
        print(x**2, f'Hi Im {getpid()}')
        if not x:
            raise Exception(f'bla {x}')

    pool = mp.Pool(5)
    res = pool.map(f, range(5))
    print(res)

Output:

    0 Hi Im 15701
    1 Hi Im 15702
    4 Hi Im 15703
    9 Hi Im 15704
    16 Hi Im 15705
    multiprocessing.pool.RemoteTraceback:
    """
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 125, in worker
        result = (True, func(*args, **kwds))
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 48, in mapstar
        return list(map(*args))
      File "./x.py", line 8, in f
        raise Exception(f'bla {x}')
    Exception: bla 0
    """

    The above exception was the direct cause of the following exception:

    Traceback (most recent call last):
      File "./x.py", line 13, in <module>
        print(res.get())
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 768, in get
        raise self._value
    Exception: bla 0

If I remove the offending process (i.e. run with range(1, 5)), everything works fine.

  • range(5) is 0-4. 0 will pass your check (not x) Commented Sep 14, 2020 at 19:08
  • Yes, this is the core of the question. This is the offending process. What I don't get is how to continue execution after it has raised its exception. Commented Sep 14, 2020 at 19:10
  • Catch the exception in the function (try/except). Commented Sep 14, 2020 at 19:12
  • Not my function. This is just an example code, the real f is paramiko.connect() which I don't mean to change + I want a handle from the mp side Commented Sep 14, 2020 at 19:13
  • You can try wrapping that function in your own function that catches errors Commented Sep 14, 2020 at 19:15
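
The wrapper idea from the comments above could be sketched roughly as follows. This is only an illustration: f stands in for the function that cannot be modified, and safe_f is a hypothetical name for a wrapper that returns the exception object instead of raising it, so that a single failing input does not abort the whole pool.map() call.

```python
import multiprocessing as mp


def f(x):
    # Stand-in for a function you cannot modify (mirrors the question's example).
    if not x:
        raise Exception(f'bla {x}')
    return x**2


def safe_f(x):
    # Hypothetical wrapper: hand the exception back as a value instead of
    # letting it propagate out of the worker process.
    try:
        return f(x)
    except Exception as e:
        return e


def main():
    inputs = range(5)
    with mp.Pool(5) as pool:
        results = pool.map(safe_f, inputs)
    # Sort out failures on the parent side, where execution simply continues.
    for x, res in zip(inputs, results):
        if isinstance(res, Exception):
            print(f'f({x}) failed: {res!r}')
        else:
            print(f'f({x}) = {res}')
    return results


if __name__ == '__main__':
    main()
```

The wrapper is defined at module level so that it can be pickled and sent to the worker processes; a lambda or nested function would not work with multiprocessing.Pool.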

2 Answers


As the Python documentation on multiprocessing.Pool warns:

Warning: multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization. Note that it is not correct to rely on the garbage collector to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).

In other words, if you don't use the pool as a context manager or call close() and terminate() yourself, your program can hang on finalization.

    from os import getpid
    import multiprocessing as mp

    def f(x):
        print(x**2, f'Hi Im {getpid()}')
        if not x:
            raise Exception(f'bla_{x}')

    with mp.Pool(5) as pool:
        res = pool.map(f, range(5))
    print(res)

If you run this code, it no longer gets stuck: the exception raised in f propagates out of pool.map() and the program terminates.

If you don't want to use the context manager, you need to close the pool manually by calling

    pool.close()

After doing so, you are properly managing the pool's resources.
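
A minimal sketch of the manual cleanup, assuming a trivial worker function (square and run are hypothetical names used only for illustration). Putting close() in a finally block means the pool is released even when map() raises; join() then waits for the workers to exit.

```python
import multiprocessing as mp


def square(x):
    # Hypothetical worker used only for this illustration.
    return x * x


def run():
    pool = mp.Pool(5)
    try:
        return pool.map(square, range(5))
    finally:
        pool.close()  # no more tasks will be submitted to the pool
        pool.join()   # wait for the worker processes to exit


if __name__ == '__main__':
    print(run())
```

Note that leaving a with block calls terminate() rather than close(), so the context-manager form stops the workers immediately, while close() followed by join() lets outstanding work finish first.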


1 Comment

@CIsForCookies Can you mark this answer as correct if it resolves your doubt?

The Pool.imap solution referred to in a comment is one way, but it only conveniently allows for target functions with a single argument. I believe the following is more flexible:

If instead you use the ProcessPoolExecutor class from the concurrent.futures module with the submit method, which returns a Future instance, you have greater control over each submission's result:

    #!/usr/bin/env python3
    from os import getpid
    import concurrent.futures

    def f(x):
        print(x**2, f'Hi Im {getpid()}')
        if not x:
            raise Exception(f'bla {x}')
        return x**2  # let's return a value just for fun

    def main():
        with concurrent.futures.ProcessPoolExecutor(5) as pool:
            futures = [pool.submit(f, i) for i in range(5)]
            for future in futures:  # wait for all "jobs" to complete ...
                try:
                    result = future.result()  # ... by retrieving the result, which could be an exception
                    print('result of calling f:', result)
                except Exception as e:
                    print('exception found in result:', e)

    if __name__ == '__main__':
        main()

Prints:

    0 Hi Im 12996
    1 Hi Im 5152
    4 Hi Im 11160
    9 Hi Im 12996
    exception found in result: bla 0
    16 Hi Im 5152
    result of calling f: 1
    result of calling f: 4
    result of calling f: 9
    result of calling f: 16
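
For comparison, the Pool.imap approach mentioned at the start of this answer might look roughly like the sketch below. The helper name drain is hypothetical. It relies on the iterator returned by imap raising a task's exception from next() while still allowing the following results to be fetched, and it assumes the default chunksize of 1.

```python
import multiprocessing as mp


def f(x):
    # Same shape as the question's function: fails for x == 0.
    if not x:
        raise Exception(f'bla {x}')
    return x**2


def drain(results):
    # Pull results one at a time. A failed task raises from next(),
    # but the iterator can still be advanced afterwards.
    collected = []
    while True:
        try:
            collected.append(next(results))
        except StopIteration:
            break  # all results consumed
        except Exception as e:
            collected.append(e)  # record the failure and keep going
    return collected


def main():
    with mp.Pool(5) as pool:
        # Drain inside the with block: leaving it terminates the pool.
        return drain(pool.imap(f, range(5)))


if __name__ == '__main__':
    print(main())
```

StopIteration is caught before the generic Exception handler because it is itself a subclass of Exception; reversing the order would turn the loop into an infinite one.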

5 Comments

I think ClsforCookies is asking the reason why his program is not halting with some error code.
This looks very similar to the actual implementation of multiprocessing. This will work, but it is not what I'm looking for (as I already have a working solution using ThreadPool). What I'm looking for is the reason for the weird mp behaviour. Why does it get stuck mid-execution?
I know on Windows you need to run the process creation code within an if __name__ == '__main__': block.
And I certainly read the question differently. When using pool.map, as soon as it processes the result from a "job" that raised an exception, the ballgame is over and you can't continue. My understanding is that was the problem and my code overcomes it. And it is halting -- with the bla 0 exception.
On my desktop, when I put the last 3 lines in an if __name__ == '__main__': block, the program throws a bla 0 exception and terminates.
