See my comment concerning joining child processes before you have consumed the items they have put on a multiprocessing.Queue, which can cause your program to hang.
If I understand your code correctly, cores_per_task is poorly named since it represents the total number of cores (child processes) available to process all the tasks. But I will continue to use that name.
Since method multiprocessing.Queue.empty is not reliable, if you have N processes doing get calls against a queue, you should add N sentinel items to the queue: items that cannot be mistaken for normal data and that signify there is no more data on the queue. Here we can use None as the sentinel. Similarly, since the main process must get all the items from the output queue prior to joining the child processes, each child process should add a None sentinel when it has finished writing results to the queue. The main process then only has to do blocking get calls and count the sentinels it sees. When that count reaches N, it knows it has retrieved all the results:
```python
from multiprocessing import Process, Queue

import numpy as np


# Defined at module level so it can be pickled under the 'spawn' start method
# (the default on Windows and macOS):
def calculate_row(nres, in_data, out_data):
    """Function to fill a row of what will be a square numpy array."""
    while True:
        item = in_data.get()
        if item is None:  # sentinel?
            break
        res, others = item
        row = np.zeros((nres,))
        for other in others:
            # in my real code, which also doesn't work, obviously this performs
            # a more complex calculation to add to the row[other] array position
            row[other] = other
        out_data.put((res, row))
    out_data.put(None)  # put sentinel


def start_multicore_task(nres, cores_per_task):
    in_data = Queue()
    out_data = Queue()
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        # a check because the last value in "iterator" won't have any value
        # higher than itself:
        if len(others) > 0:
            in_data.put((res1, others))
    # one sentinel marking "no more data" for each child process:
    for _ in range(cores_per_task):
        in_data.put(None)

    ps = []
    for _ in range(cores_per_task):
        p = Process(target=calculate_row, args=(nres, in_data, out_data))
        p.start()
        ps.append(p)

    corr = np.zeros((nres, nres))
    sentinels_seen = 0
    while sentinels_seen < cores_per_task:
        item = out_data.get()
        if item is None:  # sentinel
            sentinels_seen += 1
        else:
            res, row = item
            corr[res, :] = row

    # Now that every queue item has been consumed, joining is safe:
    for p in ps:
        p.join()
    return corr


if __name__ == '__main__':
    print(start_multicore_task(10, 4))
```
Prints:
```
[[0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
 [0. 0. 2. 3. 4. 5. 6. 7. 8. 9.]
 [0. 0. 0. 3. 4. 5. 6. 7. 8. 9.]
 [0. 0. 0. 0. 4. 5. 6. 7. 8. 9.]
 [0. 0. 0. 0. 0. 5. 6. 7. 8. 9.]
 [0. 0. 0. 0. 0. 0. 6. 7. 8. 9.]
 [0. 0. 0. 0. 0. 0. 0. 7. 8. 9.]
 [0. 0. 0. 0. 0. 0. 0. 0. 8. 9.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 9.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
```
Alternatively, instead of the child processes putting sentinel items on the output queue, you can use a multiprocessing.JoinableQueue for in_data. As soon as a child process has processed an input item from in_data and put its result on out_data, it calls in_data.task_done(), signifying that the item has been fully processed. The main process then only has to call in_data.join() to wait for all results to have been generated, followed by non-blocking in_data.get_nowait() calls to retrieve the results. When get_nowait raises a queue.Empty exception, we know that all results have been retrieved:
```python
from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty

import numpy as np


# Defined at module level so it can be pickled under the 'spawn' start method:
def calculate_row(nres, in_data, out_data):
    """Function to fill a row of what will be a square numpy array."""
    while True:
        item = in_data.get()
        if item is None:  # sentinel?
            in_data.task_done()  # show we have completed processing this item
            break
        res, others = item
        row = np.zeros((nres,))
        for other in others:
            # in my real code, which also doesn't work, obviously this performs
            # a more complex calculation to add to the row[other] array position
            row[other] = other
        out_data.put((res, row))
        in_data.task_done()  # show we have completed processing this item


def start_multicore_task(nres, cores_per_task):
    in_data = JoinableQueue()
    out_data = Queue()
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        # the last value in "iterator" won't have any value higher than itself:
        if len(others) > 0:
            in_data.put((res1, others))
    # one sentinel marking "no more data" for each child process:
    for _ in range(cores_per_task):
        in_data.put(None)

    ps = []
    for _ in range(cores_per_task):
        p = Process(target=calculate_row, args=(nres, in_data, out_data))
        p.start()
        ps.append(p)

    # wait for all results to have been created:
    in_data.join()

    corr = np.zeros((nres, nres))
    try:
        while True:
            res, row = out_data.get_nowait()
            corr[res, :] = row
    except Empty:
        pass

    for p in ps:
        p.join()
    return corr


if __name__ == '__main__':
    print(start_multicore_task(10, 4))
```
Finally, we can eliminate having to put sentinel records on the input queue by making the child processes daemon processes: they now loop forever trying to get more input, but are terminated automatically when the main process terminates:
```python
from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty

import numpy as np


# Defined at module level so it can be pickled under the 'spawn' start method:
def calculate_row(nres, in_data, out_data):
    """Function to fill a row of what will be a square numpy array."""
    # This loop never terminates, but we are now a daemon process and
    # thus this process will terminate when the main process terminates:
    while True:
        res, others = in_data.get()
        row = np.zeros((nres,))
        for other in others:
            # in my real code, which also doesn't work, obviously this performs
            # a more complex calculation to add to the row[other] array position
            row[other] = other
        out_data.put((res, row))
        in_data.task_done()  # show we have completed processing this item


def start_multicore_task(nres, cores_per_task):
    in_data = JoinableQueue()
    out_data = Queue()
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        # the last value in "iterator" won't have any value higher than itself:
        if len(others) > 0:
            in_data.put((res1, others))

    for _ in range(cores_per_task):
        p = Process(target=calculate_row, args=(nres, in_data, out_data),
                    daemon=True)
        p.start()

    # wait for all results to have been created:
    in_data.join()

    corr = np.zeros((nres, nres))
    try:
        while True:
            res, row = out_data.get_nowait()
            corr[res, :] = row
    except Empty:
        pass
    return corr


if __name__ == '__main__':
    print(start_multicore_task(10, 4))
```
And if you are using a process pool (the code below uses concurrent.futures.ProcessPoolExecutor), there is no need for the worker function calculate_row to return a tuple whose first element is res, since map returns its results in task-submission order: the value that res would have had can be deduced purely from the order in which the rows come back, i.e. by enumerating the returned rows starting at index 0. There is also no need to explicitly call shutdown on the pool, since it is called implicitly when the with Pool(cores_per_task) as p: block terminates:
```python
from concurrent.futures import ProcessPoolExecutor as Pool
from functools import partial

import numpy as np


def calculate_row(in_data, nres):
    """Function to fill a row of what will be a square numpy array."""
    res, others = in_data
    row = np.zeros((nres,))
    for other in others:
        # in my real code, which also doesn't work, obviously this performs
        # a more complex calculation to add to the row[other] array position
        row[other] = other
    return row


def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize


def start_multicore_task(nres, cores_per_task):
    in_data = []
    iterator = set(range(nres))
    for res1 in iterator:
        # "others" are every value in "iterator" higher than res1 itself
        others = list(iterator - set(range(res1)) - {res1})
        # the last value in "iterator" won't have any value higher than itself:
        if len(others) > 0:
            in_data.append((res1, others))

    with Pool(cores_per_task) as p:
        chunksize = compute_chunksize(len(in_data), cores_per_task)
        # map returns a lazy iterator; by not converting it to a list we can
        # process the results as they are generated without having to wait
        # for all of them:
        results = p.map(partial(calculate_row, nres=nres), in_data,
                        chunksize=chunksize)
        corr = np.zeros((nres, nres))
        # results come back in submission order, so the row index is just
        # the enumeration index:
        for res, row in enumerate(results):
            corr[res, :] = row
    return corr


if __name__ == '__main__':
    print(start_multicore_task(10, 4))
```
Even with the multiprocess package, which uses dill for pickling, or with ProcessPoolExecutor, the problem still remains. Your problem with queues and processes getting stuck results from trying to join a child process prior to consuming all the items it has placed on the queue. Read the docs, in particular the warnings. Also, as mentioned, method multiprocessing.Queue.empty is not reliable.