I would like to start a large number of HTTP requests and collect their results once all of them have returned. Sending the requests in a non-blocking fashion is possible with asyncio, but I have problems collecting their results.
I'm aware of solutions such as aiohttp that are made for this specific problem. But the HTTP requests are just an example, my question is how to use asyncio correctly.
On the server side, I have flask, which answers every request to localhost/ with "Hello World!", but it waits 0.1 seconds before answering. In all my examples, I'm sending 10 requests. Synchronous code should take about 1 second; an asynchronous version could do it in 0.1 seconds.
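For reference, the server is just a minimal flask app; the exact code is not important, any endpoint that sleeps ~0.1 seconds before answering would do. A sketch of what I'm running (the run line is commented out here since I start it separately):

```python
from time import sleep

from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello():
    sleep(0.1)  # simulate the slow response
    return "Hello World!"

# started separately with: app.run(port=5000)
```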
On the client side, I want to spin up many requests at the same time and collect their results. I'm trying to do this in three different ways. Since asyncio needs an executor to work around blocking code, all of the approaches call loop.run_in_executor.
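Stripped of the HTTP part, the pattern all three approaches build on looks roughly like this (a self-contained sketch: time.sleep stands in for the blocking request, and blocking_work is my name, not a library function):

```python
import asyncio
from time import sleep, perf_counter

def blocking_work():
    sleep(0.1)  # stand-in for a blocking HTTP request
    return "Hello World!"

async def main():
    loop = asyncio.get_running_loop()
    # hand each blocking call to the default thread pool executor
    futures = [loop.run_in_executor(None, blocking_work) for _ in range(10)]
    return await asyncio.gather(*futures)

start = perf_counter()
results = asyncio.run(main())
stop = perf_counter()
print(f"finished {stop - start}")  # roughly 0.1 s rather than 1 s
```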
This code is shared between them:
```python
import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text
```

Approach 1:
Use asyncio.gather() on a list of tasks and then run_until_complete. After reading Asyncio.gather vs asyncio.wait, it seemed like gather would wait on the results. But it doesn't: this code returns instantly, without waiting for the requests to finish. If I use a blocking function here, it works. Why can't I use an async function?
```python
# approach 1
start = perf_counter()

tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_async))  # <---- using async function!

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}")  # 0.003
```

```python
# approach 1(B)
start = perf_counter()

tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_sync))  # <---- using sync function

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}")  # 0.112
```

Python even warns me that coroutine "request_async" was never awaited. At this point, I have a working solution: using a normal (not async) function in an executor. But I would like a solution that works with async function definitions, because I would like to use await inside them (in this simple example that is not necessary, but if I move more code to asyncio, I'm sure it will become important).
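To make explicit what I'd like to end up with: an async function whose blocking part goes through the executor, but which is free to await other coroutines too. Roughly this shape (blocking_get and request_with_executor are my own names, and time.sleep stands in for requests.get so the sketch is self-contained):

```python
import asyncio
from time import sleep

def blocking_get():
    sleep(0.1)  # stand-in for requests.get("http://127.0.0.1:5000/").text
    return "Hello World!"

async def request_with_executor():
    loop = asyncio.get_running_loop()
    # the blocking call runs on the default thread pool executor
    text = await loop.run_in_executor(None, blocking_get)
    # ...and here I could await other coroutines before returning
    return text

result = asyncio.run(request_with_executor())
```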
Approach 2:
Python warns me that my coroutines are never awaited, so let's await them. Approach 2 wraps all the code in an outer async function and awaits the result of the gathering. Same problem: it also returns instantly (with the same warning):
```python
# approach 2
async def main():
    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))
    gathered_tasks = asyncio.gather(*tasks)
    return await gathered_tasks  # <-------- here I'm waiting on the coroutine

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036
```

This really confused me. I'm waiting on the result of gather; intuitively, that should be propagated to the coroutines that I'm gathering. But Python still complains that my coroutines are never awaited.
I read some more and found: How could I use requests in asyncio?
This is pretty much exactly my example: Combining requests and asyncio. Which brings me to approach 3:
Approach 3:
Same structure as approach 2, but wait on each task that was given to run_in_executor() individually (surely this counts as awaiting the coroutines):
```python
# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():
    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)

    responses = []
    for task in tasks:
        response = await task
        responses.append(response)

    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.004578
```

My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results?