24

The requirement is to start five threads, and wait only in the fastest thread. All five threads went to look for the same data 5 directions, and one is enough to continue the control flow.

Actually, I need to wait for the first two threads to return, to verify against each other. But I guess if I know how to wait for the fastest. I can figure out how to wait for the second-fastest.

A lot talk about join(timeout), but you don't know in advance which one to wait (which one to apply join in advance).

5 Answers 5

31

Use a queue: each thread when completed puts the result on the queue and then you just need to read the appropriate number of results and ignore the remainder:

#!python3.3 import queue # For Python 2.x use 'import Queue as queue' import threading, time, random def func(id, result_queue): print("Thread", id) time.sleep(random.random() * 5) result_queue.put((id, 'done')) def main(): q = queue.Queue() threads = [ threading.Thread(target=func, args=(i, q)) for i in range(5) ] for th in threads: th.daemon = True th.start() result1 = q.get() result2 = q.get() print("Second result: {}".format(result2)) if __name__=='__main__': main() 

Documentation for Queue.get() (with no arguments it is equivalent to Queue.get(True, None):

Queue.get([block[, timeout]])

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Sign up to request clarification or add additional context in comments.

6 Comments

Won't this raise an Empty exception if the Queue is empty when you do q.get()?
@Michael, the default for q.get() is to do a blocking get, so no it won't throw an exception instead it will block the main thread until there is a result available.
@Duncan Shouldn't one after the two q.get()'s terminate the active threads in order to finish cleanly?
@PeterMueller for this example it isn't needed: they aren't daemon threads so the script will wait for them to terminate before it exits. For real code though you might well want to wait.
@CharlieOConor if you only want to wait for the first result then call q.get() once and use that result.
|
7

If you have some sort of processing loop in your threads, the following code will terminate them when one terminates by using a threading.Event():

def my_thread(stop_event): while not stop_event.is_set(): # do stuff in a loop # some check if stuff is complete if stuff_complete: stop_event.set() break def run_threads(): # create a thread event a_stop_event = threading.Event() # spawn the threads for x in range(5): t = threading.Thread(target=my_thread, args=[a_stop_event]) t.start() while not a_stop_event.is_set(): # wait for an event time.sleep(0.1) print "At least one thread is done" 

If your process is "cheap" or a single request-response type thread (i.e. for instance an async HTTP request) then Duncan's answer is a good approach.

1 Comment

Can't threading.Thread.is_alive() function be used as a replacement to the threading.Event()?
2

You can use an event for this. See http://docs.python.org/2/library/threading.html#event-objects The idea is that the worker threads raise an event when they are finished. The main thread waits for this event before continueing. The worker thread can set a (mutexed) variable to identify itself with the event.

Comments

2

Duncan's method is probably the best and is what I would recommend. I've been mildly annoyed by the lack of "wait for next completed thread to complete" before, though, so I just wrote this up to try it out. Seems to work. Simply use MWThread in place of threading.thread and you get this new wait_for_thread function.

The global variables are a bit klunky; an alternative would be to make them class-level variables. But if this is hidden in a module (mwthread.py or whatever) it should be fine either way.

#! /usr/bin/env python # Example of how to "wait for" / join whichever threads is/are done, # in (more or less) the order they're done. import threading from collections import deque _monitored_threads = [] _exited_threads = deque() _lock = threading.Lock() _cond = threading.Condition(_lock) class MWThread(threading.Thread): """ multi-wait-able thread, or monitored-wait-able thread """ def run(self): tid = threading.current_thread() try: with _lock: _monitored_threads.append(tid) super(MWThread, self).run() finally: with _lock: _monitored_threads.remove(tid) _exited_threads.append(tid) _cond.notifyAll() def wait_for_thread(timeout=None): """ Wait for some thread(s) to have finished, with optional timeout. Return the first finished thread instance (which is removed from the finished-threads queue). If there are no unfinished threads this returns None without waiting. """ with _cond: if not _exited_threads and _monitored_threads: _cond.wait(timeout) if _exited_threads: result = _exited_threads.popleft() else: result = None return result def main(): print 'testing this stuff' def func(i): import time, random sleeptime = (random.random() * 2) + 1 print 'thread', i, 'starting - sleep for', sleeptime time.sleep(sleeptime) print 'thread', i, 'finished' threads = [MWThread(target=func, args=(i,)) for i in range(3)] for th in threads: th.start() i = 0 while i < 3: print 'main: wait up to .5 sec' th = wait_for_thread(.5) if th: print 'main: got', th th.join() i += 1 else: print 'main: timeout' print 'I think I collected them all' print 'result of wait_for_thread():' print wait_for_thread() if __name__ == '__main__': main() 

Comments

1

Or just keep track of all finished threads in a list and let the second thread to finish handle whatever is supposed to be done, Python lists are threadsafe.

finished_threads = [] event = threading.Event() def func(): do_important_stuff() thisthread = threading.current_thread() finished_threads.append(thisthread) if len(finished_threads) > 1 and finished_threads[1] == thisthread: #yay we are number two! event.set() for i in range(5): threading.Thread(target=func).start() event.wait() 

4 Comments

This doesn't answer the bit about the main thread waiting until two threads are complete and then continuing: instead you've transferred all the remaining activity onto the second thread to complete which may not be what is wanted.
true; handle_two_threads_done() should probably set on an Event instead. Edited.
Ummm, Python lists are threadsafe? Really? I thought one needed to use a Queue() for thread coherency!
The lists are safe, their data is not (like other Python builtins). In this case, they are safe because there is no race condition. If you need to synchronize on a list, you need a Queue. I try to avoid having to synchronize though.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.