
I have a CPU-bound application that I wish to speed up using multiprocessing+threading instead of the pure threaded version. I wrote a simple application to check the performance of my approach and was surprised to see that both the multiprocessing and multiprocessing+threaded versions performed worse than the threaded and serial versions.

In my application I have a work queue that stores all the work. Each thread pops off one work item at a time and then processes it, either directly (threaded version) or by passing it into a process. The thread then needs to wait for the result to arrive before proceeding with the next iteration. The reason I need to pop off one work item at a time is that the work is dynamic (not the case in the prototype code pasted below), so I cannot pre-partition the work and hand it off to each thread/process at creation time.

I would like to know what I am doing wrong and how I could speed up my application.

Here are the execution times when I ran it on a 16-core machine:

Version : 2.7.2
Compiler : GCC 4.1.2 20070925 (Red Hat 4.1.2-33)
Platform : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf
Processor : x86_64
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 3505.97214699 ms
mainPureMultiprocessing exec time: 2241.89805984 ms
mainPureThreaded exec time: 309.767007828 ms
mainSerial exec time: 52.3412227631 ms
Terminating

and here is the code I used:

import threading
import multiprocessing
import time
import platform

class ConcurrentQueue:
    def __init__(self):
        self.data = []
        self.lock = threading.Lock()

    def push(self, item):
        self.lock.acquire()
        try:
            self.data.append(item)
        finally:
            self.lock.release()
        return

    def pop(self):
        self.lock.acquire()
        result = None
        try:
            length = len(self.data)
            if length > 0:
                result = self.data.pop()
        finally:
            self.lock.release()
        return result

    def isEmpty(self, item):
        self.lock.acquire()
        result = 0
        try:
            result = len(self.data)
        finally:
            self.lock.release()
        return result != 0

def timeFunc(passedFunc):
    def wrapperFunc(*arg):
        startTime = time.time()
        result = passedFunc(*arg)
        endTime = time.time()
        elapsedTime = (endTime - startTime) * 1000
        print passedFunc.__name__, 'exec time:', elapsedTime, " ms"
        return result
    return wrapperFunc

def checkPrime(candidate):
    # dummy process to do some work
    for k in xrange(3, candidate, 2):
        if candidate % k:
            return False
    return True

def fillQueueWithWork(itemQueue, numItems):
    for item in xrange(numItems, 2 * numItems):
        itemQueue.push(item)

@timeFunc
def mainSerial(numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return

# Start: Implement a pure threaded version
def pureThreadFunc(jobQueue):
    curThread = threading.currentThread()
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return

@timeFunc
def mainPureThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=pureThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)

    for worker in workers:
        worker.join()
    return
# End: Implement a pure threaded version

# Start: Implement a pure multiprocessing version
def pureMultiprocessingFunc(jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return

@timeFunc
def mainPureMultiprocessing(numProcesses, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    queueSize = (numItems / numProcesses) + 10
    for index in xrange(numProcesses):
        jobs = multiprocessing.Queue(queueSize)
        results = multiprocessing.Queue(queueSize)
        loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc, args=(jobs, results, ))
        loopProcess.start()
        workers.append((loopProcess, jobs, results))

    processIndex = 0
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        workers[processIndex][1].put_nowait(dataItem)
        processIndex += 1
        if numProcesses == processIndex:
            processIndex = 0

    for worker in workers:
        worker[1].put_nowait(None)

    for worker in workers:
        worker[0].join()
    return
# End: Implement a pure multiprocessing version

# Start: Implement a threaded+multiprocessing version
def mpFunc(processName, jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return

def mpThreadFunc(jobQueue):
    curThread = threading.currentThread()
    threadName = curThread.getName()

    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    myProcessName = "Process-" + threadName
    myProcess = multiprocessing.Process(target=mpFunc, args=(myProcessName, jobs, results, ))
    myProcess.start()

    while True:
        dataItem = jobQueue.pop()
        # put item to allow process to start
        jobs.put_nowait(dataItem)
        # terminate loop if work queue is empty
        if dataItem is None:
            break
        # wait to get result from process
        result = results.get()
        # do something with result
    return

@timeFunc
def mainMultiprocessAndThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=mpThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)

    for worker in workers:
        worker.join()
    return
# End: Implement a threaded+multiprocessing version

if __name__ == '__main__':
    print 'Version :', platform.python_version()
    print 'Compiler :', platform.python_compiler()
    print 'Platform :', platform.platform()
    print 'Processor :', platform.processor()

    numThreads = 8
    numItems = 16000  #200000
    print "Num Threads/Processes:", numThreads, "; Num Items:", numItems

    mainMultiprocessAndThreaded(numThreads, numItems)
    mainPureMultiprocessing(numThreads, numItems)
    mainPureThreaded(numThreads, numItems)
    mainSerial(numItems)
    print "Terminating"

Edit: One of my guesses for the slowness is that the Queue.put() calls are busy-waiting instead of relinquishing the GIL. If so, any suggestions on an alternate data structure I should be using?
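Edit 2: To get a feel for whether the per-item dispatch cost alone explains the numbers, here is a minimal sketch that times a single put()/get() round trip through a child process. The echo worker and the item count are illustrative, not part of the timing run above; if one round trip costs more than one checkPrime call, the overhead story would already account for the slowdown:

import multiprocessing
import time

def echo(jobs, results):
    # trivial worker: send every item straight back
    while True:
        item = jobs.get()
        if item is None:
            break
        results.put(item)

if __name__ == '__main__':
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    worker = multiprocessing.Process(target=echo, args=(jobs, results))
    worker.start()

    numItems = 10000  # illustrative count
    startTime = time.time()
    for i in xrange(numItems):
        jobs.put(i)
        results.get()  # one full put/get round trip per item
    elapsedTime = (time.time() - startTime) * 1000

    jobs.put(None)
    worker.join()
    print 'avg round trip:', elapsedTime / numItems, 'ms'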

  • As far as I know, Python multithreading is not for speeding up execution (I might be totally wrong, so please correct me): understanding GIL. Commented Jan 22, 2012 at 6:00
  • Similar research: Python – parallelizing CPU-bound tasks with multiprocessing. Commented Jan 22, 2012 at 6:51
  • @reclosedev The difference I see with the example there is that all the input for the target is provided in advance, instead of via repeated calls to queue.get(). Similarly, there is only a single call to result.get() per process. Commented Jan 22, 2012 at 7:17
  • @c-ram Yes, I expected the multiprocessing variants to achieve some speedup, but even those are slower. Commented Jan 22, 2012 at 7:17

2 Answers


It looks like the computational cost of each item isn't outweighing the overhead associated with dispatching the work to another thread/process. For example, here are the results I'm seeing when running your test application on my machine (very similar to your results):

Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 1134.00006294 ms
mainPureMultiprocessing exec time: 917.000055313 ms
mainPureThreaded exec time: 111.000061035 ms
mainSerial exec time: 41.0001277924 ms
Terminating

If I modify the work that's being performed to something that's more computationally expensive, for example:

def checkPrime(candidate):
    i = 0
    for k in xrange(1, 10000):
        i += k
    return i < 5000

Then I see results that are more in line with what I think you would expect:

Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 2190.99998474 ms
mainPureMultiprocessing exec time: 2154.99997139 ms
mainPureThreaded exec time: 16170.0000763 ms
mainSerial exec time: 9143.00012589 ms
Terminating

You may also want to take a look at multiprocessing.Pool. It provides a similar model to what you're describing (multiple worker processes pulling jobs from a common queue). For your example, an implementation may look something like:

@timeFunc
def mainPool(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    pool = multiprocessing.Pool(processes=numThreads)
    results = []
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # apply_async expects the arguments as a tuple
        results.append(pool.apply_async(checkPrime, (dataItem,)))

    pool.close()
    pool.join()
    # results holds AsyncResult handles; call .get() on each to retrieve values

On my machine, with the alternative checkPrime implementation, I'm seeing a result of:

Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 1600
mainPool exec time: 1530.99989891 ms
Terminating

Since multiprocessing.Pool already provides safe access for inserting work, you could likely eliminate your ConcurrentQueue and insert your dynamic work directly into the Pool, as in the sketch below.
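For example, here is a rough sketch of how dynamically discovered work could be fed straight into the Pool. The callback-based resubmission and the DynamicDispatcher/process names are my own illustration, not something from your code: each result callback submits any follow-up items it produced, and a pending counter tracks when everything has drained.

import multiprocessing
import threading

def process(item):
    # placeholder work function; pretend it can discover follow-up items
    followUps = []  # e.g. new work items found while processing 'item'
    return followUps

class DynamicDispatcher:
    # Sketch: submit follow-up work to the Pool from result callbacks.
    def __init__(self, numProcesses):
        self.pool = multiprocessing.Pool(processes=numProcesses)
        self.lock = threading.Lock()
        self.pending = 0
        self.done = threading.Event()

    def submit(self, item):
        with self.lock:
            self.pending += 1
        self.pool.apply_async(process, (item,), callback=self._onResult)

    def _onResult(self, followUps):
        # runs in the Pool's result-handler thread
        for newItem in followUps:
            self.submit(newItem)  # resubmit before decrementing the counter
        with self.lock:
            self.pending -= 1
            if self.pending == 0:
                self.done.set()

    def wait(self):
        self.done.wait()
        self.pool.close()
        self.pool.join()

Seed it with the initial items (dispatcher.submit(i) for each) and then call dispatcher.wait(); because follow-ups are submitted before the counter is decremented, the counter can only hit zero once the queue has truly drained.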


1 Comment

Thanks a lot for catching the lack of computational work; I was able to reproduce your results. I will definitely have a look at the Pool example.

It seems that your function is not computationally intensive enough to outweigh the overheads of multiprocessing. (Note that in Python, multithreading does not increase your computational resources, due to the GIL.)

Your function (checkPrime) is not actually checking for primality; rather, it returns very quickly. Replacing it with a simple (and naive) prime checker, the result is as expected.

However, look at Use Python pool.map to have multiple processes perform operations on a list to see an easy use of multiprocessing. Note that there are built-in types to perform the task of your queue, such as multiprocessing.Queue; see http://docs.python.org/library/multiprocessing.html#multiprocessing-managers

def checkPrime(candidate):
    # dummy process to do some work
    for k in xrange(3, candidate):
        if not candidate % k:
            return False
    return True

and an example 'speedy' implementation:

@timeFunc
def speedy(numThreads, numItems):
    # note: by default, Pool() will use the optimal number of workers
    pool = multiprocessing.Pool(numThreads)
    for i in xrange(numItems, 2 * numItems):
        # arguments to apply_async must be passed as a tuple
        pool.apply_async(checkPrime, (i,))
    pool.close()
    pool.join()

Which is nearly twice as fast!

wdolphin@Cory-linuxlaptop:~$ python test.py
Version : 2.6.6
Compiler : GCC 4.4.5
Platform : Linux-2.6.35-32-generic-x86_64-with-Ubuntu-10.10-maverick
Processor :
Num Threads/Processes: 8 ; Num Items: 16000
mainSerial exec time: 5555.76992035 ms
mainMultiprocessAndThreaded exec time: 4721.43602371 ms
mainPureMultiprocessing exec time: 4440.83094597 ms
mainPureThreaded exec time: 10829.3449879 ms
speedy exec time: 1898.72503281 ms
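If the total work range is known up front, the pool.map style from the linked question is simpler still. Here is a rough sketch; the chunksize value of 100 is just a guess at a batch size, not a tuned number:

import multiprocessing

def checkPrime(candidate):
    for k in xrange(3, candidate):
        if not candidate % k:
            return False
    return True

if __name__ == '__main__':
    numThreads, numItems = 8, 16000
    pool = multiprocessing.Pool(numThreads)
    # chunksize batches many items per IPC message, cutting per-item overhead
    results = pool.map(checkPrime, xrange(numItems, 2 * numItems),
                       chunksize=100)
    pool.close()
    pool.join()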

1 Comment

Thanks a lot for the catch. Your answer is similar to the one by @DRH. Unfortunately I can accept only one answer :(
