
I'm trying to learn how to use the multiprocessing package in Python, and I've written the following code, which randomly generates a large 2D array and then works out how many numbers in each row are within a specified interval (in this case between 4 and 8):

```python
import time
import multiprocessing as mp
import numpy as np

def how_many_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[10000000, 5])
    print(data[:5])

    start_time = time.perf_counter()

    # With parallelisation
    with mp.Pool(mp.cpu_count()) as pool:
        results = [pool.apply(how_many_within_range, args=(row, 4, 8))
                   for row in data]

    # Without parallelisation
    # results = [how_many_within_range(row, 4, 8) for row in data]

    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])
```

Without multiprocessing, the code runs in about 40 seconds, but with it, the program is much slower and doesn't finish in a realistic time. I'm pretty sure I've correctly followed the tutorial I was using, so what am I doing wrong?

1 Comment

pool.apply is a blocking call. Use pool.apply_async() and things start running in parallel. The return structure of apply_async is a bit different, though. Commented Sep 2, 2021 at 14:39

3 Answers


.apply() is the wrong function for this case. .starmap() is more appropriate, but for a task this simple, the overhead of starting the processes and transferring the large amount of data between them makes it slower overall.

```python
import time
import multiprocessing as mp
import numpy as np

def how_many_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[1000000, 5])
    print(data[:5])

    # With parallelisation
    start_time = time.perf_counter()
    with mp.Pool() as pool:
        results = pool.starmap(how_many_within_range,
                               ((row, 4, 8) for row in data),
                               chunksize=1000)
    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])

    # Without parallelisation
    start_time = time.perf_counter()
    results = [how_many_within_range(row, 4, 8) for row in data]
    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])
```

Output:

```
[[1 4 8 9 2]
 [9 1 6 7 0]
 [0 7 6 8 4]
 [4 5 6 9 9]
 [6 6 9 9 1]]
Time elapsed: 3.3232607
[2, 2, 4, 3, 2]
Time elapsed: 2.4664016999999996
[2, 2, 4, 3, 2]
```

3 Comments

Many thanks, this is definitely a lot better! Would it be possible to explain the difference between apply and starmap (I also tried using apply_async but that didn't seem to help much)? I've read the documentation but I still don't really understand when to use one over the other...
@MrLatinNerd apply runs one function call in the pool and waits for its result, so calls execute one at a time. starmap takes an iterable of argument tuples and calls the function with each set of arguments in parallel. The chunksize parameter hands batches of argument tuples to each process, reducing inter-process communication overhead.
Brilliant, thanks for the clear explanation!

From the documentation, it looks like Pool.apply() is blocking, so you pay the overhead of starting processes without gaining any parallelism.



Why do you need multiprocessing for such a simple function, especially when you are already working with NumPy arrays? Try this code instead:

```
%%timeit
np.sum((data >= 4) & (data <= 8), axis=1)
198 ms ± 3.44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

There is no need to iterate over the array elements in a Python for loop; the vectorised version runs in a fraction of the time.
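As a quick sanity check (a sketch on a smaller array, not a timing run), the vectorised expression produces the same per-row counts as the original loop from the question:

```python
import numpy as np

def how_many_within_range(row, minimum, maximum):
    # Original pure-Python per-row loop from the question
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

data = np.random.randint(0, 10, size=[1000, 5])

# Per-row counts via the Python loop
loop_counts = [how_many_within_range(row, 4, 8) for row in data]

# Per-row counts via the vectorised boolean mask
vec_counts = np.sum((data >= 4) & (data <= 8), axis=1)

print(np.array_equal(loop_counts, vec_counts))  # True
```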

2 Comments

From the question: "... I'm trying to learn how to use the multiprocessing package ...".
Yup - I know that multiprocessing isn't necessary here, I was just testing it and trying to get it to work.
