
I am trying to use a worker Pool in python using Process objects. Each worker (a Process) does some initialization (which takes a non-trivial amount of time), gets passed a series of jobs (ideally using map()), and returns something. No communication is necessary beyond that. However, I can't figure out how to get map() to call my worker's compute() function.

```python
from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print 'Worker started'
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print 'Computing things!'
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print worker.compute(3)

    # workers get initialized fine
    pool = Pool(processes=4, initializer=Worker)
    data = range(10)
    # How to use my worker pool?
    result = pool.map(compute, data)
```

Is a job queue the way to go instead, or can I use map()?

  • All process objects are stateful. You might want to remove that word from the title. Also, compute is a method of a Worker; in the examples it's usually a completely stand-alone function. Why not write the compute function to simply include both initialization and processing? Commented Jan 27, 2012 at 19:48
  • Fair enough, thanks. The initialization takes a long time, so I only want to do it once per worker process. Commented Jan 27, 2012 at 20:14
  • You might want to emphasize the "gets passed a series of jobs" part of the question, since that wasn't obvious. Commented Jan 27, 2012 at 20:19

3 Answers


I would suggest that you use a Queue for this.

```python
from multiprocessing import Process

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        print('Worker started')
        # do some initialization here
        print('Computing things!')
        for data in iter(self.queue.get, None):
            pass  # Use data
```

Now you can start a pile of these, all getting work from a single queue:

```python
from multiprocessing import Queue

request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None)
```

That kind of thing should allow you to amortize the expensive startup cost across multiple workers.
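If you also need results back, a second queue pairs naturally with the job queue. A minimal self-contained sketch, assuming squaring as the stand-in workload (the `worker` function and queue names are illustrative, not part of the answer above):

```python
from multiprocessing import Process, Queue

def worker(jobs, results):
    # expensive initialization would happen here, once per process
    for data in iter(jobs.get, None):  # None is the shutdown sentinel
        results.put(data * data)

if __name__ == '__main__':
    jobs, results = Queue(), Queue()
    procs = [Process(target=worker, args=(jobs, results)) for _ in range(4)]
    for p in procs:
        p.start()
    for data in range(10):
        jobs.put(data)
    for _ in procs:        # one sentinel per worker for clean shutdown
        jobs.put(None)
    out = sorted(results.get() for _ in range(10))
    for p in procs:
        p.join()
    print(out)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Note that results arrive in completion order, not submission order, so they are sorted (or tagged with an index) before use.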


5 Comments

That's what I figured, thanks! I ended up using a job queue (input) and result queue (output) to synchronize everything.
Your example is awesome. I'm currently trying to figure out how to insert the sentinel objects when Ctrl+C is pressed, without raising an exception.
@S.Lott: Isn't it that Queue isn't pickle-able? and that's why you use multiprocessing.Manager().Queue?
It is really much more customizable than the default multiprocessing.Pool()!!!
This way will create 4 separate processes. If you want a pool of workers to run in a single process, use Pool docs.python.org/2/library/…

initializer expects an arbitrary callable that does initialization (e.g., it can set some globals), not a Process subclass; map accepts an arbitrary iterable:

```python
#!/usr/bin/env python
import multiprocessing as mp

def init(val):
    print('do some initialization here')

def compute(data):
    print('Computing things!')
    return data * data

def produce_data():
    yield -100
    for i in range(10):
        yield i
    yield 100

if __name__ == "__main__":
    p = mp.Pool(initializer=init, initargs=('arg',))
    print(p.map(compute, produce_data()))
```
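The "set some globals" point is worth spelling out: the initializer can build expensive state once per worker process and stash it in a module-level global that the mapped function then reads. A hedged sketch of that pattern (the `_state` global and the multiply-by-factor workload are made up for illustration):

```python
import multiprocessing as mp

_state = None  # per-process global, populated once by the initializer

def init(factor):
    global _state
    # stand-in for expensive once-per-worker setup
    # (loading a model, opening a connection, ...)
    _state = {'factor': factor}

def compute(data):
    # runs in a worker where init() has already populated _state
    return data * _state['factor']

if __name__ == '__main__':
    with mp.Pool(processes=2, initializer=init, initargs=(10,)) as pool:
        print(pool.map(compute, range(5)))  # [0, 10, 20, 30, 40]
```

This keeps compute() a plain stand-alone function (which is what map() wants) while still paying the initialization cost only once per worker.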



Since Python 3.3 you can use starmap to pass multiple arguments and get back the results with very simple syntax:

```python
import multiprocessing

nb_cores = multiprocessing.cpu_count()

def caps(nb, letter):
    print('Exec nb:', nb)
    return letter.upper()

if __name__ == '__main__':
    # for Windows; must be inside the `if __name__ == '__main__'` guard
    multiprocessing.freeze_support()
    input_data = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
    input_order = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    with multiprocessing.Pool(processes=nb_cores) as pool:  # auto-closing workers
        results = pool.starmap(caps, zip(input_order, input_data))
    print(results)
```
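One detail worth noting about the example above: zip stops at the shorter iterable, so the ninth entry in input_order is silently dropped. A quick illustration of that behavior:

```python
input_data = ['a', 'b', 'c']
input_order = [1, 2, 3, 4]

pairs = list(zip(input_order, input_data))
print(pairs)  # [(1, 'a'), (2, 'b'), (3, 'c')] -- the extra 4 is ignored
```

If mismatched lengths should be an error rather than silent truncation, itertools.zip_longest (or, in Python 3.10+, zip(..., strict=True)) makes the mismatch visible.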

