
I have extremely large files; each is almost 2 GB, so I would like to read multiple files in parallel. All of the files have a similar format, so each one can be read independently. I know I should use the multiprocessing library, but I am confused about how to use it with my code.

My code for file reading is:

    def file_reading(file, num_of_sample, segsites, positions, snp_matrix):
        with open(file, buffering=2000009999) as f:  ### I read file here. I am not putting that code here.
            try:
                assert len(snp_matrix) == len(positions)
                return positions, snp_matrix  ## return statement
            except:
                print('length of snp matrix and length of position vector not the same.')
                sys.exit(1)

My main function is:

if __name__ == "__main__": segsites = [] positions = [] snp_matrix = [] path_to_directory = '/dataset/example/' extension = '*.msOut' num_of_samples = 162 filename = glob.glob(path_to_directory+extension) ###How can I use multiprocessing with function file_reading number_of_workers = 10 x,y,z = [],[],[] array_of_number_tuple = [(filename[file], segsites,positions,snp_matrix) for file in range(len(filename))] with multiprocessing.Pool(number_of_workers) as p: pos,snp = p.map(file_reading,array_of_number_tuple) x.extend(pos) y.extend(snp) 

So my inputs to the function are as follows:

  1. file - a single filename (one element of the list returned by glob)
  2. num_of_samples - an int value
  3. segsites - initially an empty list to which I want to append as I am reading the file.
  4. positions - initially an empty list to which I want to append as I am reading the file.
  5. snp_matrix - initially an empty list to which I want to append as I am reading the file.

The function returns the positions list and the snp_matrix list at the end. How can I use multiprocessing for this when my arguments are lists and an integer? The way I've used multiprocessing gives me the following error:

TypeError: file_reading() missing 3 required positional arguments: 'segsites', 'positions', and 'snp_matrix'


1 Answer


The elements of the list passed to Pool.map are not automatically unpacked; Pool.map calls the target function with exactly one argument per element, so in general your file_reading function can take only one parameter.

Of course, this argument can be a tuple, so it is no problem to unpack it yourself:

    import glob
    import multiprocessing
    import sys

    def file_reading(args):
        # Pool.map passes each tuple as one argument, so unpack it here.
        file, num_of_sample, segsites, positions, snp_matrix = args
        with open(file, buffering=2000009999) as f:  ### I read file here. I am not putting that code here.
            try:
                assert len(snp_matrix) == len(positions)
                return positions, snp_matrix  ## return statement
            except AssertionError:
                print('length of snp matrix and length of position vector not the same.')
                sys.exit(1)

    if __name__ == "__main__":
        segsites = []
        positions = []
        snp_matrix = []
        path_to_directory = '/dataset/example/'
        extension = '*.msOut'
        num_of_samples = 162
        filename = glob.glob(path_to_directory + extension)
        number_of_workers = 10
        x, y = [], []
        # num_of_samples is now included in each tuple, matching the five
        # values that file_reading unpacks (leaving it out caused the TypeError).
        array_of_number_tuple = [(f, num_of_samples, segsites, positions, snp_matrix)
                                 for f in filename]
        with multiprocessing.Pool(number_of_workers) as p:
            # map returns one (positions, snp_matrix) pair per input file,
            # so iterate over the results instead of unpacking them directly.
            for pos, snp in p.map(file_reading, array_of_number_tuple):
                x.extend(pos)
                y.extend(snp)
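If you would rather keep file_reading's original five-parameter signature, Pool.starmap (available since Python 3.3) unpacks each tuple into positional arguments for you. A minimal sketch, assuming the same file_reading, number_of_workers, array_of_number_tuple, x, and y as in the code above:

    with multiprocessing.Pool(number_of_workers) as p:
        # starmap unpacks each tuple, so the function is called as
        # file_reading(file, num_of_sample, segsites, positions, snp_matrix).
        for pos, snp in p.starmap(file_reading, array_of_number_tuple):
            x.extend(pos)
            y.extend(snp)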
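Also worth knowing: multiprocessing pickles each argument, so every worker gets its own copy of segsites, positions, and snp_matrix; appending to them inside file_reading will not be visible in the parent process. Returning the results, as above, is the usual pattern. If genuinely shared state were needed, a multiprocessing.Manager can provide it; here is a minimal, self-contained sketch with a hypothetical toy function append_square:

    import multiprocessing

    def append_square(args):
        # 'shared' is a Manager list proxy: appends here ARE visible
        # in the parent process, unlike appends to a plain list.
        n, shared = args
        shared.append(n * n)

    if __name__ == "__main__":
        with multiprocessing.Manager() as manager:
            shared = manager.list()
            with multiprocessing.Pool(4) as p:
                p.map(append_square, [(n, shared) for n in range(8)])
            print(sorted(shared))  # [0, 1, 4, 9, 16, 25, 36, 49]

Note that every append on a proxy is a round trip to the manager process, so for large files returning results from the workers is usually faster than sharing lists this way.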


