
I need to use the multiprocessing library to run the following loop in parallel:

    tag = []
    # get all model IDs
    all_model_id = get_models_id()
    # I have a list of lists
    liste_all_img_id = []
    # I want to start multiprocessing here
    for i in range(0, len(all_model_id)):
        tag = get_tags(all_model_id[i][0])  # get_tags returns a list
        #print(tag)
        for l in range(0, len(tag)):
            # get_images_id returns a list
            liste_all_img_id.append(get_images_id(tag[l][0], all_model_id[i][0]))

I tried this:

    def funcs(start, end):
        tag = []
        list_all_img_id = []
        for i in range(start, end):
            tag = get_tags(all_model_id[i][0])
            for l in range(0, len(tag)):
                list_all_img_id.append(get_images_id(tag[l][0], all_model_id[i][0]))
        return list_all_img_id

    from multiprocessing import Pool
    import multiprocessing

    def main():
        all_model_id = get_models_id()
        len_all_model_id = len(all_model_id)
        div_total = int(len_all_model_id / 3)
        rest_div_total = len_all_model_id % 3
        t1 = multiprocessing.Process(target=funcs, name="", args=(0, div_total))
        t2 = multiprocessing.Process(target=funcs, name="", args=(div_total, div_total*2))
        t3 = multiprocessing.Process(target=funcs, name="", args=(div_total*2, div_total*3 + rest_div_total + 1))
        list_threads = [t1, t2, t3]
        for i in list_threads:
            i.start()
        for i in list_threads:
            i.join()

    if __name__ == "__main__":
        main()

But :

  • I'm not sure the main function is well defined

  • I don't know how to store my results

3 Answers

    def funcs(start, end):
        tag = []
        list_all_img_id = []
        for i in range(start, end):
            tag = get_tags(all_model_id[i][0])
            for l in range(0, len(tag)):
                list_all_img_id.append(get_images_id(tag[l][0], all_model_id[i][0]))
        return list_all_img_id

    from multiprocessing.pool import Pool

    def main():
        all_model_id = get_models_id()
        len_all_model_id = len(all_model_id)
        div_total = int(len_all_model_id / 3)
        rest_div_total = len_all_model_id % 3
        with Pool(3) as pool:
            results = []
            # submit 3 tasks without blocking
            results.append(pool.apply_async(funcs, args=(0, div_total)))
            results.append(pool.apply_async(funcs, args=(div_total, div_total*2)))
            results.append(pool.apply_async(funcs, args=(div_total*2, div_total*3 + rest_div_total + 1)))
            # now await the 3 results:
            for result in results:
                print(result.get())

    if __name__ == "__main__":
        main()

Note that apply_async takes an optional callback argument: a function to be called with a result (the actual return value from the task) as soon as it becomes available, which may not be in the order in which the tasks were submitted. The above method of obtaining the results (relying on the result object returned from apply_async, on which a blocking call to get can be made) will always obtain the results in task-submission order, like the starmap function, which is a reasonable alternative if you have all the call arguments for all the task submissions in an iterable such as a list or a tuple:

    with Pool(3) as pool:
        results = pool.starmap(funcs, [
            (0, div_total),
            (div_total, div_total*2),
            (div_total*2, div_total*3 + rest_div_total + 1)
        ])
        for result in results:
            print(result)
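As a self-contained sketch of the callback mechanism mentioned above, with a toy square function standing in for funcs (ThreadPool is used here only so the snippet runs anywhere without a __main__ guard; the Pool API is identical):

```python
from multiprocessing.pool import ThreadPool

results = []

def collect(result):
    # invoked in the submitting process as each task completes,
    # not necessarily in submission order
    results.append(result)

def square(x):
    return x * x

with ThreadPool(3) as pool:
    for n in (1, 2, 3):
        pool.apply_async(square, args=(n,), callback=collect)
    pool.close()
    pool.join()  # once join returns, all callbacks have run

print(sorted(results))  # [1, 4, 9]
```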

I, too, am a fan of the concurrent.futures module but wanted to make the minimal number of changes to your program. But note that you can use the undocumented but nevertheless existent ThreadPool class, which is compatible with the multiprocessing Pool class, by simply invoking:

    from multiprocessing.pool import ThreadPool

instead of

    from multiprocessing.pool import Pool

and then specifying:

    with ThreadPool(3) as pool:

If your tasks are very I/O intensive, then threading may be a better option.
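For example, here is a minimal sketch with a stand-in I/O-bound task (time.sleep is an assumed placeholder for a network or disk wait; the three waits overlap instead of running back to back):

```python
from multiprocessing.pool import ThreadPool
import time

def fetch(i):
    time.sleep(0.05)  # stand-in for a network or disk wait
    return i * 10

with ThreadPool(3) as pool:
    # the three sleeps run concurrently in worker threads
    results = pool.map(fetch, [0, 1, 2])

print(results)  # [0, 10, 20]
```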


9 Comments

When I run main(), the code runs but nothing happens, and I have to restart my kernel after a while. Sorry, I'm very new to parallelization and Python in general.
Are you running Jupyter Notebook under Windows?
Spyder, Anaconda. And when I use the ThreadPool solution, I get the error TypeError: string indices must be integers when the function calls get_images_id.
First, you should try running this in a standard .py file at the command prompt. I know that for Jupyter Notebook under Windows you would have to put function funcs in a .py file and import it, or else it would not be found. See this answer.
As far as your TypeError, you need to get your program working first without threading or multiprocessing. Verify that start and end parameters to funcs are reasonable. If so, your problem has nothing to do with threading or multiprocessing, so why confuse the issue? You would then have to open a second question on SO.

I've modified your code a little bit, but I haven't run it yet. I think it will work for your problem; ask me if something isn't working for you.

    import multiprocessing as mp

    def funcs(tupl):
        start, end = tupl[0], tupl[1]
        tag = []
        list_all_img_id = []
        for i in range(start, end):
            tag = get_tags(all_model_id[i][0])
            for l in range(0, len(tag)):
                list_all_img_id.append(get_images_id(tag[l][0], all_model_id[i][0]))
        return list_all_img_id

    def main():
        all_model_id = get_models_id()
        len_all_model_id = len(all_model_id)
        div_total = int(len_all_model_id / 3)
        rest_div_total = len_all_model_id % 3
        lst_args = [(0, div_total),
                    (div_total, div_total*2),
                    (div_total*2, div_total*3 + rest_div_total + 1)]
        pool = mp.Pool(processes=3)
        res = pool.map(funcs, lst_args)  # fixed: was list_args, a NameError
        # you can loop through res to get your results

    if __name__ == "__main__":
        main()
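To loop through res: it is a list of three sublists, one per (start, end) chunk, so flattening it gives the combined result. A sketch with stand-in data (the IDs here are made up for illustration):

```python
from itertools import chain

# stand-in for what pool.map(funcs, lst_args) returns:
# one sublist of image IDs per chunk
res = [[101, 102], [103], [104, 105]]

list_all_img_id = list(chain.from_iterable(res))
print(list_all_img_id)  # [101, 102, 103, 104, 105]
```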

1 Comment

You could have (should have) left the arguments to funcs as they were and used pool.starmap. And if the computer has 16 CPUs, isn't creating 16 processes, an expensive operation, excessive just to run 3 tasks?

Try this out, using the concurrent.futures module.

ThreadPoolExecutor(max_workers=10) (you can specify the maximum number of workers).

Moreover, if you want multiple processes instead of threads, you can simply replace ThreadPoolExecutor with ProcessPoolExecutor.

    import concurrent.futures

    all_model_id = get_models_id()

    def func(model_id):
        img_ids = []
        tag = get_tags(model_id[0])
        for l in range(0, len(tag)):
            img_ids.append(get_images_id(tag[l][0], model_id[0]))
        return img_ids

    liste_all_img_id = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # collect the returned lists in the main process; appending to a
        # global from the workers would silently fail with ProcessPoolExecutor
        for result in executor.map(func, all_model_id):
            liste_all_img_id.extend(result)

