23

In my code I have a class with properties that occasionally need to run asynchronous code. Sometimes I need to access a property from an asynchronous function, sometimes from a synchronous one, which is why I don't want my properties to be asynchronous. Besides, I have the impression that asynchronous properties in general are a code smell. Correct me if I'm wrong.

I have a problem with executing an asynchronous method from a synchronous property and blocking further execution until the asynchronous method finishes.

Here is some sample code:

    import asyncio

    async def main():
        print('entering main')
        synchronous_property()
        print('exiting main')

    def synchronous_property():
        print('entering synchronous_property')
        loop = asyncio.get_event_loop()
        try:
            # this will raise an exception, so I catch it and ignore
            loop.run_until_complete(asynchronous())
        except RuntimeError:
            pass
        print('exiting synchronous_property')

    async def asynchronous():
        print('entering asynchronous')
        print('exiting asynchronous')

    asyncio.run(main())

Its output:

    entering main
    entering synchronous_property
    exiting synchronous_property
    exiting main
    entering asynchronous
    exiting asynchronous

First, catching the RuntimeError seems wrong, but if I don't do that, I get a "RuntimeError: This event loop is already running" exception.

Second, the asynchronous() function is executed last, after the synchronous one finishes. I want to do some processing on the data set by the asynchronous method, so I need to wait for it to finish. If I add await asyncio.sleep(0) after calling synchronous_property(), asynchronous() is called before main() finishes, but that doesn't help me. I need asynchronous() to run before synchronous_property() finishes.

What am I missing? I'm running Python 3.7.

2
  • main() and asynchronous() are both async. synchronous_property() is sync but called within async. So the grouping of your printing looks correct. The error you are trapping is warning you that you are trying to create an extra event loop which is kind of critical to the whole issue. Commented Mar 13, 2019 at 18:07
  • Yes, I'm aware that the output is correct. I want to execute the async call from sync code and block its execution. Commented Mar 13, 2019 at 19:05

5 Answers

20

Asyncio is really insistent on not allowing nested event loops, by design. However, you can always run another event loop in a different thread. Here is a variant that uses a thread pool to avoid having to create a new thread each time around:

    import asyncio, concurrent.futures

    async def main():
        print('entering main')
        synchronous_property()
        print('exiting main')

    pool = concurrent.futures.ThreadPoolExecutor()

    def synchronous_property():
        print('entering synchronous_property')
        result = pool.submit(asyncio.run, asynchronous()).result()
        print('exiting synchronous_property', result)

    async def asynchronous():
        print('entering asynchronous')
        await asyncio.sleep(1)
        print('exiting asynchronous')
        return 42

    asyncio.run(main())

This code creates a new event loop on each sync→async boundary, so don't expect high performance if you're doing that a lot. It could be improved by creating only one event loop per thread using asyncio.new_event_loop, and caching it in a thread-local variable.
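The improvement suggested above could look roughly like the following. This is only a sketch: the names `_local` and `_run_in_worker` are made up for illustration, the sleep is shortened, and the cached loops are never closed, which a real program would want to handle at shutdown.

```python
import asyncio
import concurrent.futures
import threading

_local = threading.local()  # hypothetical per-thread storage for the cached loop
pool = concurrent.futures.ThreadPoolExecutor()


def _run_in_worker(coro):
    """Run coro on the calling worker thread's own, lazily created event loop."""
    loop = getattr(_local, "loop", None)
    if loop is None:
        # First call on this thread: create the loop once and remember it.
        loop = _local.loop = asyncio.new_event_loop()
    return loop.run_until_complete(coro)


async def asynchronous():
    await asyncio.sleep(0.01)
    return 42


def synchronous_property():
    # Blocks the calling thread until the coroutine has finished elsewhere.
    return pool.submit(_run_in_worker, asynchronous()).result()


async def main():
    return synchronous_property()


result = asyncio.run(main())
print(result)
```

As in the original version, the outer event loop is blocked while `synchronous_property()` waits, so other tasks on that loop make no progress during the call.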


7 Comments

Just call the sync function using the thread pool executor? One loop in one process with some threads for handling the sync function?
@Craftables The OP wants their sync functions to be able to arbitrarily call async functions. With the sync functions being themselves called from async, that is nesting and it won't work with one event loop.
OK, so basically there is no other clean solution than having asynchronous functions everywhere. There is always a danger that n functions deeper I will need to call async, and a single non-async function in the chain will make it very inconvenient. Good to know, thanks.
@AndrzejKlajnert Exactly, the assumption that everything is async is deeply embedded because each coroutine must be able to suspend all the way to the event loop. Recursive event loops don't work because a recursive loop means that new tasks will run before the current task has either finished or suspended, which means it is in an inconsistent state. The inability to call async from sync is sometimes referred to as the function color issue.
@unhammer Because that would raise RuntimeError when synchronous_property() is invoked from async code - asyncio loops don't nest. The OP has a rather specific requirement to execute async code from sync context, even when that sync context is itself invoked from async. That is not how asyncio was designed to be used, which is why a hack (workaround) using threads is needed.
5

The easiest way is using an existing "wheel", like asgiref.async_to_sync

from asgiref.sync import async_to_sync 

then:

async_to_sync(main)() 

in general:

async_to_sync(<your_async_func>)(<.. arguments for async function ..>) 

This is a caller class which turns an awaitable that only works on the thread with the event loop into a synchronous callable that works in a subthread.

If the call stack contains an async loop, the code runs there. Otherwise, the code runs in a new loop in a new thread.

Either way, this thread then pauses and waits to run any thread_sensitive code called from further down the call stack using SyncToAsync, before finally exiting once the async task returns.

0

There appears to be a problem with the question as stated. Restating the question: how do you communicate between a thread (containing no async processes and hence considered sync) and an async process (running in some event loop)? One approach is to use two sync queues. The sync process puts its request/parameters into QtoAsync and waits on QtoSync. The async process reads QtoAsync WITHOUT waiting and, if it finds a request/parameters, executes the request and places the result in QtoSync.

    import asyncio
    import queue

    QtoAsync = queue.Queue()
    QtoSync = queue.Queue()
    ...

    async def asyncProc():
        while True:
            try:
                data = QtoAsync.get_nowait()
                result = await <the async that you wish to execute>
                # This can block if the queue is full; you can use put_nowait and handle the exception.
                QtoSync.put(result)
            except queue.Empty:
                # Put a nominal delay, forcing this to wait in the event loop.
                await asyncio.sleep(0.001)

    ....
    # Start the sync process in a different thread here.
    asyncio.run(main())  # main invokes the async tasks, including asyncProc

The sync thread puts its request to async using:

    req = <the async that you wish to execute>
    QtoAsync.put(req)
    result = QtoSync.get()

This should work.

Problems with the question as stated:

1. When the async processes are started with asyncio.run (or similar), execution blocks until the async processes are completed. A separate sync thread has to be started explicitly before calling asyncio.run.
2. In general, asyncio processes depend on other asyncio processes in that loop, so calling an async process from another thread is not permitted directly. The interaction should be with the event loop, and using two queues is one approach.
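For a version of the two-queue idea that can be run end to end, here is a minimal sketch. The coroutine `double`, the `STOP` sentinel, and the lowercase queue names are illustrative assumptions, not part of the answer above; `double` merely stands in for "the async that you wish to execute".

```python
import asyncio
import queue
import threading

to_async = queue.Queue()  # requests: sync thread -> event loop
to_sync = queue.Queue()   # results: event loop -> sync thread
STOP = object()           # hypothetical sentinel telling the async side to exit


async def double(x):
    # Stand-in for the real asynchronous work.
    await asyncio.sleep(0.01)
    return x * 2


async def async_proc():
    while True:
        try:
            request = to_async.get_nowait()  # never block the event loop
        except queue.Empty:
            await asyncio.sleep(0.001)       # yield so other tasks can run
            continue
        if request is STOP:
            return
        to_sync.put(await double(request))


def sync_worker(results):
    # Plain blocking code running in its own thread.
    for n in (1, 2, 3):
        to_async.put(n)
        results.append(to_sync.get())        # blocks this thread only
    to_async.put(STOP)


results = []
thread = threading.Thread(target=sync_worker, args=(results,))
thread.start()             # must start before asyncio.run(), which blocks
asyncio.run(async_proc())
thread.join()
print(results)
```

Polling with a short sleep keeps the sketch simple, but it wakes the loop about a thousand times per second even when idle, so it is a trade-off rather than a recommendation.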

0

If you control enough of the environment that you can run the non-async code in a separate thread, this is actually fairly tractable.

If you have the event loop object, then asyncio.run_coroutine_threadsafe() will return a concurrent.futures.Future – not an asyncio.Future – that will eventually produce the result of the function. With that future you can call future.result() to block on the eventual result, but that blocks the current thread.

So from a different thread you could run

    def synchronous_property(loop):
        print('entering synchronous_property')
        future = asyncio.run_coroutine_threadsafe(asynchronous(), loop)
        future.result()
        print('exiting synchronous_property')

passing in the event loop as a parameter. The future.result() call returns the result of await asynchronous() when that's useful to you.

In the context of the question, your main program is inside an asyncio.run() call, but you have a blocking API layer that you can't modify for compatibility reasons. From anywhere in this call stack, then, asyncio.get_running_loop() can return the current loop, and asyncio.to_thread() can run an arbitrary blocking callable in a new thread. If you have enough control over the calling environment, then you can add an asyncio.to_thread() wrapper, like

    async def main():
        print('entering main')
        # instead of just calling synchronous_property()
        await asyncio.to_thread(synchronous_property, asyncio.get_running_loop())
        print('exiting main')

Being able to run the synchronous code in a separate thread is important. You can't have nested asyncio.run() calls, and if there is an asyncio event loop anywhere in your call stack then there's no way to suspend the loop and run other asyncio tasks without actually being in an async function. Conversely, the concurrent.futures.Future.result() call is blocking, so nothing else (including asyncio code) will happen on the current thread until that returns.

It matters that you run the legacy synchronous code on a separate thread, rather than the asyncio code. asyncio code tends to assume that it is being run sequentially and there is not concurrent access to an object other than at await points; it is not typically thread-safe. Especially if you're only making limited calls to this synchronous API, it will probably be easier to make those specific calls thread-safe than to try to make the asyncio code also be thread-safe.
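One caveat: asyncio.to_thread() was added in Python 3.9, while the question mentions Python 3.7. On older versions, loop.run_in_executor(None, ...) should serve roughly the same purpose. The following is a minimal sketch of the whole arrangement under that assumption, with the coroutine body shortened to a brief sleep and a return value added for illustration.

```python
import asyncio


async def asynchronous():
    await asyncio.sleep(0.01)
    return 42


def synchronous_property(loop):
    # Runs in a worker thread: schedule the coroutine on the main loop and
    # block this worker thread (not the loop) until the result is ready.
    future = asyncio.run_coroutine_threadsafe(asynchronous(), loop)
    return future.result()


async def main():
    loop = asyncio.get_running_loop()
    # Pre-3.9 stand-in for: await asyncio.to_thread(synchronous_property, loop)
    return await loop.run_in_executor(None, synchronous_property, loop)


result = asyncio.run(main())
print(result)
```

Because main() awaits the executor call, the event loop stays free to run asynchronous() while the worker thread is blocked in future.result().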

-5

I want to make the async call to execute from sync and block its execution

Just make the sync func async and await the asynchronous function. Async functions are just like normal functions and you can put whatever code you want in them. If you still have a problem modify your question using actual code you are trying to run.

    import asyncio

    async def main():
        print('entering main')
        await synchronous_property()
        print('exiting main')

    async def synchronous_property():
        print('entering synchronous_property')
        await asynchronous()
        # Do whatever sync stuff you want who cares
        print('exiting synchronous_property')

    async def asynchronous():
        print('entering asynchronous')
        print('exiting asynchronous')

    asyncio.run(main())

1 Comment

That's the point: I don't want my property to be asynchronous. It is also executed by some synchronous code, and I don't want my whole codebase to consist of async functions.
