I have a script to read a file that can be 10s of gigs big and i want to use multiprocessing to process it.
This is a compression algorithm where i want the user to define a buffer, then 3 processes will start, one to read the buffer amount of lines from the file, pass the lines to the processing process, then pass the processed lines to a process that writes the lines to a new file. I want all this to happen simultaneously, and for each process to wait for the next bundle of lines.
I already have the script, but when i run it, it doesn't end. I think something is wrong with the processes. I think it has to do with the islice in my read function, but i don't know how to write it better.
import multiprocessing as mp import time from itertools import islice def read(from_filename, buffer, process_queue): file = open(from_filename, 'r') slice = islice(file, buffer) while slice: to_process = [] for line in slice: to_process.append(line) process_queue.put(to_process) process_queue.put('kill') def write(to_filename, write_queue): to_file = open(to_filename, 'a+') while 1: to_write = write_queue.get() if to_write == 'kill': break to_file.write(to_write + '\n') def compress(process_queue, write_queue): while 1: to_process = process_queue.get() if to_process == 'kill': write_queue.put('kill') break # process, put output in to_write write_queue.put(to_write) def decompress(process_queue, write_queue): while 1: to_process = process_queue.get() if to_process == 'kill': write_queue.put('kill') break # process, put output in to_write write_queue.put(to_write) def main(): option = raw_input("C for Compress OR D for Decompress: ") from_file = raw_input("Enter input filename: ") buf = int(raw_input("Enter line buffer: ")) to_file = raw_input("Enter output filename: ") start = time.time() write_queue = mp.Queue() process_queue = mp.Queue() reader = mp.Process(target=read, args=(from_file, buf, process_queue)) writer = mp.Process(target=write, args=(to_file, write_queue)) if option == 'c' or option == 'C': processor = mp.Process(target=compress, args=(process_queue, write_queue)) elif option == 'd' or option == 'D': processor = mp.Process(target=decompress, args=(process_queue, write_queue)) else: print "Invalid Options..." writer.start() processor.start() reader.start() reader.join() processor.join() writer.join() end = time.time() elapsed = (end - start) print "\n\nTotal Time Elapsed: " + str(elapsed) + " secs" if __name__=='__main__': main() This is my first attempt at multiprocessing. When i run it, it doesn't end. I think a process is stuck somewhere.