90

I'm trying to use multiprocessing with a pandas DataFrame: split the DataFrame into 8 parts and apply some function to each part using apply, with each part processed in a different process.

EDIT: Here's the solution I finally found:

    import multiprocessing as mp

    import numpy as np
    import pandas as pd
    import pandas.testing as pdt  # pandas.util.testing is deprecated


    def process_apply(x):
        # do some stuff to data here
        pass


    def process(df):
        res = df.apply(process_apply, axis=1)
        return res


    if __name__ == '__main__':
        p = mp.Pool(processes=8)
        split_dfs = np.array_split(big_df, 8)
        pool_results = p.map(process, split_dfs)
        p.close()
        p.join()

        # merging parts processed by different processes
        parts = pd.concat(pool_results, axis=0)

        # merging newly calculated parts to big_df
        big_df = pd.concat([big_df, parts], axis=1)

        # checking if the dfs were merged correctly
        pdt.assert_series_equal(parts['id'], big_df['id'])
6
  • 1
    @yemu what are you exactly trying to achieve by this code? Commented Nov 6, 2014 at 16:37
  • 1
    currently apply only saturates one core of the CPU. I want to use multiprocess and use all cores to decrease processing time Commented Nov 6, 2014 at 19:29
  • 5
    It would be nicer if you left the question alone and then put the answers in the answers. That way we can see more of the process without looking at the changelog. Commented Jun 9, 2015 at 19:49
  • 4
    should "aoi_proc" be "process"? Maybe renaming your "process" function to simply "f" would be more readable in the multiprocessing context Commented Nov 30, 2015 at 19:56
  • I'm puzzled as to what process_apply should look like. Mine is a function of the row, something like: def process_apply(rw): return rw['A'] * rw['B']. Is that correct? Commented Jan 9, 2022 at 21:02

11 Answers

165

You can use https://github.com/nalepae/pandarallel, as in the following example:

    from pandarallel import pandarallel
    from math import sin

    pandarallel.initialize()

    def func(x):
        return sin(x**2)

    df.parallel_apply(func, axis=1)

5 Comments

This answer should get more upvotes. The speed up is terrific.
This solution works on Linux & macOS natively. On Windows, Pandaral·lel will work only if the Python session is executed from Windows Subsystem for Linux (WSL).
On windows, I get this error: ValueError: cannot find context for 'fork'
if you are using pandarallel with PyCharm, you can disable Run with Python Console
wow, amazing! I tried dask, swifter, mapply and none of them worked out of the box. this did - on the first try! thank you so much
63

A more generic version based on the author's solution, which lets you run it with any function and dataframe:

    from multiprocessing import Pool
    from functools import partial

    import numpy as np
    import pandas as pd


    def parallelize(data, func, num_of_processes=8):
        data_split = np.array_split(data, num_of_processes)
        pool = Pool(num_of_processes)
        data = pd.concat(pool.map(func, data_split))
        pool.close()
        pool.join()
        return data


    def run_on_subset(func, data_subset):
        return data_subset.apply(func, axis=1)


    def parallelize_on_rows(data, func, num_of_processes=8):
        return parallelize(data, partial(run_on_subset, func), num_of_processes)

So the following line:

df.apply(some_func, axis=1) 

Will become:

parallelize_on_rows(df, some_func) 
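If some_func takes extra parameters, functools.partial can bind them before the function is handed to the pool. The following is a minimal sketch under a few assumptions: scaled_product and its factor argument are hypothetical names made up for illustration, the pool size is reduced to 2, and the rows are split by position with iloc rather than by calling np.array_split on the dataframe itself.

```python
from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd


def split_rows(data, n_chunks):
    """Split a dataframe into n_chunks row blocks using positional indices."""
    index_blocks = np.array_split(np.arange(len(data)), n_chunks)
    return [data.iloc[block] for block in index_blocks]


def run_on_subset(func, data_subset):
    # Same helper as in the answer: row-wise apply on one chunk.
    return data_subset.apply(func, axis=1)


def scaled_product(row, factor):
    # Hypothetical row function with one extra parameter.
    return row["A"] * row["B"] * factor


def parallelize_on_rows(data, func, num_of_processes=2):
    chunks = split_rows(data, num_of_processes)
    with Pool(num_of_processes) as pool:
        return pd.concat(pool.map(partial(run_on_subset, func), chunks))


if __name__ == "__main__":
    df = pd.DataFrame({"A": [1, 2, 3, 4], "B": [10, 20, 30, 40]})
    # partial binds factor=2 and leaves a picklable one-argument callable.
    result = parallelize_on_rows(df, partial(scaled_product, factor=2))
    print(result.tolist())  # [20, 80, 180, 320]
```

Because scaled_product is a module-level function, the partial object wrapping it can be pickled and sent to the worker processes.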

8 Comments

What about some_func with parameters?
@AlaaM. - you can use partial for that. docs.python.org/2/library/functools.html#functools.partial
@TomRaz how do I use a partial in this case when normally I would do something like this? dataframe.apply(lambda row: process(row.attr1, row.attr2, ...))
@frei - lambda functions cannot be used with multiprocessing, since they cannot be pickled. See more info here: stackoverflow.com/a/8805244/1781490 Can you use normal functions instead?
i see ok. that's the piece i needed to know whether i should just refactor the whole method or not
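The pickling point made in the comments above can be checked without a pool at all. This is only a sketch: process, process_row, and is_picklable are hypothetical names, and the attr1/attr2 keys simply mirror the commenter's example.

```python
import pickle


def process(attr1, attr2):
    # Hypothetical stand-in for the commenter's process function.
    return attr1 + attr2


def process_row(row):
    # Named, module-level wrapper that replaces the lambda.
    return process(row["attr1"], row["attr2"])


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    print(is_picklable(process_row))
    print(is_picklable(lambda row: process(row["attr1"], row["attr2"])))
```

Replacing the lambda with a named function such as process_row is usually a smaller change than refactoring the whole method.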
9

This is some code that I found useful. It automatically splits the dataframe into as many parts as you have CPU cores.

    import pandas as pd
    import numpy as np
    import multiprocessing as mp


    def parallelize_dataframe(df, func):
        num_processes = mp.cpu_count()
        df_split = np.array_split(df, num_processes)
        with mp.Pool(num_processes) as p:
            df = pd.concat(p.map(func, df_split))
        return df


    def parallelize_function(df):
        df[column_output] = df[column_input].apply(example_function)
        return df


    def example_function(x):
        x = x*2
        return x

To run:

df_output = parallelize_dataframe(df, parallelize_function) 

Comments

6

This worked well for me:

    import multiprocessing

    rows_iter = (row for _, row in df.iterrows())

    with multiprocessing.Pool() as pool:
        df['new_column'] = pool.map(process_apply, rows_iter)

1 Comment

@Robert Handzel's solution is faster though
4

Since I don't have much of your data script, this is a guess, but I'd suggest using p.map instead of apply_async with the callback.

    p = mp.Pool(8)
    pool_results = p.map(process, np.array_split(big_df, 8))
    p.close()
    p.join()

    results = []
    for result in pool_results:
        results.extend(result)

2 Comments

I had to put the call inside if __name__ == '__main__'. With other small changes I managed to make it work; however, I'm not sure whether the result dataframes in pool_results are returned in the same order as they were split. I have to check it.
see here for a solution with dask stackoverflow.com/questions/37979167/…
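On the ordering question raised in the comments: Pool.map returns its results in the order of the input iterable, regardless of which worker finishes first. A small sketch to illustrate this, where slow_identity and the delays are made up for the demonstration:

```python
import time
from multiprocessing import Pool


def slow_identity(task):
    """Sleep for the given delay, then return the task id."""
    task_id, delay = task
    time.sleep(delay)
    return task_id


if __name__ == "__main__":
    # Task 0 sleeps longest and finishes last; task 3 finishes first.
    tasks = [(0, 0.4), (1, 0.3), (2, 0.2), (3, 0.1)]
    with Pool(4) as pool:
        ordered = pool.map(slow_identity, tasks)
    print(ordered)  # [0, 1, 2, 3]
```

So concatenating pool_results in list order should give back the chunks in the order they were split.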
2

To use all (physical or logical) cores, you could try mapply as an alternative to swifter and pandarallel.

You can set the number of cores (and the chunking behaviour) upon init:

    import pandas as pd
    import mapply

    mapply.init(n_workers=-1)


    def process_apply(x):
        # do some stuff to data here
        pass


    def process(df):
        # spawns a pathos.multiprocessing.ProcessPool if sensible
        res = df.mapply(process_apply, axis=1)
        return res

By default (n_workers=-1), the package uses all physical CPUs available on the system. If your system uses hyper-threading (usually twice the number of physical CPUs would show up), mapply will spawn one extra worker to prioritise the multiprocessing pool over other processes on the system.

You could also use all logical cores instead (beware that like this the CPU-bound processes will be fighting for physical CPUs, which might slow down your operation):

    import multiprocessing
    n_workers = multiprocessing.cpu_count()

    # or more explicit
    import psutil
    n_workers = psutil.cpu_count(logical=True)

Comments

2

Python's pool.starmap() method can be used to succinctly introduce parallelism also to apply use cases where column values are passed as arguments, i.e. to cases like:

df.apply(lambda row: my_func(row["col_1"], row["col_2"], ...), axis=1) 

Full example and benchmarking:

    import time
    from multiprocessing import Pool

    import numpy as np
    import pandas as pd


    def mul(a, b, c):
        # For illustration, could obviously be vectorized
        return a * b * c


    df = pd.DataFrame(np.random.randint(0, 100, size=(10_000_000, 3)), columns=list('ABC'))

    # Standard apply
    start = time.time()
    df["mul"] = df.apply(lambda row: mul(row["A"], row["B"], row["C"]), axis=1)
    print(f"Standard apply took {time.time() - start:.0f} seconds.")

    # Starmap apply
    start = time.time()
    with Pool(10) as pool:
        df["mul_pool"] = pool.starmap(mul, zip(df["A"], df["B"], df["C"]))
    print(f"Starmap apply took {time.time() - start:.0f} seconds.")

    pd.testing.assert_series_equal(df["mul"], df["mul_pool"], check_names=False)

    >>> Standard apply took 72 seconds.
    >>> Starmap apply took 5 seconds.

This has the benefit of not relying on external libraries, plus being very readable.

Comments

0

I also ran into the same problem when using multiprocessing.Pool.map() to apply a function to different chunks of a large dataframe.

I just want to add several points just in case other people run into the same problem as I do.

  1. Remember to add if __name__ == '__main__':
  2. Run the code from a .py file. In an IPython/Jupyter notebook, multiprocessing did not work (at least in my case, though I don't know why).
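Point 1 can be sketched as follows. This is a minimal illustration, not the poster's code; square and main are hypothetical names. Under the "spawn" start method (the default on Windows and macOS), each worker imports the main module, so creating the pool outside the guard would run that code again in every child.

```python
import multiprocessing as mp


def square(x):
    # Worker function: defined at module level so child processes can find it.
    return x * x


def main():
    # The pool is only created when main() is called from the guard below.
    with mp.Pool(2) as pool:
        return pool.map(square, range(5))


if __name__ == "__main__":
    # Runs only when the file is executed as a script, not when it is imported.
    print(main())  # [0, 1, 4, 9, 16]
```

Saving this as a .py file and running it with the Python interpreter corresponds to point 2.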

Comments

0

Install Pyxtension, which simplifies using a parallel map, and use it like this:

    import multiprocessing

    import numpy as np
    import pandas as pd
    from pyxtension.streams import stream

    big_df = pd.concat(stream(np.array_split(df, multiprocessing.cpu_count())).mpmap(process))

Comments

0

I ended up using concurrent.futures.ProcessPoolExecutor.map in place of multiprocessing.Pool.map. It took 316 microseconds for some code that took 12 seconds in serial.
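One thing worth checking when timing this approach: Executor.map submits the tasks and returns a lazy iterator right away, so a timer stopped immediately after the call does not include the actual work. Whether that explains the figure above is only a guess. The sketch below separates the two measurements; slow_double and timed_map are hypothetical names.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def slow_double(x):
    # Simulated work: each call takes about 0.2 seconds.
    time.sleep(0.2)
    return 2 * x


def timed_map(values, workers=2):
    """Return (results, time until map() returned, time until results were ready)."""
    with ProcessPoolExecutor(max_workers=workers) as executor:
        start = time.perf_counter()
        lazy_results = executor.map(slow_double, values)
        submit_time = time.perf_counter() - start
        results = list(lazy_results)  # blocks until every task has finished
        total_time = time.perf_counter() - start
    return results, submit_time, total_time


if __name__ == "__main__":
    results, submit_time, total_time = timed_map(range(4))
    print(results)
    print(f"map() returned after {submit_time:.4f} s")
    print(f"results ready after {total_time:.4f} s")
```

For a fair comparison with the serial version, the timer should stop only after the results have been consumed.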

Comments

0

Tom Raz's answer https://stackoverflow.com/a/53135031/11847090 misses an edge case where there are fewer rows in the dataframe than processes

Use this parallelize function instead:

    def parallelize(data, func, num_of_processes=8):
        # check if the number of rows is less than the number of processes
        # to avoid the following error
        # ValueError: Expected a 1D array, got an array with shape
        num_rows = len(data)
        if num_rows == 0:
            return None
        elif num_rows < num_of_processes:
            num_of_processes = num_rows
        data_split = np.array_split(data, num_of_processes)
        pool = Pool(num_of_processes)
        data = pd.concat(pool.map(func, data_split))
        pool.close()
        pool.join()
        return data

I also ended up using dask bag to parallelize this instead of this custom code.
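The edge case handled by the parallelize function above can be seen with plain index arrays: asking np.array_split for more chunks than there are rows yields empty chunks. A small sketch, where chunk_positions is a hypothetical helper that applies the same cap to row positions:

```python
import numpy as np


def chunk_positions(num_rows, num_of_processes):
    """Return arrays of row positions, never more chunks than rows."""
    if num_rows == 0:
        return []
    n_chunks = min(num_rows, num_of_processes)
    return np.array_split(np.arange(num_rows), n_chunks)


if __name__ == "__main__":
    # 3 rows split 8 ways: 3 chunks of one row each, followed by 5 empty chunks.
    uncapped = np.array_split(np.arange(3), 8)
    print(sum(len(chunk) == 0 for chunk in uncapped))  # 5

    # With the cap, every chunk holds at least one row.
    print([chunk.tolist() for chunk in chunk_positions(3, 8)])  # [[0], [1], [2]]
```

Each position array could then be passed to data.iloc[...] to obtain the corresponding chunk of the dataframe.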

Comments
