
As a follow-up to my previous question about calling an async function from a synchronous one, I've discovered asyncio.run_coroutine_threadsafe.

On paper, and based on the comments in this StackOverflow question, this looks ideal. I can create a new Thread, get a reference to my original event loop, and schedule the async function to run inside the original event loop while only blocking the new Thread.

    import asyncio
    import threading
    from concurrent.futures import ThreadPoolExecutor


    class _AsyncBridge:
        def call_async_method(self, function, *args, **kwargs):
            print(f"call_async_method {threading.get_ident()}")
            event_loop = asyncio.get_event_loop()
            thread_pool = ThreadPoolExecutor()
            return thread_pool.submit(
                asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)
            ).result()

        async def _async_wrapper(self, event_loop, function, *args, **kwargs):
            print(f"async_wrapper {threading.get_ident()}")
            future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
            return future.result()

This doesn't error, but it doesn't ever return, either. The Futures just hang and the async call is never hit. It doesn't seem to matter whether I use a Future in call_async_method, _async_wrapper, or both; wherever I use a Future, it hangs.

I experimented with putting the run_coroutine_threadsafe call directly in my main event loop:

    event_loop = asyncio.get_event_loop()
    future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
    return_value = future.result()

Here too, the Future hangs.

I tried using the LoopExecutor class defined here, which seems like the exact answer to my needs.

    event_loop = asyncio.get_event_loop()
    loop_executor = LoopExecutor(event_loop)
    future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
    return_value = future.result()

There too, the returned Future hangs.

I toyed with the idea that I was blocking my original event loop and therefore the scheduled task would never run, so I made a new event loop:

    event_loop = asyncio.get_event_loop()
    new_event_loop = asyncio.new_event_loop()
    # Sanity check to make sure the new loop is actually different from
    # the existing one - prints False as expected
    print(event_loop == new_event_loop)
    loop_executor = LoopExecutor(new_event_loop)
    future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
    return_value = future.result()
    return return_value

Still hanging at future.result() and I don't understand why.

What's wrong with asyncio.run_coroutine_threadsafe/the way I'm using it?

2 Answers

I think there are two problems. The first is that run_coroutine_threadsafe only submits the coroutine to the loop; it does not actually run it.

So

    event_loop = asyncio.get_event_loop()
    future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
    return_value = future.result()

doesn't work as you've never run this loop.
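To see this in isolation, here is a minimal sketch (the coroutine `do_work` and the 0.5 second timeout are made up for illustration). The coroutine is submitted to a loop that nobody is running, so waiting on the returned future times out. As soon as something actually runs the loop, the same future completes.

```python
import asyncio
import concurrent.futures


async def do_work():
    # Hypothetical stand-in for the real async call.
    await asyncio.sleep(0.1)
    return 3


loop = asyncio.new_event_loop()

# This only schedules the coroutine on the loop; nothing executes yet.
future = asyncio.run_coroutine_threadsafe(do_work(), loop)

try:
    # Nobody is running the loop, so this wait cannot succeed.
    future.result(timeout=0.5)
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True

# Once the loop runs, the pending submission is picked up and finishes.
result = loop.run_until_complete(asyncio.wrap_future(future, loop=loop))
loop.close()

print(timed_out, result)
```

Without the timeout, the first `future.result()` call would block forever, which matches the hang described in the question.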

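In theory you might expect asyncio.run(future) to make it work, but it does not. This is most likely because run_coroutine_threadsafe returns a concurrent.futures.Future rather than a coroutine, and asyncio.run would start a new event loop anyway instead of running the one the coroutine was submitted to. The following will work: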

    import asyncio


    async def stop():
        await asyncio.sleep(3)


    event_loop = asyncio.get_event_loop()
    coro = asyncio.sleep(1, result=3)
    future = asyncio.run_coroutine_threadsafe(coro, event_loop)
    event_loop.run_until_complete(stop())
    print(future.result())

The second problem, as you may have noticed, is that your structure is reversed. You should run the event loop in the separate thread and submit the task from the main thread. If you submit the task from the separate thread instead, you still need to run the event loop in the main thread for the task to actually execute. In most cases I would suggest simply creating another event loop in the separate thread.
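A minimal sketch of that layout, assuming you are free to run your own loop in a background thread. The helper names `start_background_loop` and `call_async`, and the coroutine `do_work`, are made up for illustration.

```python
import asyncio
import threading


async def do_work(x, y):
    # Hypothetical stand-in for the real async call.
    await asyncio.sleep(0.1)
    return x + y


def start_background_loop():
    """Create a new event loop and run it forever in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def call_async(loop, coro, timeout=5):
    """Submit a coroutine to the background loop and block for its result."""
    return asyncio.run_coroutine_threadsafe(coro, loop).result(timeout)


loop, thread = start_background_loop()

# The main thread blocks here while the background thread runs the coroutine.
value = call_async(loop, do_work(1, 2))
print(value)

# Shut the loop down cleanly from the main thread.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

The blocking `result()` call is safe here because the thread that waits is not the thread that runs the loop.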

5 Comments

Maybe the answer should show an example of starting the event loop in a separate thread? The included snippet will work, but is unlikely to be what the OP wanted to achieve.
Frustrating. The included snippet is actually exactly what I wanted to achieve, but asyncio.run and event_loop.run_until_complete are both off limits to me as my main event loop has been monopolized by the async library I'm trying to interact with. Trying to use them will get me This loop is already running errors. I also can't use a new event loop in the separate thread because the async library holds a reference to my main thread's event loop and will throw Future attached to a different loop errors.
Doesn't seem like there's truly a way to have the existing event loop run an async method and return a result unless I convert my entire codebase to async/await.
Both of your problems can be solved. Let me find some spare time to add details. In short, for the first question: if you are still using the event loop in the main thread, then all of your snippets are pointless, since they are equivalent to a plain submit. For the second question: most async libraries let you pass a loop parameter. Alternatively, you can create a new event loop and set it as the default event loop in the target thread.
So if I create a new thread with a new event loop, bind that new event loop to the async lib, and then call the async lib's methods from my main thread's event loop, it should work? Or am I misunderstanding? I still don't quite see where I can call run_until_complete since I think I need it to be the same loop as run_coroutine_threadsafe is getting, and that event loop is already in a permanent run_until_complete because of the async lib.
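For the case raised in these comments, where the main loop is already running and cannot be replaced, one possible arrangement is to move the blocking synchronous code into a worker thread and let that thread submit back to the main loop. This is only a sketch under the assumption that a single `await` is acceptable at the boundary; `do_work`, `blocking_caller`, and `main` are made-up names.

```python
import asyncio


async def do_work(x):
    # Hypothetical stand-in for a call into the async library.
    await asyncio.sleep(0.1)
    return x * 2


def blocking_caller(loop):
    # Runs in a worker thread, so blocking here does not stall the loop.
    future = asyncio.run_coroutine_threadsafe(do_work(21), loop)
    return future.result(timeout=5)


async def main():
    loop = asyncio.get_running_loop()
    # Hand the synchronous code to the default thread pool. The main loop
    # stays free to run do_work while the worker thread waits on it.
    return await loop.run_in_executor(None, blocking_caller, loop)


result = asyncio.run(main())
print(result)
```

If the synchronous code is called directly on the loop's own thread, with no `await` anywhere above it, this does not help: blocking that thread on `future.result()` still prevents the loop from ever running the submitted coroutine.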

I ran into the same issue, and the answer by @Sraw did not help me, because in my case the coroutine also had to run in the main event loop, which is monopolized by the async library.

As a quick (though admittedly hacky) approach, what helped me is the nest_asyncio library, which patches asyncio to allow nested event loops.

With it, the following works for me, even if the event loop is already running:

    import asyncio
    import nest_asyncio

    nest_asyncio.apply()

    event_loop = asyncio.get_event_loop()
    coro = asyncio.sleep(1, result=3)
    print(event_loop.run_until_complete(coro))

This also allows me to create a simple wrapper function that, for the purposes of the question, turns any async function into a synchronous one:

    def run_cor(obj):
        if asyncio.iscoroutine(obj):
            loop = asyncio.get_event_loop()
            result = loop.run_until_complete(obj)
            return result
        else:
            return obj


    run_cor(asyncio.sleep(1, result=3))  # Returns 3

This was useful to me because the async library I'm using (Telethon) makes the same functions either sync or async depending on whether an event loop is running, so the wrapper lets me use the same code in both cases.
