
I have a multiprocessing program. Each process takes a number from data and then inserts it into __queue_out.

But there is a problem: when the last process starts, an endless loop begins and all the processes just die.

import time
import threading
import random
from queue import Queue, PriorityQueue
from multiprocessing import Pool, Process

data = range(1, 1001)
start = time.time()
end_date = time.time() - start

class Worker(Process):
    counter = -1

    def __init__(self, queue_in, queue_out):
        super(Worker, self).__init__()
        self._daemon = Process().daemon  # self.setDaemon(True)
        self.__queue_in = queue_in
        self.__queue_out = queue_out

    def run(self):
        while True:
            job = self.__queue_in.get()
            Worker.counter += 1
            num = Worker.counter
            print('Take: ', self.name, job)
            print('Complete: ', self.name, job)
            self.__queue_out.put((num, job))
            self.__queue_in.task_done()

queue = Queue()
res = PriorityQueue()

for i in data:
    queue.put(i)

for i in range(1):
    w = Worker(queue, res)
    w.start()

queue.join()

out = []
while not res.empty():
    out.append(res.get()[1])

print(out)
print(end_date)
  • Are you familiar with the Global Interpreter Lock (GIL) in python? realpython.com/python-gil/… Commented Apr 16, 2020 at 6:36
  • @mrzo Yes, I read about the GIL. I think that you can bypass it and speed up the program using multiprocessing, but I don’t know how to redo my code correctly. Commented Apr 16, 2020 at 6:49
  • Because of the GIL, adding more threads does nothing but increase the overhead of doing the processing because threads do not really run concurrently in Python for the most part. The exceptions are when they do I/O or call external modules written in some other language. Commented Apr 16, 2020 at 7:17
  • @martineau OK, I get that. How can I redo the code so that instead of threads it starts processes? Commented Apr 16, 2020 at 7:19
  • I recommend concurrent.futures.ProcessPoolExecutor. You can also use the multiprocessing module directly. Commented Apr 16, 2020 at 7:22
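
Following up on the concurrent.futures.ProcessPoolExecutor suggestion in the last comment, a minimal sketch of that approach might look like the following. The square() function and the worker count are illustrative placeholders, not part of the original post:

# Minimal sketch of the ProcessPoolExecutor approach suggested in the comments.
# square() and max_workers are placeholder choices for illustration only.
from concurrent.futures import ProcessPoolExecutor
import time

def square(n):
    # Stand-in for whatever per-item work each process should do.
    return n * n

if __name__ == '__main__':
    data = range(1, 1001)
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        # executor.map() distributes the items across processes and
        # returns the results in the original input order.
        out = list(executor.map(square, data))
    print(out)
    print('elapsed time:', time.time() - start)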

1 Answer

Here's how to do it with the multiprocessing module. Note I had to change the queues to multiprocessing.Queue and multiprocessing.JoinableQueue. Also note that there is no multiprocessing.PriorityQueue, so I changed it to a regular Queue. However, there looks to be a relatively easy workaround; see Strange Queue.PriorityQueue behaviour with multiprocessing in Python 2.7.6 (even though it's an old question). A rough sketch of that workaround follows the code below.

#from queue import PriorityQueue
from multiprocessing import JoinableQueue, Pool, Process, Queue
import time

class Worker(Process):
    counter = -1

    def __init__(self, queue_in, queue_out):
        super().__init__(daemon=True)
        self._queue_in = queue_in
        self._queue_out = queue_out

    def run(self):
        while True:
            job = self._queue_in.get()
            Worker.counter += 1
            num = Worker.counter
            # print('Take:: ', self.name, job)
            # print('Complete: ', self.name, job)
            self._queue_out.put((num, job))
            self._queue_in.task_done()

if __name__ == '__main__':
    start = time.time()
    data = range(1, 1001)

    queue = JoinableQueue()
#    res = PriorityQueue()  # No multiprocessing.PriorityQueue.
    res = Queue()

    for i in data:
        queue.put(i)

    for i in range(1):
        w = Worker(queue, res)
        w.start()

    queue.join()

    out = []
    while not res.empty():
        out.append(res.get()[1])

    print(out)
    print('elapsed time:', time.time() - start)
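
As for the PriorityQueue workaround mentioned above, a rough sketch based on the linked question might look like this: register queue.PriorityQueue with a custom manager so a proxy to a single server-side queue can be shared between processes. The MyManager name is made up for illustration:

# Hedged sketch of the Manager-based PriorityQueue workaround described in the
# linked question. MyManager is an illustrative name, not from the original answer.
from multiprocessing.managers import SyncManager
from queue import PriorityQueue

class MyManager(SyncManager):
    pass

# Register a factory so manager.PriorityQueue() returns a proxy to a
# PriorityQueue instance living in the manager's server process.
MyManager.register('PriorityQueue', PriorityQueue)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    res = manager.PriorityQueue()
    res.put((2, 'second'))
    res.put((1, 'first'))
    print(res.get())  # -> (1, 'first'), lowest priority value first
    manager.shutdown()

Alternatively, since each result tuple already carries num, the items collected from the plain Queue can simply be sorted by that counter afterwards, which gives the same ordering in the single-worker setup shown here.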