427

Is there a Pool class for worker threads, similar to the multiprocessing module's Pool class?

For example, I like the easy way it lets you parallelize a map call:

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

However, I would like to do it without the overhead of creating new processes.

I know about the GIL. However, in my usecase, the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call.

Do I have to write my own threading pool?

4
  • Here's something that looks promising over in the Python Cookbook: Recipe 576519: Thread pool with same API as (multi)processing.Pool (Python) Commented Jun 24, 2010 at 14:55
  • 3
    Nowadays it's built-in: from multiprocessing.pool import ThreadPool. Commented Dec 10, 2012 at 20:44
  • Can you elaborate on this: "I know about the GIL. However, in my usecase, the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call."? Commented Jun 24, 2019 at 10:48
  • @mrgloom stackoverflow.com/questions/1294382 Commented Sep 1, 2019 at 20:16

11 Answers

555

I just found out that there actually is a thread-based Pool interface in the multiprocessing module, however it is hidden somewhat and not properly documented.

It can be imported via

from multiprocessing.pool import ThreadPool 

It is implemented using a dummy Process class wrapping a python thread. This thread-based Process class can be found in multiprocessing.dummy which is mentioned briefly in the docs. This dummy module supposedly provides the whole multiprocessing interface based on threads.
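
As a minimal sketch of how this slots into the question's example (time.sleep here is a stand-in I substituted for the I/O-bound C call; like that call, it releases the GIL, so the worker threads genuinely overlap):

from multiprocessing.pool import ThreadPool
import time

def long_running_func(p):
    # stand-in workload: sleep() releases the GIL, as the question's
    # C wrapper would, so the four threads run concurrently
    time.sleep(0.1)
    return p

pool = ThreadPool(4)
xs = pool.map(long_running_func, range(100))
pool.close()
pool.join()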


16 Comments

That's awesome. I had a problem creating ThreadPools outside the main thread, you can use them from a child thread once created though. I put an issue in for it: bugs.python.org/issue10015
I don't get why this class has no documentation. Such helper classes are so important nowadays.
@Wernight: it isn't public primarily because nobody has offered a patch that provides it (or something similar) as threading.ThreadPool, including documentation and tests. It would indeed be a good battery to include in the standard library, but it won't happen if nobody writes it. One nice advantage of this existing implementation in multiprocessing, is that it should make any such threading patch much easier to write (docs.python.org/devguide)
@daniel.gindi: multiprocessing.dummy.Pool/multiprocessing.pool.ThreadPool are the same thing, and are both thread pools. They mimic the interface of a process pool, but they are implemented entirely in terms of threading. Reread the docs, you got it backwards.
@daniel.gindi: Read further: "multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module." multiprocessing in general is about processes, but to allow switching between processes and threads, they (mostly) replicated the multiprocessing API in multiprocessing.dummy, but backed with threads, not processes. The goal is to allow you to do import multiprocessing.dummy as multiprocessing to change process-based code to thread-based.
286

In Python 3 you can use concurrent.futures.ThreadPoolExecutor, i.e.:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

See the docs for more info and examples.
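
A slightly fuller sketch (my_function here is a hypothetical task; Future.result() blocks until the task finishes and re-raises any exception from the worker):

from concurrent.futures import ThreadPoolExecutor

def my_function(x):  # hypothetical task
    return x * x

with ThreadPoolExecutor(max_workers=10) as executor:
    future = executor.submit(my_function, 7)
    print(future.result())  # 49; blocks until the task completes

    # map() parallels the multiprocessing example from the question
    print(list(executor.map(my_function, range(10))))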

6 Comments

in order to use the backported futures module, run sudo pip install futures
it's the most efficient and fastest way for multi processing
What is the difference between using ThreadPoolExecutor and multiprocessing.dummy.Pool?
As of Python 3.9 / the beginning of 3.10, concurrent.futures is a very problematic library. It looks like it's overrun by bugs that aren't getting proper fixes. Perhaps the whole premise of this library was bad. I'm more familiar with the process-based part of it, where there's no end to reasons why the pool would hang forever, swallow errors, and misbehave in other ways. I would stay away from this library as much as possible.
Some gotchas and limitations of concurrent.futures are described here: github.com/yeraydiazdiaz/futureproof#readme
73

Yes, and it seems to have (more or less) the same API.

import multiprocessing
import multiprocessing.pool  # ThreadPool lives in the pool submodule

def worker(lnk):
    ....

def start_process():
    .....

....
if PROCESS:
    pool = multiprocessing.Pool(processes=POOL_SIZE,
                                initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
                                           initializer=start_process)

pool.map(worker, inputs)
....

4 Comments

Import path for ThreadPool is different from Pool. Correct import is from multiprocessing.pool import ThreadPool.
Strangely this is not a documented API, and multiprocessing.pool is only briefly mentioned as providing AsyncResult. But it is available in 2.x and 3.x.
This is what I was looking for. It's just a single import line and a small change to my existing pool line and it works perfectly.
This is a super easy way of doing multithreading or multicore work, thanks.
52

For something very simple and lightweight (slightly modified from here):

from Queue import Queue
from threading import Thread

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)
    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)
    pool.wait_completion()

To support callbacks on task completion, you can just add the callback to the task tuple.
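
A rough sketch of that idea (Python 3 syntax for brevity; the callback slot in the task tuple is my addition, not part of the original recipe):

from queue import Queue
from threading import Thread

class CallbackWorker(Thread):
    """Worker that invokes an optional callback with each task's result."""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, callback, args, kwargs = self.tasks.get()
            try:
                result = func(*args, **kwargs)
                if callback is not None:
                    callback(result)
            except Exception as e:
                print(e)
            finally:
                self.tasks.task_done()

tasks = Queue()
for _ in range(4):
    CallbackWorker(tasks)

# each task tuple now carries its completion callback
tasks.put((pow, print, (2, 10), {}))  # prints 1024 when done
tasks.join()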

4 Comments

how can the threads ever join if they unconditionally infinite loop?
@JosephGarvin I've tested it, and the threads keep blocking on an empty queue (since the call to Queue.get() is blocking) till the program ends, after which they are terminated automatically.
@JosephGarvin, good question. Queue.join() will actually join the task queue, not the worker threads. So, when the queue is empty, wait_completion returns, the program ends, and the threads are reaped by the OS.
If all of this code is wrapped up into a neat function, it doesn't seem to stop the threads even when the queue is empty and pool.wait_completion() returns. The result is that threads just keep building.
27

To use a thread pool in Python, you can use this import:

from multiprocessing.dummy import Pool as ThreadPool 

and then use it like this:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

Here threads is the number of threads that you want, and tasks is a list of the tasks that get mapped to service.
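
For reference, a self-contained version of the same pattern (service and the task list are hypothetical placeholders):

from multiprocessing.dummy import Pool as ThreadPool

def service(task):  # hypothetical worker function
    return task * 2

def run_all(tasks, threads=4):
    pool = ThreadPool(threads)
    try:
        results = pool.map(service, tasks)
    finally:
        pool.close()
        pool.join()
    return results

print(run_all(range(10)))  # [0, 2, 4, ..., 18]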

3 Comments

Thanks, that is a great suggestion! From the docs: multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module. One correction - I think you want to say that the pool api is (function,iterable)
We missed the .close() and .join() calls and that causes .map() to finish before all the threads are finished. Just a warning.
Great solution, super elegant!
18

Yes, there is a threading pool similar to the multiprocessing Pool; however, it is somewhat hidden and not properly documented. You can import it the following way:

from multiprocessing.pool import ThreadPool 

Here is a simple example:

def test_multithread_stringio_read_csv(self):
    # see gh-11786
    max_row_range = 10000
    num_files = 100

    bytes_to_df = [
        '\n'.join(
            ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
        ).encode()
        for j in range(num_files)]
    files = [BytesIO(b) for b in bytes_to_df]

    # read all files in many threads
    pool = ThreadPool(8)
    results = pool.map(self.read_csv, files)

    first_result = results[0]
    for result in results:
        tm.assert_frame_equal(first_result, result)

1 Comment

imo this should be the accepted answer
15

Here's the result I finally ended up using. It's a modified version of the classes by dgorissen above.

File: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable,
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                    timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

To use the pool

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()

5 Comments

Annotation for other readers: This code is Python 3 (shebang #!/usr/bin/python3)
Why do you use for i, d in enumerate(delays): and then ignore the i value?
@martineau - probably just a relic from development where they probably wanted to print i during a run.
Why is create_task there? What is it for?
I can't believe an answer with 4 votes on SO is the way to do ThreadPooling in Python. The Threadpool in the official python distribution is still broken? What am I missing?
6

Another way is to add the task to a thread pool executor's queue:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        # submit() takes the callable first, then its arguments;
        # my_task is a placeholder name here
        a = executor.submit(my_task, arg1, arg2, ....)

Comments

4

The overhead of creating the new processes is minimal, especially when it's just 4 of them. I doubt this is a performance hot spot of your application. Keep it simple, optimize where you have to and where profiling results point to.

1 Comment

If the questioner is under Windows (which I do not believe he specified), then I think that process spinup can be a significant expense. At least it is on the projects that I have been recently doing. :-)
3

There is no built-in thread-based pool. However, it can be very quick to implement a producer/consumer queue with the Queue class.

From: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # block until all tasks are done
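
A self-contained adaptation under Python 3 naming (do_work and the input range are placeholders I've filled in for illustration):

from threading import Thread
from queue import Queue  # the Queue module was renamed queue in Python 3

NUM_WORKER_THREADS = 4

def do_work(item):  # placeholder work function
    print('processing', item)

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(NUM_WORKER_THREADS):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in range(20):  # placeholder for source()
    q.put(item)

q.join()  # block until all tasks are done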

3 Comments

This is no longer the case with the concurrent.futures module.
I don't think this is true at all anymore. from multiprocessing.pool import ThreadPool
0

If you don't mind executing someone else's code, here's mine:

Note: There is a lot of extra code you may want to remove [added for better clarification and to demonstrate how it works].

Note: Python naming conventions were used for method names and variable names instead of camelCase.

Working procedure:

  1. The MultiThread class will be initialized with the desired number of thread instances, which share a lock, a work queue, an exit flag, and a results list.
  2. SingleThread instances will be started by MultiThread once it creates all of them.
  3. We can add work using MultiThread (it will take care of locking).
  4. SingleThread instances will process the work queue, using the lock in the middle.
  5. Once your work is done, you can destroy all threads with the shared boolean value.
  6. Here, work can be anything. It can automatically import (uncomment the import line) and process a module using the given arguments.
  7. Results will be appended to results, and we can retrieve them using get_results.

Code:

import threading
import queue


class SingleThread(threading.Thread):
    def __init__(self, name, work_queue, lock, exit_flag, results):
        threading.Thread.__init__(self)
        self.name = name
        self.work_queue = work_queue
        self.lock = lock
        self.exit_flag = exit_flag
        self.results = results

    def run(self):
        # print("Coming %s with parameters %s", self.name, self.exit_flag)
        while not self.exit_flag:
            # print(self.exit_flag)
            self.lock.acquire()
            if not self.work_queue.empty():
                work = self.work_queue.get()
                module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
                self.lock.release()
                print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
                # module = __import__(module_name)
                result = str(getattr(module, operation)(*args, **kwargs))
                print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
                self.results.append(result)
            else:
                self.lock.release()
        # process_work_queue(self.work_queue)


class MultiThread:
    def __init__(self, no_of_threads):
        self.exit_flag = bool_instance()
        self.queue_lock = threading.Lock()
        self.threads = []
        self.work_queue = queue.Queue()
        self.results = []
        for index in range(0, no_of_threads):
            thread = SingleThread("Thread" + str(index + 1), self.work_queue,
                                  self.queue_lock, self.exit_flag, self.results)
            thread.start()
            self.threads.append(thread)

    def add_work(self, work):
        self.queue_lock.acquire()
        self.work_queue._put(work)
        self.queue_lock.release()

    def destroy(self):
        self.exit_flag.value = True
        for thread in self.threads:
            thread.join()

    def get_results(self):
        return self.results


class Work:
    def __init__(self, module, operation, args, kwargs={}):
        self.module = module
        self.operation = operation
        self.args = args
        self.kwargs = kwargs


class SimpleOperations:
    def sum(self, *args):
        return sum([int(arg) for arg in args])

    @staticmethod
    def mul(a, b, c=0):
        return int(a) * int(b) + int(c)


class bool_instance:
    def __init__(self, value=False):
        self.value = value

    def __setattr__(self, key, value):
        if key != "value":
            raise AttributeError("Only value can be set!")
        if not isinstance(value, bool):
            raise AttributeError("Only True/False can be set!")
        self.__dict__[key] = value
        # super.__setattr__(key, bool(value))

    def __bool__(self):
        return self.value


if __name__ == "__main__":
    multi_thread = MultiThread(5)
    multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c": 4}))
    while True:
        data_input = input()
        if data_input == "":
            pass
        elif data_input == "break":
            break
        else:
            work = data_input.split()
            multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
    multi_thread.destroy()
    print(multi_thread.get_results())

Comments
