22

I'd like to run multiple instances of program.py simultaneously, while limiting the number of instances running at the same time (e.g. to the number of CPU cores available on my system). For example, if I have 10 cores and have to do 1000 runs of program.py in total, only 10 instances will be created and running at any given time.

I've tried using the multiprocessing module, multithreading, and using queues, but there's nothing that seemed to me to lend itself to an easy implementation. The biggest problem I have is finding a way to limit the number of processes running simultaneously. This is important because if I create 1000 processes at once, it becomes equivalent to a fork bomb. I don't need the results returned from the processes programmatically (they output to disk), and the processes all run independently of each other.

Can anyone please give me suggestions or an example of how I could implement this in python, or even bash? I'd post the code I've written so far using queues, but it doesn't work as intended and might already be down the wrong path.

Many thanks.

6
  • 2
    Have you tried Python process pools? Commented Aug 16, 2012 at 23:04
  • The simplest way to do this is to create a "controller" program that creates the multiprocessing.pool and spawns the worker (program.py) threads, reallocating work as instances finish. Commented Aug 16, 2012 at 23:04
  • Thanks, I'll try this; in my first attempt for some reason I came to the conclusion that multiprocessing.pool wasn't what I wanted, but now it seems right. So in this case, worker threads would just spawn program.py (as a thread? with subprocess.Popen)? Could you please post a rough example or template implementation I could follow? Commented Aug 16, 2012 at 23:14
  • Like most of the Python docs, there is example code for using them a bit further down the page. Commented Aug 16, 2012 at 23:32
  • So something like: pool = multiprocessing.Pool(10) pool.map(func, 1000) What I don't get is what to make func? Am I calling program.py with subprocess.Popen, or is there a better way? Seems somehow redundant to import both subprocess and multiprocessing. Thanks. Commented Aug 17, 2012 at 0:00

4 Answers 4

27

I know you mentioned that the Pool.map approach doesn't make much sense to you. The map is just an easy way to give it a source of work, and a callable to apply to each of the items. The func for the map could be any entry point to do the actual work on the given arg.

If that doesn't seem right for you, I have a pretty detailed answer over here about using a Producer-Consumer pattern: https://stackoverflow.com/a/11196615/496445

Essentially, you create a Queue, and start N number of workers. Then you either feed the queue from the main thread, or create a Producer process that feeds the queue. The workers just keep taking work from the queue and there will never be more concurrent work happening than the number of processes you have started.

You also have the option of putting a limit on the queue, so that it blocks the producer when there is already too much outstanding work, if you need to put constraints also on the speed and resources that the producer consumes.

The work function that gets called can do anything you want. This can be a wrapper around some system command, or it can import your python lib and run the main routine. There are specific process management systems out there which let you set up configs to run your arbitrary executables under limited resources, but this is just a basic python approach to doing it.

Snippets from that other answer of mine:

Basic Pool:

from multiprocessing import Pool def do_work(val): # could instantiate some other library class, # call out to the file system, # or do something simple right here. return "FOO: %s" % val pool = Pool(4) work = get_work_args() results = pool.map(do_work, work) 

Using a process manager and producer

from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() # exit signal if item == None: return # fake work time.sleep(.5) result = item out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data # this could also be started in a producer process # instead of blocking iters = itertools.chain(get_work_args(), (None,)*num_workers) for item in iters: work.put(item) for p in pool: p.join() print results 
Sign up to request clarification or add additional context in comments.

1 Comment

Very good example, I improved it by getting the number of CPUS like they explain in stackoverflow.com/questions/6905264/… and so I could dinamically set num_workers based on the CPUs of the machine.
3

You should use a process supervisor. One approach would be using the API provided by Circus to do that "programatically", the documentation site is now offline but I think its just a temporary problem, anyway, you can use the Circus to handle this. Another approach would be using the supervisord and setting the parameter numprocs of the process to the number of cores you have.

An example using Circus:

from circus import get_arbiter arbiter = get_arbiter("myprogram", numprocesses=3) try: arbiter.start() finally: arbiter.stop() 

Comments

2

Bash script rather than Python, but I use it often for simple parallel processing:

#!/usr/bin/env bash waitForNProcs() { nprocs=$(pgrep -f $procName | wc -l) while [ $nprocs -gt $MAXPROCS ]; do sleep $SLEEPTIME nprocs=$(pgrep -f $procName | wc -l) done } SLEEPTIME=3 MAXPROCS=10 procName=myPython.py for file in ./data/*.txt; do waitForNProcs ./$procName $file & done 

Or for very simple cases, another option is xargs where P sets the number of procs

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 

Comments

1

While there are many answers about using multiprocessing.pool, there are not many code snippets on how to use multiprocessing.Process, which is indeed more beneficial when memory usage matters. starting 1000 processes will overload the CPU and kill the memory. If each process and its data pipelines are memory intensive, OS or Python itself will limit the number of parallel processes. I developed the below code to limit the simultaneous number of jobs submitted to the CPU in batches. The batch size can be scaled proportional to the number of CPU cores. In my windows PC, the number of jobs per batch can be efficient upto 4 times the CPU coures available.

import multiprocessing def func_to_be_multiprocessed(q,data): q.put(('s')) q = multiprocessing.Queue() worker = [] for p in range(number_of_jobs): worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ args=(q,data)...)) num_cores = multiprocessing.cpu_count() Scaling_factor_batch_jobs = 3.0 num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs num_of_batches = number_of_jobs // num_jobs_per_batch for i_batch in range(num_of_batches): floor_job = i_batch * num_jobs_per_batch ceil_job = floor_job + num_jobs_per_batch for p in worker[floor_job : ceil_job]: worker.start() for p in worker[floor_job : ceil_job]: worker.join() for p in worker[ceil_job :]: worker.start() for p in worker[ceil_job :]: worker.join() for p in multiprocessing.active_children(): p.terminate() result = [] for p in worker: result.append(q.get()) 

The only problem is, if any of the job in any batch could not complete and leads to a hanging situation, rest of the batches of jobs will not be initiated. So, the function to be processed must have proper error handling routines.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.