12

I'm new to Python threading and I'm experimenting with it: when I run something in threads (and print outputs), it never seems to run in parallel. Also, my functions take the same time as before I used the concurrent.futures library (ThreadPoolExecutor). I have to calculate the gains of some attributes over a dataset (I cannot use libraries). Since I have about 1024 attributes and the function was taking about a minute to execute (and I have to call it inside a for loop), I decided to split the array of attributes into 10 parts (just as an example) and run the function gain(attribute) separately for each sub-array. So I did the following (omitting some unnecessary code):

```python
def calculate_gains(self):
    splited_attributes = np.array_split(self.attributes, 10)
    result = {}
    for atts in splited_attributes:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(self.calculate_gains_helper, atts)
            return_value = future.result()
            self.gains = {**self.gains, **return_value}
```

Here's the calculate_gains_helper:

```python
def calculate_gains_helper(self, attributes):
    inter_result = {}
    for attribute in attributes:
        inter_result[attribute] = self.gain(attribute)
    return inter_result
```

Am I doing something wrong? I read some other older posts but I couldn't get any info. Thanks a lot for any help!

3
  • You'll only get parallelism to the extent that you stay within NumPy operations, which release the Python Global Interpreter Lock. Is there any way these calculations can be done within NumPy instead of breaking things out into Python for loops? Commented Apr 10, 2020 at 23:16
  • for atts in splited_attributes: you are creating a thread executor, submitting a single work item, and then waiting for it to complete for each atts in the for loop. That is far more expensive than just doing the calculation single-threaded. You should create the executor once and throw all of the jobs at it. Commented Apr 10, 2020 at 23:18
  • Makes sense! But how do I do that? How can I manage all the return values outside the for loop? Commented Apr 10, 2020 at 23:38

3 Answers

13

Python threads do not run in parallel (at least in the CPython implementation) because of the GIL. Use processes and ProcessPoolExecutor to get real parallelism:

```python
with concurrent.futures.ProcessPoolExecutor() as executor:
    ...
```
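For example, here is a minimal sketch of the same chunk-and-merge pattern with a process pool. The gain function below is a hypothetical stand-in for the asker's self.gain (the real one is not shown); the key constraints are that the worker functions live at module top level so they can be pickled, and that the pool is created once with all chunks submitted up front:

```python
import concurrent.futures

def gain(attribute):
    # Hypothetical stand-in for the asker's self.gain(attribute);
    # any CPU-bound, picklable function works with a process pool.
    return attribute * attribute

def gains_for_chunk(chunk):
    # Each worker process computes the gains for one slice of attributes.
    return {a: gain(a) for a in chunk}

def calculate_gains(attributes, n_chunks=4):
    chunks = [attributes[i::n_chunks] for i in range(n_chunks)]
    gains = {}
    # Create the pool once and submit every chunk before waiting on results.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for result in executor.map(gains_for_chunk, chunks):
            gains.update(result)
    return gains

if __name__ == "__main__":
    print(calculate_gains(list(range(8))))
```

Note that processes pay a cost to serialize arguments and results between the parent and the workers, so for small chunks the overhead can outweigh the parallel speedup.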

3 Comments

I tried changing to ProcessPoolExecutor, but the time is now greater.
@LeandroD. Are you on windows? The process pool executor may end up copying the array to the child, wasting more time than can be gained in the subprocess.
No, I'm using Mac OS
3

You submit and then wait for each work item serially, so all the threads do is slow everything down. I can't guarantee this will speed things up much, because you are still dealing with the Python GIL, which keeps Python-level code from running in parallel, but here goes.

I've created a thread pool and pushed everything possible into the worker, including the slicing of self.attributes.

```python
def calculate_gains(self):
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        result_list = executor.map(
            self.calculate_gains_helper,
            ((i, i + 10) for i in range(0, len(self.attributes), 10)))
        for return_value in result_list:
            self.gains = {**self.gains, **return_value}

def calculate_gains_helper(self, start_end):
    start, end = start_end
    inter_result = {}
    for attribute in self.attributes[start:end]:
        inter_result[attribute] = self.gain(attribute)
    return inter_result
```
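If you'd rather merge each partial result as soon as its thread finishes (instead of in submission order, as executor.map does), submit the futures yourself and iterate with as_completed. A standalone sketch, with a hypothetical gains_for_slice standing in for the helper above:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def gains_for_slice(attributes, start, end):
    # Hypothetical stand-in for calculate_gains_helper / self.gain.
    return {a: a * 2 for a in attributes[start:end]}

attributes = list(range(25))
gains = {}
with ThreadPoolExecutor(max_workers=10) as executor:
    # Submit every slice first, then wait; the executor is created once.
    futures = [executor.submit(gains_for_slice, attributes, i, i + 10)
               for i in range(0, len(attributes), 10)]
    # Merge each partial dict as soon as its thread finishes.
    for future in as_completed(futures):
        gains.update(future.result())
```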

3 Comments

First of all, thanks for your quick reply! I'm trying to figure out what you meant in lines 3 and 4, since that does not compile :(. I'm using Python 3.7.
Oops, missing some parens. The idea is to just pass the wanted range to the worker and let it do the split in parallel with other threads.
I think this worked, but execution time was not improved :(. If someone has any advice I would love to hear it!
2

I had this same trouble and fixed it by moving the iteration inside the ThreadPoolExecutor context; otherwise, you have to wait for each context to finish and then start another one.

Here is a probable fix for your code:

```python
def calculate_gains(self):
    splited_attributes = np.array_split(self.attributes, 10)
    result = {}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for atts in splited_attributes:
            future = executor.submit(self.calculate_gains_helper, atts)
            return_value = future.result()
            self.gains = {**self.gains, **return_value}
```

To demonstrate better what I mean here is a sample code:

Below is non-working code. The threads will execute synchronously...

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def t(reference):
    for i in range(10):
        print(f"{reference} :" + str(i))
        sleep(1)

futures = []
refs = ["a", "b", "c"]
for i in refs:
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures.append(executor.submit(t, i))

for future in as_completed(futures):
    print(future.result())
```

Here is the fixed code:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def t(reference):
    for i in range(10):
        print(f"{reference} :" + str(i))
        sleep(1)

futures = []
refs = ["a", "b", "c"]
with ThreadPoolExecutor(max_workers=3) as executor:  # swapped
    for i in refs:  # swapped
        futures.append(executor.submit(t, i))

for future in as_completed(futures):
    print(future.result())
```

You can try this on your terminal and check out the outputs.
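The same effect can be shown with timings instead of interleaved prints. Below is a small sketch (short sleeps stand in for real blocking work; the function names are made up for the demo): the per-iteration executor waits on each job before submitting the next, while the shared executor lets the sleeps overlap.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep

def task(_):
    sleep(0.2)  # stands in for I/O or other blocking work

def run_serial_pools():
    # One single-job executor per iteration: each sleep finishes
    # before the next is even submitted.
    start = perf_counter()
    for i in range(3):
        with ThreadPoolExecutor(max_workers=3) as executor:
            executor.submit(task, i).result()
    return perf_counter() - start

def run_shared_pool():
    # One executor shared by all jobs: the three sleeps overlap.
    start = perf_counter()
    with ThreadPoolExecutor(max_workers=3) as executor:
        list(executor.map(task, range(3)))
    return perf_counter() - start

print(run_serial_pools())  # roughly 0.6 s
print(run_shared_pool())   # roughly 0.2 s
```

Note this only helps because sleep (like I/O) releases the GIL; pure Python computation would still be serialized either way.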

1 Comment

This answer should be marked as the right answer! The loop should move inside with concurrent.futures.ThreadPoolExecutor() as executor: to see the difference.
