
This code is an attempt to use a queue to feed tasks to a number of worker processes.

I wanted to time the difference in speed between different numbers of processes and different methods for handling data.

But the output is not doing what I thought it would.

    from multiprocessing import Process, Queue
    import time

    result = []
    base = 2
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]

    # create queue for new tasks
    new_tasks = Queue(maxsize=0)

    # put tasks in queue
    print('Putting tasks in Queue')
    for i in data:
        new_tasks.put(i)

    # worker function definition
    def f(q, p_num):
        print('Starting process: {}'.format(p_num))
        while not q.empty():
            # mimic some process being done
            time.sleep(0.05)
            print(q.get(), p_num)
        print('Finished', p_num)

    print('initiating processes')
    processes = []
    for i in range(0, 2):
        if __name__ == '__main__':
            print('Creating process {}'.format(i))
            p = Process(target=f, args=(new_tasks, i))
            processes.append(p)

    # record start time
    start = time.time()

    # start processes
    for p in processes:
        p.start()

    # wait for processes to finish
    for p in processes:
        p.join()

    # record end time
    end = time.time()

    # print time result
    print('Time taken: {}'.format(end-start))

I expect this:

    Putting tasks in Queue
    initiating processes
    Creating process 0
    Creating process 1
    Starting process: 1
    Starting process: 0
    1 1
    2 0
    3 1
    4 0
    5 1
    6 0
    7 1
    8 0
    9 1
    10 0
    11 1
    23 0
    45 1
    76 0
    4567 1
    65423 0
    45 1
    4 0
    3 1
    21 0
    Finished 1
    Finished 0
    Time taken: <some-time>

But instead I actually get this:

    Putting tasks in Queue
    initiating processes
    Creating process 0
    Creating process 1
    Time taken: 0.01000523567199707
    Putting tasks in Queue
    Putting tasks in Queue
    initiating processes
    Time taken: 0.0
    Starting process: 1
    initiating processes
    Time taken: 0.0
    Starting process: 0
    1 1
    2 0
    3 1
    4 0
    5 1
    6 0
    7 1
    8 0
    9 1
    10 0
    11 1
    23 0
    45 1
    76 0
    4567 1
    65423 0
    45 1
    4 0
    3 1
    21 0
    Finished 0

There seem to be two major problems; I am not sure how related they are:

  1. Print statements such as Putting tasks in Queue, initiating processes, and Time taken: 0.0 are repeated systematically throughout the output - I say systematically because they repeat in exactly the same way every time.

  2. The second process never finishes: it never recognizes that the queue is empty, and therefore fails to exit.
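A plausible cause of problem 1 for readers hitting the same thing: on Windows, multiprocessing starts each child by spawning a fresh interpreter that re-imports the main module, so any module-level code (the queue filling, the prints, the timing) runs again in every child. A sketch of the usual restructuring, where everything that used to sit at module level moves into a function called under the guard; the main() helper and the trivial timeout-based worker are illustrative, not from the original code:

```python
from multiprocessing import Process, Queue
import queue
import time

def f(q, p_num):
    # minimal worker: drain the queue, bailing out once it stays empty
    while True:
        try:
            q.get(timeout=0.2)
        except queue.Empty:
            break

def main():
    # everything that used to sit at module level goes here, so spawned
    # children (which re-import this file on Windows) never re-run it
    new_tasks = Queue()
    for i in [1, 2, 3, 4]:
        new_tasks.put(i)
    start = time.time()
    processes = [Process(target=f, args=(new_tasks, i)) for i in range(2)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return time.time() - start

if __name__ == '__main__':
    print('Time taken: {}'.format(main()))
```

With this layout, a spawned child re-imports the file, defines f and main, and then does nothing else, so none of the parent-only prints repeat.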

  • It sounds like you have code formatting problems: you should only ever have one Time taken: ... printout. Commented Aug 8, 2017 at 15:18
  • Plus, you should never poll q.empty(), since a greedy thread might steal the last item and leave all the other threads waiting for items that will never appear. What you should use is an end-of-queue marker, one per thread. Commented Aug 8, 2017 at 15:21
  • Otherwise this is a good question. You have shown some effort in writing code and collecting output, and shown what you were expecting to happen. Commented Aug 8, 2017 at 15:27
  • @quamrana you were right, polling q.empty() was the cause of problem 2. Commented Aug 8, 2017 at 16:50
  • You can be PEP 8 compliant, but an extra or missing indentation can completely change a Python program. Commented Aug 8, 2017 at 18:35

2 Answers


1) I cannot reproduce this.

2) Look at the following code:

    while not q.empty():
        time.sleep(0.05)
        print(q.get(), p_num)

Each line can be run in any order by any process. Now consider q holding a single item, two processes A and B, and the following order of execution:

    # A runs
    while not q.empty():
        time.sleep(0.05)
    # B runs
    while not q.empty():
        time.sleep(0.05)
    # A runs
    print(q.get(), p_num)  # removes and prints the last element of q
    # B runs
    print(q.get(), p_num)  # q is now empty, so q.get() blocks forever

Swapping the order of time.sleep and q.get removes the blocking in all of my runs, but it is still possible for more than one process to enter the loop with a single item left.

The way to fix this is to use a non-blocking get call and catch the queue.Empty exception:

    import queue

    while True:
        time.sleep(0.05)
        try:
            print(q.get(False), p_num)
        except queue.Empty:
            break
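Put together, the fix can be sketched as a self-contained script. The results queue and the run() helper here are illustrative additions (so the parent can check what each worker consumed), and get(timeout=...) stands in for get(False): multiprocessing.Queue hands items to a background feeder thread, so a worker that starts before the feeder has flushed the pipe could otherwise see an "empty" queue and exit early.

```python
from multiprocessing import Process, Queue
import queue
import time

def f(tasks, results, p_num):
    while True:
        try:
            # short timeout instead of get(False): gives the queue's feeder
            # thread time to flush items, while still ending the loop once
            # the queue is genuinely empty
            value = tasks.get(timeout=0.5)
        except queue.Empty:
            break
        time.sleep(0.01)  # mimic some process being done
        results.put((value, p_num))

def run(data, num_procs=2):
    tasks, results = Queue(), Queue()
    for item in data:
        tasks.put(item)
    procs = [Process(target=f, args=(tasks, results, i))
             for i in range(num_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    out = []
    while not results.empty():
        out.append(results.get())
    return out

if __name__ == '__main__':
    done = run([1, 2, 3, 4, 5, 6])
    print(sorted(v for v, _ in done))
```

Every worker exits through queue.Empty, so both processes finish and the join calls return.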

2 Comments

Nice answer, well explained. Any thoughts on the other half of my problem?
No, looking at your code I don't see any way that line could be printed more than once. Maybe try copying the code from the question to see if there is any difference from the code you are running.

Your worker threads should be like this:

    def f(q, p_num):
        print('Starting process: {}'.format(p_num))
        while True:
            value = q.get()
            if value is None:
                break
            # mimic some process being done
            time.sleep(0.05)
            print(value, p_num)
        print('Finished', p_num)

And the queue should be filled with markers after the real data:

    for i in data:
        new_tasks.put(i)
    for _ in range(num_of_threads):
        new_tasks.put(None)
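The whole sentinel pattern can be sketched as one runnable script; the results queue and the run() wrapper are illustrative additions so the parent can confirm every task was consumed:

```python
from multiprocessing import Process, Queue
import time

def f(tasks, results, p_num):
    while True:
        value = tasks.get()  # blocking get is safe: a sentinel always arrives
        if value is None:    # end-of-queue marker, one per worker
            break
        time.sleep(0.01)     # mimic some process being done
        results.put(value)

def run(data, num_workers=2):
    tasks, results = Queue(), Queue()
    for item in data:
        tasks.put(item)
    for _ in range(num_workers):  # one sentinel per worker, after the data
        tasks.put(None)
    workers = [Process(target=f, args=(tasks, results, i))
               for i in range(num_workers)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()                  # every worker exits via its own sentinel
    out = []
    while not results.empty():
        out.append(results.get())
    return out

if __name__ == '__main__':
    print(sorted(run([1, 2, 3, 4, 5])))
```

Because each worker is guaranteed its own None, the blocking get() never hangs: even a greedy worker that eats all the real data leaves the remaining sentinels for the others.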

5 Comments

You have chosen to use if/break instead of try/except. Is this just a speed issue?
Also, what are the 'markers' my queue should be filled with? I cannot find any mention of them in the documentation on queues, multiprocessing, or multithreading. (A link would be lovely; I don't expect you to write an essay in the comments.)
@quamrana surely the items need to be in the queue in order to be passed to the process properly, without sharing or 'locking'?
I've used None as a marker. You need to pass in as many of these as you have threads. Even if one thread is exceptionally greedy and consumes all queued items, it will find the first None and quit. Then all the rest of the lazy threads, when they get round to calling get(), will each find a None and quit as they get there. The None markers can be pushed into the Queue at any time after the real data has gone in, but before you want the threads to quit.
It could be that @ikkuh has a better answer in regard to finding the end of the queue.
