1

I'm a noob with multiprocessing, and I'm trying to speed up an old algorithm of mine. It works perfectly fine, without multipocessing, but in the moment I try to implement it, the program stop working: it stands by untill I abort the script. Another issue is that it doesn't populate the dataframe: again, normally it works, but with multiprocessing it returns only NaN.

func works well.

stockUniverse = list(map(lambda s: s.strip(), Stocks)) #Stocks = list def func(i): df.at[i, 'A'] = 1 df.at[i, 'B'] = 2 df.at[i, 'C'] = 3 print(i, 'downloaded') return True if __name__ == "__main__": print('Start') pool = mp.Pool(mp.cpu_count()) pool.imap(func, stockUniverse) print(df) 

the result is:

Index 19 NaN NaN NaN index 20 NaN NaN NaN 

And then it stops there until I hit Ctrl+C. Thanks

8
  • You'll have to show us the code of stockUniverse Commented Jun 24, 2021 at 12:04
  • and the code of d.func - sorry Commented Jun 24, 2021 at 12:17
  • imap returns an iterator. You have to iterate the iterator to ensure that all the implied tasks within stockUniverse are submitted, executed and results are returned. Your second problem is that each process in the pool has its own address space and is not updating the df instance in the main process. Commented Jun 24, 2021 at 13:01
  • @Booboo imap will submit the jobs regardless of iterating the results or not. You are correct about the second problem which is the primary fault here. Commented Jun 24, 2021 at 14:46
  • 1
    @Aaron Thanks for your reply. Yes, imap will in general eventually submit the tasks but only if given the chance. What I meant is that in the code that the OP posted, there are calls to imap and print and then the program terminates and thus the tasks will not have had a chance to start by time the program ends (and certainly not by time the print function is called). One could insert a call to time.sleep before calling print guessing how long would be necessary for the tasks to start and finish, but I think we would agree that "iterating the iterator" is really the way to go. Commented Jun 24, 2021 at 19:48

1 Answer 1

1

The map function blocks until all the submitted tasks have completed and returns a list of the return values from the worker function. But the imap function returns immediately with an iterator that must be iterated to return the return values one by one as each becomes available. Your original code did not iterate that iterator but instead immediately printed out what it expected was the updated df. But you would not have given the tasks enough time to start and complete for df to have been modified. In theory if you had inserted before the print statement a call to time.sleep for a sufficiently long enough time, then the tasks would have started and completed before you printed out df. But clearly iterating the iterator is the most efficient way of being sure all tasks have completed and the only way of getting return values back.

But, as I mentioned in my comment, you have a much bigger problem. The tasks you submitted are executed by worker function func being called by processes in the process pool that you created, which are each executing in their own address space. You did not tag your question with the platform on which you are running (whenever you tag a question with multiprocessing, you are suppose to also tag the question with the platform), but I might infer that you are running under a platform that uses the spawn method to create new processes, such as Windows, and that is why you have the if __name__ == "__main__": block controlling code that creates new processes (i.e. the processing pool). When spawn is used to create new processes, a new, empty address space is created, a new Python interpreter is launched and the source is re-executed from the top (without the if __name__ == "__main__": block controlling code that creates new processes, you would get into an infinite, recursive loop creating new processes). But this means that any definition of df at global scope made outside the if __name__ == "__main__": block (which, you must have omitted if you are running under Windows) will be creating a new, separate instance for each process in the pool as each process is created.

If you are instead running under Linux, where fork is used to create new processes, the story is a bit different. The new processes will inherit the original address space from the main process and all declared variables, but copy on write is used. That means that once a subprocess attempts to modify any variable in this inherited storage, a copy of the page is made and the process will now be working on its own copy. So again, nothing can be shared for updating purposes.

You should therefore modify your program to have your worker function return values back to the main process, which will do the necessary updating:

import multiprocessing as mp import pandas as pd def func(stock): return (stock, (('A', 1), ('B', 1), ('C', 1))) if __name__ == "__main__": stockUniverse = ['abc', 'def', 'ghi', 'klm'] d = {col: pd.Series(index=stockUniverse, dtype='int32') for col in ['A', 'B', 'C']} df = pd.DataFrame(d) pool_size = min(mp.cpu_count(), len(stockUniverse)) pool = mp.Pool(pool_size) for result in pool.imap_unordered(func, stockUniverse): stock, col_values = result # unpack for col_value in col_values: col, value = col_value # unpack df.at[stock, col] = value print(df) 

Prints:

 A B C abc 1 1 1 def 1 1 1 ghi 1 1 1 klm 1 1 1 

Note that I have used imap_unordered instead of imap. The former method is allowed to return the results in arbitrary order (i.e. as they become available) and is generally more efficient and since the return value contains all the information required for setting the correct row of df, we no longer require any specific ordering.

But:

If your worker function is doing largely nothing but downloading from a website and very little CPU-intensive processing, then you could (should) be using a thread pool by making the simple substitution of:

from multiprocessing.pool import ThreadPool ... MAX_THREADS_TO_USE = 100 # or maybe even larger!!! pool_size = min(MAX_THREADS_TO_USE, len(stockUniverse)) pool = ThreadPool(pool_size) 

And since all threads share the same address space, you could use your original worker function, func as is!

Sign up to request clarification or add additional context in comments.

3 Comments

Thank you very much, very clear and comprehensive explanation. You have been infinitely kind to explain everything to me, don't worry about the length. Sorry if I made you crazy answering, I just started with multiprocessing. Your code works perfectly, just one question that remained there for me: why the script doesn't close? It remains there waiting, the program does not end. (Sorry if it's a stupid question, but I just don't understand it). Adding a pool.close() works fine, but even in the official documentation, it didn't seem like it was being used. Thanks again for your time.
There is no call to close() in case you wanted to submit more tasks to the pool (close prevents further tasks from being submitted and causes the processes to terminate). If you know that you are through with the pool, then you can certainly explicitly call either pool.close() or pool.terminate() (which stops worker processes immediately without completing outstanding work -- but there isn't any in this case) or implicitly call pool.terminate() by using the pool as a context manager: with Pool() as pool: in which case when the block exits, terminate() will be called. (... more)
But in the end, your program is about to terminate following the print(df) statement (assuming) there is reality no more code following. In that case pool will be garbage collected and when that happens pool.terminate() will be called, the processes will end, etc. So again there is no real need to do anything special. If there is, however, more code following and you want to release resources right away and you have no further use for the pool, then certainly you could call pool.close().

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.