
So I just wrote a simple script with multiprocessing.Pool() and Queue(), and when I execute it with this command it keeps running forever:

python3 m.py 

I can see that 5 of my CPU cores finish the job and CPU usage drops back to normal, but it still doesn't exit and I have to press Ctrl+C to get out of it.

Here is my code:

    from multiprocessing import Queue, Pool
    import csv, json
    from itertools import chain

    def worker(line):
        j_string = json.dumps(line)
        worker.output_q.put(j_string)

    def worker_init(output_q):
        worker.output_q = output_q

    f_open = open('khodro', 'rt')
    f_csv = csv.reader(f_open)

    output_q = Queue()
    pool = Pool(5, worker_init, [output_q])
    pool.imap(worker, chain(f_csv), 1000)
    raise SystemExit()
  • Are you using Windows? You'd need to add if __name__ == "__main__" before creating the pool (see the sketch after these comments). Commented Feb 7, 2017 at 18:18
  • I'm on Ubuntu 16.04. I added this, but then the script just closes without doing any processing. Why should we write this before creating the pool? Commented Feb 7, 2017 at 18:24
  • Did you paste the full script? imap is not blocking, and the script you posted spawns 5 processes but the parent process terminates immediately and takes the subprocesses down with it. Either you have a join() at the end or you consume the elements with for el in pool.imap(...), but the script as you pasted it does not do what you describe. Commented Feb 7, 2017 at 20:55
  • @hansaplast This is the whole script; I didn't continue coding since I have this problem. Adding pool.close() and pool.join() doesn't solve it. The script just fills a queue, output_q, which I pass to worker through the initializer. Commented Feb 8, 2017 at 12:27
  • How big is khodro? Commented Feb 8, 2017 at 12:29
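For reference, the guard mentioned in the first comment only wraps the code that creates the pool, so that freshly spawned child processes (which re-import the module, e.g. under the spawn start method used on Windows) do not try to build pools of their own. A minimal sketch of that structure, using a hypothetical toy worker rather than the CSV code from the question:

    from multiprocessing import Pool

    def worker(item):
        return item.upper()        # placeholder work

    if __name__ == "__main__":
        # Only the parent process runs this block; children that re-import the
        # module skip it, so no extra pools get created.
        with Pool(5) as pool:
            for result in pool.imap(worker, ['a', 'b', 'c']):
                print(result)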

2 Answers


I don't know exactly why, but the problem lies in filling the output queue with worker.output_q.put(j_string). If you remove that line, the script does terminate. If you print each line as it is processed, you can see that it hangs after processing the last line. My guess is that this happens because the output queue is never explicitly close()d at the end; in fact you can't close it there, because inside worker you don't know whether you are handling the last line or not.

The good news is that what you're trying to achieve with the output queue is done by imap itself when you return the value from the function:

    from multiprocessing import Queue, Pool
    import csv, json
    from itertools import chain

    def worker(line):
        return json.dumps(line)

    f_open = open('generated.json', 'rt')
    f_csv = csv.reader(f_open)

    pool = Pool(5)
    for j_string in pool.imap(worker, f_csv, 1000):
        print(j_string)

The for loop iterates over the queue that imap fills internally, so (AFAIK) it is effectively the same thing you were trying to build yourself.

Since imap is not blocking, you can start processing the outputs of worker before all workers have finished.
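If you would rather not consume the results in a loop, the other option mentioned in the comments is to close and join the pool explicitly so the parent waits for the workers. A rough sketch under the same assumptions as the snippet above (same worker and input file), using the blocking map() instead of imap():

    from multiprocessing import Pool
    import csv, json

    def worker(line):
        return json.dumps(line)

    if __name__ == "__main__":
        with open('generated.json', 'rt') as f_open:
            f_csv = csv.reader(f_open)
            pool = Pool(5)
            results = pool.map(worker, f_csv, 1000)  # blocks until every line is processed
            pool.close()                             # no more tasks will be submitted
            pool.join()                              # wait for the worker processes to exit
        print(len(results), 'lines converted')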




The accepted answer is correct, but if you need a queue-based solution, use a Manager queue:

    from multiprocessing import Manager

    mgr = Manager()
    output_q = mgr.Queue()

I tested this on Python 3.6.8 and it works.
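Combined with the initializer pattern from the question, a queue-based version could look roughly like this; it is only a sketch that assumes the OP's khodro CSV file and worker layout. The point of the manager is that the queued items live in the manager's server process rather than in a feeder thread of each worker, which is what keeps the workers from exiting cleanly when a plain multiprocessing.Queue is left undrained.

    from multiprocessing import Manager, Pool
    import csv, json

    def worker(line):
        worker.output_q.put(json.dumps(line))

    def worker_init(output_q):
        worker.output_q = output_q

    if __name__ == "__main__":
        mgr = Manager()
        output_q = mgr.Queue()               # proxied queue, safe to drain from the parent

        with open('khodro', 'rt') as f_open:
            f_csv = csv.reader(f_open)
            pool = Pool(5, worker_init, [output_q])
            pool.map(worker, f_csv, 1000)    # blocks until every line has been queued
            pool.close()
            pool.join()

        # All producers have exited, so empty() is a reliable stop condition here.
        while not output_q.empty():
            print(output_q.get())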

