
I have the following method, concurrent_api_call_and_processing(), that takes the parameters below:

  • api_call: an HTTP request to an external website that retrieves an XML document
  • lst: a list of integers (ids) needed by api_call
  • callback_processing: a local method that just parses each XML response

I do around 500 HTTP requests, one for each id in lst, using api_call(). Each response is then processed with the local method callback_processing(), which parses the XML and returns a tuple.

from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed


def concurrent_api_call_and_processing(api_call=None, callback_processing=None,
                                       lst=None, workers=5):
    """
    :param api_call: Function that will be called concurrently. An API call
        to API_Provider for each entry.
    :param lst: List of finding ids needed by the API function to call the
        API_Provider endpoint.
    :param callback_processing: Function that will be called after we get
        the response from the above API call.
    :param workers: Number of concurrent threads that will be used.
    :return: queue of tuples containing the details of each particular finding.
    """
    output = Queue()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
        for future in as_completed(future_to_f_detail):
            f_id = future_to_f_detail[future]
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {f_id} generated an exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.put(f_det)
    return output

I started to notice some random issues (no graceful termination) while using this method.

Since I was using an array instead of a queue (output = []) and was in doubt whether I could have a race condition, I decided to refactor the code and start using a Queue (output = Queue()).

My question is:

  • Is my code, as it is now, free of race conditions?

NOTE: Following Raymond Hettinger's keynote on concurrency (PyBay 2017), I added fuzz() sleep calls for testing, but could not determine whether I actually had a race condition.
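For context, the fuzz() technique from that keynote is just a short randomized sleep inserted between operations to widen race windows; a minimal sketch (the function name and default delay are assumptions, not taken verbatim from the talk):

```python
import random
import time

def fuzz(max_delay=0.1):
    """Sleep for a random interval of up to max_delay seconds.

    Sprinkling fuzz() calls between operations on shared state makes
    latent race conditions far more likely to surface during testing.
    """
    time.sleep(random.random() * max_delay)
```

Calling fuzz() before and after each read or write of shared data during a test run shakes out interleavings that would otherwise be rare.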

  • except Exception as exc: is bad practice; it's best to be as specific as possible. Commented Feb 4, 2020 at 0:58
  • Also, can you provide a little more information on the program? For one, is the amount of processing for each response consistent? Commented Feb 4, 2020 at 1:02
  • api_call is an HTTP request to an external API on the internet that retrieves an XML document, and callback_processing is a local function that parses the XML information and returns a tuple. I do around 400 calls. Commented Feb 4, 2020 at 1:07

2 Answers


I don't think there is sufficient information here to determine this.

Consider what happens if you pass in an api_call function that increments a global variable:

count = 0

def api_call_fn():
    global count
    count += 1

When this is executed concurrently, there would be a race condition incrementing the count variable.

The same goes for the callback_processing function.
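To make the point concrete, here is a hedged sketch of how such a racy callable could be protected with a threading.Lock; the function name is hypothetical and only illustrates the fix, it is not from the question:

```python
import threading

count = 0
count_lock = threading.Lock()

def safe_api_call_fn():
    global count
    # The lock serializes the read-modify-write, so concurrent
    # callers can no longer interleave and lose increments.
    with count_lock:
        count += 1

threads = [threading.Thread(target=safe_api_call_fn) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# count is now exactly 100, regardless of how the threads interleaved
```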


In order to audit whether this code is race-condition-free, we would have to see the definitions of both of those functions :)


1 Comment

Just added this information to clarify my question: api_call is an HTTP request to an external website that retrieves an XML document; lst is a list of integers (ids) needed by api_call; callback_processing is a local method that just parses each XML response.

Under the above conditions, there won't be a race condition in that code. As per the concurrent.futures docs, what happens is this:

  1. executor.submit(): Returns a Future object representing the execution of the callable.
  2. as_completed(future_to_f_detail): Returns an iterator over the Future instances given by future_to_f_detail that yields futures as they complete (finished or canceled futures).

So the for loop consumes the iterator, receiving one by one every future yielded by as_completed().

So unless callback_processing() or the function we call introduces some kind of asynchronous behavior (as in the example described by @dm03514 above), the loop body itself just runs synchronously.

counter = 0
output = []
with ThreadPoolExecutor(max_workers=workers) as executor:
    future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
    for future in as_completed(future_to_f_detail):
        counter += 1
        print(f"Entering the for loop for the {counter} time")
        f_id = future_to_f_detail[future]
        try:
            find_details = future.result()
        except Exception as exc:
            print(f"Finding {f_id} generated an exception: {exc}")
        else:
            f_det = callback_processing(find_details)
            output.append(f_det)
return output

If we have an array of 500 ids and we do 500 calls, and every call yields a future, we will print the message 500 times, once each time before entering the try block.

We are not forced to use a Queue to avoid a race condition in this case. Futures create a deferred execution: when we use submit() we get back a future to be used later.
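To illustrate why a plain list is enough here, a small self-contained sketch (fake_api_call is a stand-in for the real API call): the body of the as_completed() loop runs only in the main thread, so only one thread ever appends to the list.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_api_call(i):
    return i * 2  # executes in a worker thread

results = []
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(fake_api_call, i): i for i in range(10)}
    for future in as_completed(futures):
        # The loop body always runs in the main thread, one future at
        # a time, so appending to a plain list cannot race.
        assert threading.current_thread() is threading.main_thread()
        results.append(future.result())

print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```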

Some important notes and recommendations:

  1. Ramalho, Luciano: Fluent Python, Chapter 17, "Concurrency with Futures".
  2. Beazley, David: Python Cookbook, Chapter 12, "Concurrency", p. 516, "Defining an Actor Task".
