
I'm testing the possibility of reading a large CSV file simultaneously with different processes using pandas and the Python multiprocessing module.

There are some time savings, but they're pretty minimal. At first I thought maybe it had to do with how fast data can be read from a hard drive, but I don't think that's why because reading a large text file or a large Python pickle file is much faster.

Please see code below:

    import pandas as pd
    import numpy as np
    import time
    from datetime import datetime
    import multiprocessing as mp
    import re

    FILE_LENGTH = 1000000
    INFILE = 'rtest.1mX80.csv'

    def single():
        df = pd.read_csv(INFILE)
        return df

    def now():
        currentTime = datetime.now()
        formattedTime = f"{currentTime.hour}:{currentTime.minute:02}:{currentTime.second:02}.{currentTime.microsecond/1000:03.0f}"
        return formattedTime

    def process_name():
        fullName = f"{mp.current_process()}"
        name = re.search(r'PoolWorker-\d+', fullName).group()
        return name

    def read_chunk(skiprows, nrows):
        print(f'({now()} | {process_name()}) Starting to read a chunk...')
        start = time.perf_counter()
        df = pd.read_csv(INFILE, skiprows=skiprows, nrows=nrows)
        end = time.perf_counter()
        print(f"({now()} | {process_name()}) Read the chunk in {end-start:.2f} seconds!")
        return df

    def parallel():
        nrows = int(FILE_LENGTH / 4)
        skiprows = [i * nrows for i in range(4)]
        starmapArgs = zip(skiprows, [nrows] * 4)
        with mp.Pool(4) as pool:
            dfs = pool.starmap(read_chunk, starmapArgs)
        df = pd.concat(dfs, sort=False)
        return df

    def gen_df(nrows, ncols):
        colnames = [f"col{i}" for i in range(1, ncols + 1)]
        df = pd.DataFrame(np.random.rand(nrows, ncols), columns=colnames)
        return df

    if __name__ == "__main__":
        gen_df(FILE_LENGTH, 80).to_csv(INFILE, index=False)

        start = time.perf_counter()
        df1 = single()
        end = time.perf_counter()
        print(f"Finished reading file (singleprocessing) in {end-start:.2f} seconds.")

        start = time.perf_counter()
        df2 = parallel()
        end = time.perf_counter()
        print(f"Finished reading file (multiprocessing) in {end-start:.2f} seconds.")

What is the reason why multiprocessing is only marginally faster when reading large files in pandas? Is it some kind of hardware limitation, or is it more closely related to the pandas implementation of read_csv?

2 Comments

  • A CSV is nothing more than a text file. The problem is the code: instead of splitting the file in four and having each process read one chunk, all of the processes read the same file, and some of them discard the rows they read. This means that the fourth process reads the entire file but only keeps the last quarter of it. Commented Aug 7, 2019 at 16:07
  • In any case, using multiple processes will never read the file any faster, precisely because there is only one disk that can serve that file. Using multiple processes is useful when a) there are a lot of files to process, or b) processing costs much more than reading the file. Parsing a CSV isn't expensive. Commented Aug 7, 2019 at 16:13

1 Answer


In this case the problem has two parts:

  • Reading the data from disk.
  • Converting the rows to Pandas.

Generally speaking, reading from disk is extremely slow.

The first possible solution to make that faster is to use an SSD. :-)

If that is not practical, another solution is to use mmap. Using mmap to read a file can speed things up significantly. When you start reading from an mmap-ed file, the OS should notice that and map the next page(s) into memory before you get there.
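A minimal sketch of what reading through mmap looks like (the helper name and file layout are illustrative, not from the question):

```python
import mmap

def count_lines_mmap(path):
    """Scan a file sequentially through a memory mapping.

    Iterating line by line over the mapping reads the pages in order,
    which lets the OS readahead keep the next pages warm in memory.
    """
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            return sum(1 for _ in iter(mm.readline, b""))
```

The same `mm` object can be sliced by byte offset (`mm[start:end]`), which is what makes it convenient to hand out ranges to pool workers.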

And this can be combined with multiprocessing.Pool. If you create the mmap before creating the Pool, along with a list of n offsets for the n workers, all the worker processes will inherit access to it. Each worker then only has to read from its own offset to the next one.

Edit: I would split the file reading across processes as follows.

  • Determine the file size (e.g. os.stat)
  • Open the file, seek to 1/4 of the file.
  • Read, say, 4 kB, find the first newline in it, and save the total offset of that newline.
  • Do the same for 1/2 of the file and 3/4 of the file.

Using this, create a list of 4 (start, end) file offset pairs. Give that list to Pool.map(). The worker should then read the data between the offsets, split it into lines and parse the CSV.

That divides the CSV parsing across processes without reading the whole file multiple times.
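The steps above can be sketched as follows (function names and the headerless-chunk handling are my own; this assumes the first line of the file is a header):

```python
import io
import os
import pandas as pd

def chunk_offsets(path, n_chunks=4):
    """Split a file into n byte ranges that start and end on newline boundaries.

    Returns (start, end) pairs covering everything after the header line,
    so each range contains only whole CSV rows.
    """
    size = os.stat(path).st_size
    with open(path, "rb") as f:
        header_end = len(f.readline())      # first data byte, past the header
        cuts = [header_end]
        for i in range(1, n_chunks):
            # Seek to roughly i/n of the data, then advance to the next newline.
            f.seek(header_end + (size - header_end) * i // n_chunks)
            f.readline()
            cuts.append(f.tell())
        cuts.append(size)
    return list(zip(cuts[:-1], cuts[1:]))

def parse_range(path, start, end, columns):
    """Worker: read one byte range and parse it as headerless CSV."""
    with open(path, "rb") as f:
        f.seek(start)
        raw = f.read(end - start)
    return pd.read_csv(io.BytesIO(raw), header=None, names=columns)
```

With a `multiprocessing.Pool`, `pool.starmap(parse_range, [(path, s, e, cols) for s, e in chunk_offsets(path)])` parses the four chunks in parallel, and `pd.concat` reassembles the result; unlike the `skiprows` approach, no worker reads bytes outside its own range.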


6 Comments

Thanks, but it seems that using mmap had only a marginal impact. Perhaps part of the issue is that I already have an SSD. I created an mmap object outside of the Pool, and yet the first chunk is read in 12 seconds, the second in 14, the third in 17, and the fourth in 18. That is only a marginal improvement over using a regular file: reading the whole file into a Pandas DataFrame takes 26 seconds.
Using mmap is an alternative for an SSD. If you already have an SSD, there is probably not much more performance to be had.
I think part of the issue is that, as Panagiotis said above, when I call pd.read_csv(filepath, skiprows=500000, nrows=250000) it has to read a good chunk of the data before getting to the rows that it actually needs to parse into a DataFrame. I might look into the source code for pd.read_csv and see if I can modify it so that it takes a list of lines. That way, I only have to read the file into memory once, and then I'll pass a list of 250000 lines into each of the four processes (assuming that the original file has 1 million rows).
@N4v I think there is a good way to chop up the file reading and CSV conversion for multiprocessing.Pool. See edit in the answer.
Thanks, I tried that and it works. Each process completes in the same amount of time. The total performance improvement ends up being only around 20-30% though, probably due to the limitation of how fast one can read data from a hard drive.
