
My question is more or less like this, which is really an X-Y problem leading back to this. This is, however, not a duplicate, because my use case is slightly different and the linked threads don't answer my question.

I am porting a set of synchronous programs from Java to Python. These programs interact with an asynchronous library. In Java, I could block and wait for this library's asynchronous functions to return a value and then do things with that value.

Here's a code sample to illustrate the problem.

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    def do_work_sync_1(arg1, arg2, arg3):
        # won't even run because await has to be called from an async function
        value = await do_work_async(arg1, arg2, arg3)

    def do_work_sync_2(arg1, arg2, arg3):
        # throws "Loop already running" error because the async library
        # referenced in do_work_async is already using my event loop
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(do_work_async(arg1, arg2, arg3))

    def do_work_sync_3(arg1, arg2, arg3):
        # throws "got Future attached to a different loop" because
        # do_work_async refers back to the asynchronous library, which is
        # stubbornly attached to my main loop
        thread_pool = ThreadPoolExecutor()
        future = thread_pool.submit(asyncio.run, do_work_async(arg1, arg2, arg3))
        result = future.result()

    def do_work_sync_4(arg1, arg2, arg3):
        # just hangs forever
        event_loop = asyncio.get_event_loop()
        future = asyncio.run_coroutine_threadsafe(do_work_async(arg1, arg2, arg3), event_loop)
        return_value = future.result()

    async def do_work_async(arg1, arg2, arg3):
        value_1 = await async_lib.do_something(arg1)
        value_2 = await async_lib.do_something_else(arg2, arg3)
        return value_1 + value_2

Python appears to be trying very hard to keep me from blocking anything, anywhere. await can only be used from async def functions, which in their turn must be awaited. There doesn't seem to be a built-in way to keep async def/await from spreading through my code like a virus.

Tasks and Futures don't have any built-in blocking or wait_until_complete mechanisms unless I want to loop on Task.done(), which seems really bad.

I tried asyncio.get_event_loop().run_until_complete(), but that produces an error: This event loop is already running. Apparently I'm not supposed to do that for anything except main().

The second linked question above suggests using a separate thread and wrapping the async function in that. I tested this with a few simple functions and it seems to work as a general concept. The problem here is that my asynchronous library keeps a reference to the main thread's event loop and throws an error when I try to refer to it from the new thread: got Future <Future pending> attached to a different loop.

I considered moving all references to the asynchronous library into a separate thread, but I realized that I still can't block in the new thread, and I'd have to create a third thread for blocking calls, which would bring me back to the Future attached to a different loop error.

I'm pretty much out of ideas here. Is there a way to block and wait for an async function to return, or am I really being forced to convert my entire program to async/await? (If it's the latter, an explanation would be nice. I don't get it.)

  • how many concurrent blocking calls are you expecting? if it's less than like 10000 or so, and you're not as concerned about memory, you can just run blocking calls in thread pools. without seeing a clear example of where you're really running into trouble, it's tough to say what you should do. Commented Jul 27, 2019 at 18:13
  • As an aside, "There doesn't seem to be a built-in way to keep async def/await from spreading through my code like a virus" is my exact concern with Python's implementation of async, and the reason I haven't adopted it in my own code bases. Commented Jul 27, 2019 at 18:17
  • @acushner Not many. I'm starting with a single one as a testbed. I did try to use a concurrent.futures.ThreadPoolExecutor() to spawn a new thread, but since my async library keeps a reference to the main thread's event loop, anything I do within the new thread's new event loop bombs out. Commented Jul 27, 2019 at 18:29
  • That's the got Future <Future pending> attached to a different loop bit above, if I wasn't clear. Not sure how to make it less ambiguous without posting code, and the code is a mess after 8 hours of trying every trick I could find to get this working. The basic idea I tried is to run thread_pool.submit(wrapper_function, original_function, *args, **kwargs) where wrapper_function calls asyncio.new_event_loop (get_event_loop fails for some reason) and event_loop.run_until_complete(original_function(*args, **kwargs)). Commented Jul 27, 2019 at 18:33
  • yeah, combining the 2 is not easy, in the least. the concept is you either want single-threaded cooperative multi-tasking (async/await) OR you want preemptive multi-tasking (threads/processes). occasionally you want to run an event loop on another thread, which is doable using calls like run_coroutine_threadsafe, but it becomes a lot to deal with. Commented Jul 27, 2019 at 19:04
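
To make the "event loop on another thread" idea from the last comment concrete, here is a minimal sketch. It assumes the synchronous code owns the main thread and that a dedicated loop can be started on a background thread; `do_work_async` here is a stand-in that sleeps instead of calling the real async library, and `do_work_sync` is a hypothetical wrapper name.

```python
import asyncio
import threading

async def do_work_async(a, b):
    # Stand-in for calls into the real async library (assumption).
    await asyncio.sleep(0.01)
    return a + b

# One dedicated event loop, running forever on a background thread.
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()

def do_work_sync(a, b):
    # Schedule the coroutine on the background loop and block this thread
    # until the result is available.
    future = asyncio.run_coroutine_threadsafe(do_work_async(a, b), loop)
    return future.result(timeout=5)

result = do_work_sync(1, 2)

# Shut the loop down cleanly once the synchronous code is done with it.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```

The blocking `future.result()` call must happen on a thread other than the one running the loop. Calling it from the loop's own thread stops the loop from ever making progress, which is a likely explanation for a hang like the one in `do_work_sync_4`.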

1 Answer


It took me some time, but finally I've found the actual question 😇

Is there a way to block and wait for an async function to return, or am I really being forced to convert my entire program to async/await?

There is a high-level function asyncio.run(). It does three things:

  1. create new event loop
  2. run your async function in that event loop
  3. cancel any tasks that are still pending, wait for them to finish, and close the loop
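
The three steps can be sketched roughly as follows. This is a simplified illustration, not the real implementation: `run_sketch` and `add` are hypothetical names, and the sketch leaves out the handling of still-pending tasks that the real asyncio.run() performs.

```python
import asyncio

def run_sketch(coro):
    """Simplified illustration of asyncio.run(); not the real implementation."""
    loop = asyncio.new_event_loop()              # 1. create a new event loop
    try:
        return loop.run_until_complete(coro)     # 2. run the coroutine in that loop
    finally:
        # 3. clean up and close the loop. The real asyncio.run() additionally
        #    deals with tasks that are still pending; this sketch skips that.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

async def add(a, b):
    await asyncio.sleep(0)
    return a + b

result = run_sketch(add(1, 2))
```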

Its source code is here: https://github.com/python/cpython/blob/3221a63c69268a9362802371a616f49d522a5c4f/Lib/asyncio/runners.py#L8 You see it uses loop.run_until_complete(main) under the hood.

If you are writing completely asynchronous code, you are supposed to call asyncio.run() somewhere at the end of your main() function, I guess. But that doesn't have to be the case. You can run it wherever you want, as many times as you want. Caveats:

  • in a given thread, only one event loop can be running at a time

  • do not call it from an async def function: an event loop is obviously already running there, so just call the coroutine with await instead
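
A small sketch of the second caveat, using hypothetical coroutines `inner` and `outer`: calling asyncio.run() while a loop is already running in the same thread raises a RuntimeError, whereas a plain await works.

```python
import asyncio

async def inner():
    return "inner result"

async def outer():
    coro = inner()
    try:
        # A loop is already running in this thread, so this is rejected.
        asyncio.run(coro)
    except RuntimeError as exc:
        error = str(exc)
    # asyncio.run() never started the coroutine, so it can still be awaited.
    value = await coro
    return error, value

error, value = asyncio.run(outer())
```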

Example:

    import asyncio

    async def something_async():
        print('something_async start')
        await asyncio.sleep(1)
        print('something_async done')

    for i in range(3):
        asyncio.run(something_async())

You can have multiple threads with their own event loop:

    import asyncio
    import threading

    async def something_async():
        print('something_async start in thread:', threading.current_thread())
        await asyncio.sleep(1)
        print('something_async done in thread:', threading.current_thread())

    def main():
        t1 = threading.Thread(target=asyncio.run, args=(something_async(), ))
        t2 = threading.Thread(target=asyncio.run, args=(something_async(), ))
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    if __name__ == '__main__':
        main()

If you encounter the error Future attached to a different loop, that may mean two things:

  1. you are using resources tied to a different event loop than the one currently running

  2. you created some resource before starting an event loop, so it was bound to the "default event loop", but asyncio.run() then starts a different loop. I've encountered this before: asyncio.Semaphore RuntimeError: Task got Future attached to a different loop
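
A minimal way to reproduce the first case, with hypothetical names throughout: a Future created on one loop is awaited from a coroutine running on another. Creating the Future inside the running loop avoids the problem.

```python
import asyncio

# A future that belongs to a loop which is never the running one.
other_loop = asyncio.new_event_loop()
foreign_future = other_loop.create_future()

async def wait_for_foreign_future():
    try:
        await foreign_future
    except RuntimeError as exc:
        return str(exc)
    return None

async def wait_for_local_future():
    # Created inside the running loop, so it is attached to the right one.
    loop = asyncio.get_running_loop()
    local_future = loop.create_future()
    loop.call_soon(local_future.set_result, "ok")
    return await local_future

message = asyncio.run(wait_for_foreign_future())
local_result = asyncio.run(wait_for_local_future())
other_loop.close()
```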

You need to use Python version at least 3.5.3 - explanation here.


6 Comments

The big problem I'm always going to face is the presence of an existing event loop. My asynchronous lib latches onto the main event loop and keeps a reference to it, and it will throw Future attached to a different loop errors if I try to reference it from another Thread. I could move all references to the async lib into a second thread, but then I'd need a third thread to call asyncio.run from for my coroutines, and I will still get Future attached to a different loop errors.
Somewhere in the question details is the general idea that this async library (which I can't control) has monopolized my event loop and shut down the idea of multithreading because of the Future attached to a different loop errors. I am gradually becoming convinced that my only options are asyncio.create_task and looping until it's done or accepting my fate and letting async def/await infect my entire codebase.
OK, maybe it would be better to have only one loop for the whole time and reuse it. Like I wrote, asyncio.run() creates a new loop every time, so instead you can use loop.run_until_complete(). Example: gist.github.com/messa/36988d7311f91e71f8b65f521d6b053d I see above you have already tried this? If the error is "loop already running", then you should simply await instead of starting the loop.
Maybe I am starting to understand :) You probably have sync ("blocking") and async code and want it run side by side somehow. Can you run the blocking code in separate thread, and when it needs to run async code, schedule it "in the async thread" using loop.call_soon_threadsafe() docs.python.org/3/library/…?
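
A rough sketch of that suggestion, assuming the main thread runs the event loop and the blocking code is moved to a worker thread. `fetch_value` and `blocking_worker` are hypothetical names, and `fetch_value` stands in for the real async library.

```python
import asyncio
import concurrent.futures

async def fetch_value(x):
    # Stand-in for a call into the async library (assumption).
    await asyncio.sleep(0.01)
    return x * 2

def blocking_worker(loop, x):
    # Runs in a worker thread, so blocking here does not stall the loop.
    result_box = concurrent.futures.Future()

    def schedule():
        # Runs in the loop's thread: start the coroutine and relay its outcome.
        task = loop.create_task(fetch_value(x))

        def relay(done_task):
            exc = done_task.exception()
            if exc is not None:
                result_box.set_exception(exc)
            else:
                result_box.set_result(done_task.result())

        task.add_done_callback(relay)

    loop.call_soon_threadsafe(schedule)
    return result_box.result(timeout=5)

async def main():
    loop = asyncio.get_running_loop()
    # Hand the blocking code to the default thread pool; the loop stays free.
    return await loop.run_in_executor(None, blocking_worker, loop, 21)

result = asyncio.run(main())
```

asyncio.run_coroutine_threadsafe() packages the same scheduling and result hand-off, so in practice it is usually the simpler choice.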
@computerfreaker Do you have some code sample please? :)