I have a function that uses multiprocessing's Queues and Processes to execute a task, but sometimes it gets stuck and behaves in ways I can't explain. I would appreciate it if someone could rewrite this code to do the same final task in a better, maybe more Pythonic, way.

from multiprocessing import Process, Queue
import numpy as np

def start_multicore_task(nres, cores_per_task):

    in_data = Queue()
    out_data = Queue()
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        if len(others) > 0:  # a check because the last value in "iterator" won't have any value higher than itself
            in_data.put((res1, others))

    def calculate_row(nres, in_data, out_data):
        "function to fill a row of what will be a square numpy array"
        while not in_data.empty():
            res, others = in_data.get()
            row = np.zeros((nres,))
            for other in others:
                # in my real code, which also doesn't work, obviously this performs a more
                # complex calculation to add to the row[other] array position
                row[other] = other
            out_data.put((res, row))
        return

    ps = []
    for _ in range(cores_per_task):
        p = Process(target=calculate_row, args=(nres, in_data, out_data))
        p.start()
        ps.append(p)
    for p in ps:
        p.join()

    corr = np.zeros((nres, nres))
    for _ in range(out_data.qsize()):
        res, row = out_data.get()
        corr[res, :] = row

    return corr

Sometimes calling start_multicore_task(50, 3) works and sometimes it gets stuck (if I put some prints around, I can see it gets stuck on the last element of in_data), and start_multicore_task(300, 3) has never worked (it also gets stuck on the last element of in_data). I don't know what is happening.

  • What is your OS? Commented Jun 28, 2022 at 12:47
  • Your code shouldn't even run on Windows since you are attempting to pickle a factory function. Commented Jun 28, 2022 at 12:54
  • @Charchit Linux. With multiprocess, which uses dill to pickle, the problem still remains. Commented Jun 28, 2022 at 13:10
  • Would you be open to using the "spawn" method to start the processes rather than the default on your OS, i.e., "fork"? Commented Jun 28, 2022 at 13:35
  • I see that you switched to using a ProcessPoolExecutor. The hang with queues and processes results from trying to join the child processes before consuming all the items they have placed on the queue. Read the docs, in particular the warnings. Also, as mentioned, the method multiprocessing.Queue.empty is not reliable. Commented Jun 29, 2022 at 13:53

3 Answers

See my comment concerning joining child processes before you have consumed items they have put to a multiprocessing.Queue, which can cause you to hang.

If I understand your code correctly, cores_per_task is poorly named since it represents the total number of cores (child processes) available to process all the tasks. But I will continue to use that name.

Since the method multiprocessing.Queue.empty is not reliable, if you have N processes doing get calls against a queue, you should add N sentinel items to the queue. A sentinel is a value that cannot be mistaken for a normal item and that signifies there is no more data on the queue. Here we can use None as the sentinel. Similarly, since the main process must get all the items from the output queue before joining the child processes, each child process should add a None sentinel record when it has finished writing results to the queue. The main process only has to do blocking get calls and count the number of sentinel records it sees. When that number is N, it knows it has retrieved all the results:

from multiprocessing import Process, Queue
import numpy as np

def start_multicore_task(nres, cores_per_task):

    in_data = Queue()
    out_data = Queue()
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        if len(others) > 0:  # a check because the last value in "iterator" won't have any value higher than itself
            in_data.put((res1, others))
    for _ in range(cores_per_task):
        in_data.put(None)  # sentinel marking no more data to each child process

    def calculate_row(nres, in_data, out_data):
        "function to fill a row of what will be a square numpy array"
        while True:
            item = in_data.get()
            if item is None:  # sentinel?
                break
            res, others = item
            row = np.zeros((nres,))
            for other in others:
                # in my real code, which also doesn't work, obviously this performs a more
                # complex calculation to add to the row[other] array position
                row[other] = other
            out_data.put((res, row))
        out_data.put(None)  # put sentinel

    ps = []
    for _ in range(cores_per_task):
        p = Process(target=calculate_row, args=(nres, in_data, out_data))
        p.start()
        ps.append(p)

    corr = np.zeros((nres, nres))
    sentinels_seen = 0
    while sentinels_seen < cores_per_task:
        item = out_data.get()
        if item is None:  # sentinel
            sentinels_seen += 1
        else:
            res, row = item
            corr[res, :] = row

    # join only after every result has been consumed
    for p in ps:
        p.join()

    return corr

if __name__ == '__main__':
    print(start_multicore_task(10, 4))

Prints:

[[0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
 [0. 0. 2. 3. 4. 5. 6. 7. 8. 9.]
 [0. 0. 0. 3. 4. 5. 6. 7. 8. 9.]
 [0. 0. 0. 0. 4. 5. 6. 7. 8. 9.]
 [0. 0. 0. 0. 0. 5. 6. 7. 8. 9.]
 [0. 0. 0. 0. 0. 0. 6. 7. 8. 9.]
 [0. 0. 0. 0. 0. 0. 0. 7. 8. 9.]
 [0. 0. 0. 0. 0. 0. 0. 0. 8. 9.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 9.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]

Alternatively, instead of the child processes putting sentinel items on the output queue, you can use a multiprocessing.JoinableQueue for in_data. As soon as a child process has processed an input item from in_data and put its result on out_data, it calls in_data.task_done() to signify that the item has been fully processed. Then the main process only has to call in_data.join() to wait for all results to have been generated, followed by non-blocking calls to out_data.get_nowait() to retrieve the results. When this raises an Empty exception, we know that all results have been processed.

from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty
import numpy as np

def start_multicore_task(nres, cores_per_task):

    in_data = JoinableQueue()
    out_data = Queue()
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        if len(others) > 0:  # a check because the last value in "iterator" won't have any value higher than itself
            in_data.put((res1, others))
    for _ in range(cores_per_task):
        in_data.put(None)  # sentinel marking no more data to each child process

    def calculate_row(nres, in_data, out_data):
        "function to fill a row of what will be a square numpy array"
        while True:
            item = in_data.get()
            if item is None:  # sentinel?
                in_data.task_done()  # show we have completed processing this item
                break
            res, others = item
            row = np.zeros((nres,))
            for other in others:
                # in my real code, which also doesn't work, obviously this performs a more
                # complex calculation to add to the row[other] array position
                row[other] = other
            out_data.put((res, row))
            in_data.task_done()  # show we have completed processing this item

    ps = []
    for _ in range(cores_per_task):
        p = Process(target=calculate_row, args=(nres, in_data, out_data))
        p.start()
        ps.append(p)

    # wait for all results to have been created:
    in_data.join()

    corr = np.zeros((nres, nres))
    try:
        while True:
            res, row = out_data.get_nowait()
            corr[res, :] = row
    except Empty:
        pass

    for p in ps:
        p.join()

    return corr

if __name__ == '__main__':
    print(start_multicore_task(10, 4))

Finally, we can eliminate having to put sentinel records to the input queue by just making the child processes daemon processes that will now loop forever trying to get more input but will automatically terminate when the main process terminates:

from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty
import numpy as np

def start_multicore_task(nres, cores_per_task):

    in_data = JoinableQueue()
    out_data = Queue()
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        if len(others) > 0:  # a check because the last value in "iterator" won't have any value higher than itself
            in_data.put((res1, others))

    def calculate_row(nres, in_data, out_data):
        "function to fill a row of what will be a square numpy array"
        # This loop never terminates, but we are now a daemon process and
        # thus this process will terminate when the main process terminates:
        while True:
            res, others = in_data.get()
            row = np.zeros((nres,))
            for other in others:
                # in my real code, which also doesn't work, obviously this performs a more
                # complex calculation to add to the row[other] array position
                row[other] = other
            out_data.put((res, row))
            in_data.task_done()  # show we have completed processing this item

    for _ in range(cores_per_task):
        p = Process(target=calculate_row, args=(nres, in_data, out_data), daemon=True)
        p.start()

    # wait for all results to have been created:
    in_data.join()

    corr = np.zeros((nres, nres))
    try:
        while True:
            res, row = out_data.get_nowait()
            corr[res, :] = row
    except Empty:
        pass

    return corr

if __name__ == '__main__':
    print(start_multicore_task(10, 4))
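
One caveat with the two JoinableQueue versions above: the multiprocessing documentation notes that after an object is put on an empty queue there may be an infinitesimal delay before get_nowait() can return it without raising queue.Empty, so draining out_data with get_nowait() could in principle stop early. A hedged sketch of one way around this (not part of the original answer) is to count the submitted tasks and do exactly that many blocking get() calls. Assumptions: the name start_multicore_task_counted is made up, plain lists stand in for the NumPy rows so the sketch has no dependencies, calculate_row is moved to module level, and the "fork" start method is requested explicitly, so this is Linux/Unix only.

```python
import multiprocessing as mp


def calculate_row(nres, in_data, out_data):
    """Fill one row per input item; stop at the None sentinel."""
    while True:
        item = in_data.get()
        if item is None:  # sentinel: no more work for this process
            break
        res, others = item
        row = [0.0] * nres
        for other in others:
            row[other] = float(other)  # placeholder for the real calculation
        out_data.put((res, row))


def start_multicore_task_counted(nres, cores_per_task):
    # Assumption: the "fork" start method is available (e.g. Linux).
    ctx = mp.get_context("fork")
    in_data = ctx.Queue()
    out_data = ctx.Queue()

    n_tasks = 0
    for res1 in range(nres - 1):  # the last row has no "others"
        in_data.put((res1, list(range(res1 + 1, nres))))
        n_tasks += 1
    for _ in range(cores_per_task):
        in_data.put(None)  # one sentinel per child process

    ps = [ctx.Process(target=calculate_row, args=(nres, in_data, out_data))
          for _ in range(cores_per_task)]
    for p in ps:
        p.start()

    corr = [[0.0] * nres for _ in range(nres)]
    for _ in range(n_tasks):
        res, row = out_data.get()  # blocks until a result is available
        corr[res] = row

    # Every result has been consumed, so joining cannot hang on a full pipe.
    for p in ps:
        p.join()
    return corr


if __name__ == '__main__':
    for row in start_multicore_task_counted(5, 2):
        print(row)
```

Because the main process knows how many results to expect, it needs neither sentinels on the output queue nor in_data.join().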

And if you are using a pool, since map returns its results in task submission order, there is no need for the worker function calculate_row to return a tuple whose first element is res. The value of res can be deduced from the position of the row in the results: in_data holds the tasks for res = 0, 1, ..., nres - 2 in that order, so we can simply enumerate the rows returned, starting at index 0. Also, there is no need to explicitly call shutdown on the pool since it is called implicitly when the with Pool(cores_per_task) as p: block exits:

from concurrent.futures import ProcessPoolExecutor as Pool
from functools import partial
import numpy as np

def calculate_row(in_data, nres):
    "function to fill a row of what will be a square numpy array"
    res, others = in_data
    row = np.zeros((nres,))
    for other in others:
        # in my real code, which also doesn't work, obviously this performs a more
        # complex calculation to add to the row[other] array position
        row[other] = other
    return row

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize

def start_multicore_task(nres, cores_per_task):
    in_data = []
    iterator = set(range(nres))
    # sorted() guarantees that the position of a task in in_data equals res1
    for res1 in sorted(iterator):
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        if len(others) > 0:  # a check because the last value in "iterator" won't have any value higher than itself
            in_data.append((res1, others))
    with Pool(cores_per_task) as p:
        # No need to convert the results to a list and by not doing so,
        # we can process the results as they are generated without having to
        # wait for all the results to have been generated
        chunksize = compute_chunksize(len(in_data), cores_per_task)
        results = p.map(partial(calculate_row, nres=nres), in_data, chunksize=chunksize)
        corr = np.zeros((nres, nres))
        # the first row returned belongs to res 0
        for res, row in enumerate(results):
            corr[res, :] = row
    return corr

if __name__ == '__main__':
    print(start_multicore_task(10, 4))
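
Because map returns results in submission order, the row index of each result is simply the position of its task in in_data. A small pool-free sketch can check that bookkeeping. build_in_data is a hypothetical helper that mirrors the task-building loop above, written with range instead of set arithmetic.

```python
def build_in_data(nres):
    """Hypothetical helper mirroring the task-building loop in start_multicore_task."""
    in_data = []
    for res1 in range(nres):
        others = list(range(res1 + 1, nres))  # every value higher than res1
        if others:  # the last value has nothing higher than itself
            in_data.append((res1, others))
    return in_data


in_data = build_in_data(5)
# Pair each task's position in the list with the row index it was built for.
positions = [(position, res1) for position, (res1, _) in enumerate(in_data)]
print(positions)  # [(0, 0), (1, 1), (2, 2), (3, 3)]
```

The first task belongs to row 0 and the last row (nres - 1) gets no task at all, so its row in corr stays all zeros.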

3 Comments

Great alternatives! I had seen the use of sentinels in other questions/answers but I couldn't figure out how to implement them here. Thanks
I have updated the answer providing a third alternative that eliminates having to use sentinels.
And I have added a method that uses a multiprocessing pool, which is slightly different than yours since there is no need for calculate_row to return a tuple.

One problem is here:

def calculate_row(nres, in_data, out_data):
    "function to fill a row of what will be a square numpy array"
    while not in_data.empty():
        res, others = in_data.get()

Check the docs for multiprocessing.Queue.empty:

"Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable."

Your Queue has multiple consumers running simultaneously. The empty() function will sometimes return False, but another Process then grabs the last item from the Queue before the in_data.get() statement executes. The blocking get() call then waits forever for an item that never arrives. That causes a freeze, and it happens randomly. I think this explains why your program sometimes doesn't process the last element.

Try it this way:

import queue  # needed for queue.Empty

def calculate_row(nres, in_data, out_data):
    while True:
        try:
            res, others = in_data.get(block=False)
        except queue.Empty:
            break

The rest of the method is unchanged. This should work because you fill the Queue before launching any consumer Process.

Warning: I had no way of running this myself.
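
For reference, here is a minimal single-process sketch of that try/except shape. It uses the standard library's queue.Queue rather than multiprocessing.Queue so that it runs deterministically; multiprocessing.Queue raises the same queue.Empty exception. The helper name drain is made up for illustration.

```python
import queue


def drain(q):
    """Collect items until a non-blocking get reports that the queue is empty."""
    items = []
    while True:
        try:
            items.append(q.get(block=False))
        except queue.Empty:
            break
    return items


q = queue.Queue()
for res in range(3):
    q.put((res, [res + 1]))

drained = drain(q)
print(drained)
```

Unlike the empty()/get() pair, there is no gap between the check and the retrieval here: a single call either returns an item or raises.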

In the end, I switched from Queues and Processes to concurrent.futures' ProcessPoolExecutor and, as hinted by @Charchit, I had to move my mapped function calculate_row to the top level of the module so that it could be pickled. This solution works great for me because, besides fixing this problem, I need to run start_multicore_task inside a multiprocess.Pool, whose workers are daemonic processes that can't spawn children of their own, while a Pool from concurrent.futures runs smoothly inside it.

from concurrent.futures import ProcessPoolExecutor as Pool
from functools import partial
import numpy as np

def calculate_row(in_data, nres):
    "function to fill a row of what will be a square numpy array"
    res, others = in_data
    row = np.zeros((nres,))
    for other in others:
        # in my real code, which also doesn't work, obviously this performs a more
        # complex calculation to add to the row[other] array position
        row[other] = other
    return res, row

def start_multicore_task(nres, cores_per_task):
    in_data = []
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        if len(others) > 0:  # a check because the last value in "iterator" won't have any value higher than itself
            in_data.append((res1, others))
    with Pool(cores_per_task) as p:
        # chunksize must be an integer >= 1, so use floor division
        chunksize = max(1, len(in_data) // (cores_per_task * 2))
        results = list(p.map(partial(calculate_row, nres=nres), in_data, chunksize=chunksize))
        p.shutdown()
    corr = np.zeros((nres, nres))
    for result in results:
        res, row = result
        corr[res, :] = row
    return corr

4 Comments

I believe you are using chunksize inappropriately. It specifies the size of the "chunks" or "batches" of tasks that each pool process takes off the input queue to process together. In the extreme case where it equals the total number of input tasks (the size of in_data), all the tasks would be pulled off the input queue by a single pool process and you would have no parallel processing, because the other pool processes would remain idle with nothing left on the input queue. So that is not how chunksize should relate to the process pool size (cores_per_task). (more....)
Not specifying a chunksize, which is perfectly okay unless the number of tasks being submitted is very large, ensures that no pool process sits idle while other processes have multiple tasks left to process. Let's say you have 8 cores so you set the chunksize to 8. You now submit 32 tasks. They will be on the task input queue as 4 chunks of 8 tasks. Only 4 pool processes will be doing any work. What then is the point of having a pool size of 8?
@Booboo you're right! Since I expect my number of tasks (size of in_data) to be around ~300 always, I'm going to use chunksize=len(in_data)/(cores_per_task*2) (will change it in the answer), which at least in theory sounds nice to me. Thanks!
Just for your information, if you do multiprocessing with class multiprocessing.pool.Pool and call map without specifying a chunksize (this argument defaults to None), then chunksize will be computed as follows: chunksize, remainder = divmod(len(in_data), cores_per_task * 4); if remainder: chunksize += 1.
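
To make the numbers in these comments concrete, here is a small sketch of the arithmetic. default_chunksize follows the formula quoted in the last comment; busy_workers is a hypothetical helper that only counts how many pool processes can receive at least one chunk.

```python
def default_chunksize(n_tasks, pool_size):
    """Chunksize formula quoted above for multiprocessing.pool.Pool.map."""
    chunksize, remainder = divmod(n_tasks, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize


def busy_workers(n_tasks, pool_size, chunksize):
    """Hypothetical helper: how many processes can get at least one chunk."""
    n_chunks = -(-n_tasks // chunksize)  # ceiling division
    return min(pool_size, n_chunks)


print(default_chunksize(299, 3))                      # 25
print(busy_workers(32, 8, 8))                         # 4
print(busy_workers(32, 8, default_chunksize(32, 8)))  # 8
```

With 32 tasks and a chunksize of 8 there are only 4 chunks, so half of an 8-process pool has nothing to do, whereas the default formula yields a chunksize of 1 and keeps all 8 processes busy.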
