I have a CPU-bound application that I wish to speed up using multiprocessing+threading instead of a pure threaded version. I wrote a simple benchmark to check the performance of my approach and was surprised to see that the multiprocessing and multiprocessing+threading versions perform worse than both the threaded and serial versions.
In my application I have a work queue that stores all the work. Each thread pops off one work item at a time and then processes it, either directly (threaded version) or by passing it into a process. The thread then needs to wait for the result to arrive before proceeding to the next iteration. The reason I need to pop off one work item at a time is that the work is dynamic (not the case in the prototype code pasted below), so I cannot pre-partition the work and hand it to each thread/process at creation time.
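To make the pattern concrete, here is a stripped-down sketch of the pop-and-dispatch loop described above (the names are toy names, not from my real application, and squaring stands in for the real work):

```python
import threading

class WorkQueue(object):
    """Shared queue; workers pop one item at a time until it is drained."""
    def __init__(self, items):
        self.data = list(items)
        self.lock = threading.Lock()

    def pop(self):
        with self.lock:  # pop must be atomic across threads
            return self.data.pop() if self.data else None

def worker(queue, results, results_lock):
    while True:
        item = queue.pop()
        if item is None:            # queue drained -> exit
            break
        value = item * item         # stand-in for the real (dynamic) work
        with results_lock:
            results.append(value)

def run(num_threads, items):
    queue = WorkQueue(items)
    results = []
    results_lock = threading.Lock()
    threads = [threading.Thread(target=worker,
                                args=(queue, results, results_lock))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

In the multiprocessing variants below, the `item * item` step is replaced by a round trip through a per-thread pair of `multiprocessing.Queue`s.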
I would like to know what I am doing wrong and how I could speed up my application.
Here are the execution times from a run on a 16-core machine:
```
Version : 2.7.2
Compiler : GCC 4.1.2 20070925 (Red Hat 4.1.2-33)
Platform : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf
Processor : x86_64
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 3505.97214699 ms
mainPureMultiprocessing exec time: 2241.89805984 ms
mainPureThreaded exec time: 309.767007828 ms
mainSerial exec time: 52.3412227631 ms
Terminating
```

and here is the code I used:
```python
import threading
import multiprocessing
import time
import platform

class ConcurrentQueue:
    def __init__(self):
        self.data = []
        self.lock = threading.Lock()

    def push(self, item):
        self.lock.acquire()
        try:
            self.data.append(item)
        finally:
            self.lock.release()
        return

    def pop(self):
        self.lock.acquire()
        result = None
        try:
            length = len(self.data)
            if length > 0:
                result = self.data.pop()
        finally:
            self.lock.release()
        return result

    def isEmpty(self):
        self.lock.acquire()
        try:
            length = len(self.data)
        finally:
            self.lock.release()
        return length == 0

def timeFunc(passedFunc):
    def wrapperFunc(*arg):
        startTime = time.time()
        result = passedFunc(*arg)
        endTime = time.time()
        elapsedTime = (endTime - startTime) * 1000
        print passedFunc.__name__, 'exec time:', elapsedTime, "ms"
        return result
    return wrapperFunc

def checkPrime(candidate):
    # dummy workload: trial division by the odd numbers below candidate
    for k in xrange(3, candidate, 2):
        if candidate % k == 0:
            return False
    return True

def fillQueueWithWork(itemQueue, numItems):
    for item in xrange(numItems, 2 * numItems):
        itemQueue.push(item)

@timeFunc
def mainSerial(numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return

# Start: Implement a pure threaded version
def pureThreadFunc(jobQueue):
    curThread = threading.currentThread()
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return

@timeFunc
def mainPureThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=pureThreadFunc, name=loopName,
                                      args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)
    for worker in workers:
        worker.join()
    return
# End: Implement a pure threaded version

# Start: Implement a pure multiprocessing version
def pureMultiprocessingFunc(jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return

@timeFunc
def mainPureMultiprocessing(numProcesses, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    queueSize = (numItems / numProcesses) + 10
    for index in xrange(numProcesses):
        jobs = multiprocessing.Queue(queueSize)
        results = multiprocessing.Queue(queueSize)
        loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc,
                                              args=(jobs, results, ))
        loopProcess.start()
        workers.append((loopProcess, jobs, results))
    # round-robin the items over the per-process job queues
    processIndex = 0
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        workers[processIndex][1].put_nowait(dataItem)
        processIndex += 1
        if numProcesses == processIndex:
            processIndex = 0
    for worker in workers:
        worker[1].put_nowait(None)
    for worker in workers:
        worker[0].join()
    return
# End: Implement a pure multiprocessing version

# Start: Implement a threaded+multiprocessing version
def mpFunc(processName, jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return

def mpThreadFunc(jobQueue):
    curThread = threading.currentThread()
    threadName = curThread.getName()
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    myProcessName = "Process-" + threadName
    myProcess = multiprocessing.Process(target=mpFunc,
                                        args=(myProcessName, jobs, results, ))
    myProcess.start()
    while True:
        dataItem = jobQueue.pop()
        # put item to allow process to start
        jobs.put_nowait(dataItem)
        # terminate loop if work queue is empty
        if dataItem is None:
            break
        # wait to get result from process
        result = results.get()
        # do something with result
    return

@timeFunc
def mainMultiprocessAndThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=mpThreadFunc, name=loopName,
                                      args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)
    for worker in workers:
        worker.join()
    return
# End: Implement a threaded+multiprocessing version

if __name__ == '__main__':
    print 'Version :', platform.python_version()
    print 'Compiler :', platform.python_compiler()
    print 'Platform :', platform.platform()
    print 'Processor :', platform.processor()
    numThreads = 8
    numItems = 16000  # 200000
    print "Num Threads/Processes:", numThreads, "; Num Items:", numItems
    mainMultiprocessAndThreaded(numThreads, numItems)
    mainPureMultiprocessing(numThreads, numItems)
    mainPureThreaded(numThreads, numItems)
    mainSerial(numItems)
    print "Terminating"
```

Edit: One of my guesses for the slowness is that Queue.put() busy-waits instead of relinquishing the GIL. If so, any suggestions on an alternate data structure I should be using?
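For reference, the kind of restructuring I have been considering looks roughly like this (a sketch only, with toy names; squaring stands in for checkPrime): a single shared multiprocessing.Queue feeds all worker processes directly, so no thread blocks waiting on a per-item round trip, and each item crosses the process boundary only once.

```python
import multiprocessing

def worker(jobs, results):
    while True:
        item = jobs.get()           # blocking get; no per-thread dispatcher
        if item is None:            # sentinel value -> shut down
            break
        results.put(item * item)    # stand-in for checkPrime(item)

def run(num_procs, items):
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(jobs, results))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    for item in items:              # dynamic work can keep being pushed here
        jobs.put(item)
    for _ in procs:
        jobs.put(None)              # one sentinel per worker
    out = [results.get() for _ in items]
    for p in procs:
        p.join()
    return out
```

I have not verified this against the real workload; the point is only that the threads (and the per-thread queue pairs) disappear, which I suspect is where the overhead comes from.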