
I used this and this to run 2 function calls in parallel, but the times are barely improving. This is my code:

Sequential:

from nltk import pos_tag

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

df1['pos'] = df1['txt'].apply(posify)  # ~15 seconds
df2['pos'] = df2['txt'].apply(posify)  # ~15 seconds
# Total Time: 30 seconds

Parallel:

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser, key_name, shared_dict):
    shared_dict[key_name] = ser.apply(posify)

manager = multiprocessing.Manager()
return_dict = manager.dict()
p1 = multiprocessing.Process(target=posify_parallel, args=(df1['txt'], 'df1', return_dict))
p1.start()
p2 = multiprocessing.Process(target=posify_parallel, args=(df2['txt'], 'df2', return_dict))
p2.start()
p1.join()
p2.join()
df1['pos'] = return_dict['df1']
df2['pos'] = return_dict['df2']
# Total Time: 27 seconds

I would expect the total time to be about 15 seconds, but I'm getting 27 seconds.
If it makes any difference, I have an i7 2.6GHz CPU with 6 cores (12 logical).

Is it possible to achieve something around 15 seconds? Does this have something to do with the pos_tag function itself?


EDIT:

I ended up just doing the following and now it's 15 seconds:

from multiprocessing import Pool, cpu_count

with Pool(cpu_count()) as pool:
    df1['pos'] = pool.map(posify, df1['txt'])
    df2['pos'] = pool.map(posify, df2['txt'])
# Total Time: 15 seconds

I think this way the two map calls run one after the other, but each of them spreads its rows across the pool's workers in parallel. As long as it's 15 seconds, that's fine with me.
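For reference, here is that final approach as a self-contained sketch; the helper name parallel_posify and the toy dataframes are purely illustrative, and it assumes the NLTK tagger data has already been downloaded:

from multiprocessing import Pool, cpu_count

import pandas as pd
from nltk import pos_tag  # assumes the tagger model is available via nltk.download()

def posify(txt):
    return ' '.join(pair[1] for pair in pos_tag(txt.split()))

def parallel_posify(ser, pool):
    # pool.map sends each string to a worker and collects the tag strings back,
    # so the rows of a single Series are tagged in parallel.
    return pd.Series(pool.map(posify, ser), index=ser.index)

if __name__ == '__main__':
    df1 = pd.DataFrame({'txt': ['the quick brown fox', 'a lazy dog sleeps']})
    df2 = pd.DataFrame({'txt': ['she sells sea shells', 'time flies like an arrow']})
    with Pool(cpu_count()) as pool:
        df1['pos'] = parallel_posify(df1['txt'], pool)
        df2['pos'] = parallel_posify(df2['txt'], pool)
    print(df1)
    print(df2)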

  • Do you have a very large Dataframe?
  • @BrutusForcus - 9K rows each.

1 Answer


The more usual way of passing data back from processes is via a multiprocessing.Queue instance. Not knowing the particular details of your dataframe data and the results of your processing, I cannot quantify how much performance will be improved by switching from a managed dictionary, but the use of a queue should be more performant.

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser, which_df, q):
    # Pass back the results along with which dataframe the results are for:
    q.put((which_df, ser.apply(posify)))

q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=posify_parallel, args=(df1['txt'], 1, q))
p1.start()
p2 = multiprocessing.Process(target=posify_parallel, args=(df2['txt'], 2, q))
p2.start()
# Get the results:
for _ in range(2):
    # Must do the gets before joining the processes!
    which_df, results = q.get()
    if which_df == 1:
        df1['pos'] = results
    else:
        # assert(which_df == 2)
        df2['pos'] = results
p1.join()
p2.join()

To use a multiprocessing pool:

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser):
    return ser.apply(posify)

pool = multiprocessing.Pool(2)
results1 = pool.apply_async(posify_parallel, args=(df1['txt'],))
results2 = pool.apply_async(posify_parallel, args=(df2['txt'],))
df1['pos'] = results1.get()
df2['pos'] = results2.get()
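Note that each apply_async call returns immediately with an AsyncResult, so both Series are processed concurrently in the two pool workers; each .get() then blocks only until its own result is ready.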

2 Comments

Thanks, I tried your second option and it got me 21-22 seconds.
So there is always going to be overhead in moving data from one address space (i.e. process) to another that you did not have in the non-parallel version, which could be an issue that only goes away when the original data is in shared memory. I couldn't say, not being familiar with nltk, whether you have an additional issue.
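To illustrate that last point, one way to cut the repeated transfer (not true shared memory, but similar in effect) is to hand each worker its copy of the Series once, through the pool's initializer, and then pass only row indices per task. This is just a sketch under those assumptions; init_worker and posify_row are made-up names, not anything from the answer above:

import multiprocessing

import pandas as pd
from nltk import pos_tag

_texts = None  # per-worker copy of the text Series, set once by the initializer

def init_worker(ser):
    # Runs once in each worker process: the Series is pickled to the worker
    # a single time here, instead of once per task.
    global _texts
    _texts = ser

def posify_row(i):
    # Only an integer index travels to the worker, and only the tag string
    # travels back.
    return ' '.join(pair[1] for pair in pos_tag(_texts.iloc[i].split()))

if __name__ == '__main__':
    df1 = pd.DataFrame({'txt': ['the quick brown fox', 'a lazy dog sleeps']})
    with multiprocessing.Pool(initializer=init_worker, initargs=(df1['txt'],)) as pool:
        df1['pos'] = pool.map(posify_row, range(len(df1)))
    print(df1)

Whether that wins anything in practice depends on how large the text column is compared with the tagging work itself.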
