I have a script to read a file that can be tens of gigabytes, and I want to use multiprocessing to process it.

This is a compression algorithm where I want the user to define a buffer size. Then three processes start: one reads that many lines from the file and passes them to a processing process, which in turn passes the processed lines to a process that writes them to a new file. I want all of this to happen simultaneously, with each process waiting for the next bundle of lines.

I already have the script, but when I run it, it never ends. I think something is wrong with the processes. I suspect the islice in my read function, but I don't know how to write it better.

import multiprocessing as mp
import time
from itertools import islice

def read(from_filename, buffer, process_queue):
    file = open(from_filename, 'r')
    slice = islice(file, buffer)
    while slice:
        to_process = []
        for line in slice:
            to_process.append(line)
        process_queue.put(to_process)
    process_queue.put('kill')

def write(to_filename, write_queue):
    to_file = open(to_filename, 'a+')
    while 1:
        to_write = write_queue.get()
        if to_write == 'kill':
            break
        to_file.write(to_write + '\n')

def compress(process_queue, write_queue):
    while 1:
        to_process = process_queue.get()
        if to_process == 'kill':
            write_queue.put('kill')
            break
        # process, put output in to_write
        write_queue.put(to_write)

def decompress(process_queue, write_queue):
    while 1:
        to_process = process_queue.get()
        if to_process == 'kill':
            write_queue.put('kill')
            break
        # process, put output in to_write
        write_queue.put(to_write)

def main():
    option = raw_input("C for Compress OR D for Decompress: ")
    from_file = raw_input("Enter input filename: ")
    buf = int(raw_input("Enter line buffer: "))
    to_file = raw_input("Enter output filename: ")
    start = time.time()
    write_queue = mp.Queue()
    process_queue = mp.Queue()
    reader = mp.Process(target=read, args=(from_file, buf, process_queue))
    writer = mp.Process(target=write, args=(to_file, write_queue))
    if option == 'c' or option == 'C':
        processor = mp.Process(target=compress, args=(process_queue, write_queue))
    elif option == 'd' or option == 'D':
        processor = mp.Process(target=decompress, args=(process_queue, write_queue))
    else:
        print "Invalid Options..."
    writer.start()
    processor.start()
    reader.start()
    reader.join()
    processor.join()
    writer.join()
    end = time.time()
    elapsed = (end - start)
    print "\n\nTotal Time Elapsed: " + str(elapsed) + " secs"

if __name__ == '__main__':
    main()

This is my first attempt at multiprocessing. When I run it, it doesn't end; I think a process is stuck somewhere.

1 Answer

This piece of code is wrong:

def read(from_filename, buffer, process_queue):
    file = open(from_filename, 'r')
    slice = islice(file, buffer)
    while slice:
        to_process = []
        for line in slice:
            to_process.append(line)
        process_queue.put(to_process)
    process_queue.put('kill')

Since slice is an islice object, the condition while slice will always be true (iterators are always truthy, even after they are exhausted), so it is effectively while True. You should re-create the islice object on every iteration.
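The truthiness claim is easy to verify: islice objects define neither __bool__ nor __len__, so like other iterators they stay truthy even once exhausted. A quick sanity check:

```python
from itertools import islice

it = islice([], 5)      # islice over an empty iterable
assert list(it) == []   # already exhausted: yields nothing
assert bool(it)         # but still truthy, like any iterator
```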

def read(from_filename, buffer, process_queue):
    file = open(from_filename, 'r')
    while True:
        slice = islice(file, buffer)
        to_process = []
        for line in slice:
            to_process.append(line)
        if not to_process:  # input ended
            break
        process_queue.put(to_process)
    process_queue.put('kill')

Alternatively you could do:

def read_chunk(file, buffer):
    # readline() returns '' at EOF; filter those out so the chunk
    # becomes [] when the input ends, matching the sentinel below
    return [line for line in (file.readline() for _ in xrange(buffer)) if line]
    # or, "more" equivalent to using islice:
    # return [line for i, line in itertools.izip(xrange(buffer), file)]

def read(from_filename, buffer, process_queue):
    file = open(from_filename, 'r')
    for to_process in iter(lambda: read_chunk(file, buffer), []):
        process_queue.put(to_process)
    process_queue.put('kill')

Note that it doesn't make sense to use itertools.islice if you have to build a list anyway.
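For what it's worth, the same iter(callable, sentinel) chunking pattern translates directly to Python 3 (where xrange, izip and raw_input no longer exist). A minimal sketch, using an in-memory StringIO in place of a real file:

```python
import io

def read_chunk(f, buffer):
    # zip stops at the shorter argument, so this reads at most
    # `buffer` lines and returns [] once the file is exhausted
    return [line for _, line in zip(range(buffer), f)]

f = io.StringIO("a\nb\nc\nd\ne\n")
chunks = list(iter(lambda: read_chunk(f, 2), []))
# chunks == [['a\n', 'b\n'], ['c\n', 'd\n'], ['e\n']]
```

Because the chunk is genuinely empty at end of file, the [] sentinel terminates the loop and the 'kill' marker can be sent afterwards.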
