4

I have an application which already runs infinitely with asyncio event loop run forever and also I need to run a specific function every 10 seconds.

def do_something(): pass a = asyncio.get_event_loop() a.run_forever() 

I would like to call the function do_something every 10 seconds. How to achieve this without replacing asynctio event loop with while loop ?

Edited: I can achieve this with the below code

def do_something(): pass while True: time.sleep(10) do_something() 

But I dont want to use while loop to run infinitely in my application instead I would like to go with asyncio run_forever(). So how to call the same function every 10 seconds with asyncio ? is there any scheduler like which will not block my ongoing work ?

4
  • its simple just call a function every 10 seconds inside my application. My application is doing other stuffs as well infinitely with run_forever(). Commented Nov 12, 2021 at 8:29
  • I have edited the question little to get a clear idea. Commented Nov 12, 2021 at 8:38
  • not an usual idea, but recursion just do work. Commented Nov 12, 2021 at 8:45
  • Recursion reached maximum depth will occur. Commented Nov 12, 2021 at 8:46

2 Answers 2

7

asyncio does not ship with a builtin scheduler, but it is easy enough to build your own. Simply combine a while loop with asyncio.sleep to run code every few seconds.

async def every(__seconds: float, func, *args, **kwargs): while True: func(*args, **kwargs) await asyncio.sleep(__seconds) a = asyncio.get_event_loop() a.create_task(every(1, print, "Hello World")) ... a.run_forever() 

Note that the design has to be slightly different if func is itself a coroutine or a long-running subroutine. In the former case use await func(...) and in the latter case use asyncio's thread capabilities.

Sign up to request clarification or add additional context in comments.

Comments

1

You can achieve it with

async def do_something(): while True: await asyncio.wait(10) ...rest of code... if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(do_something()) loop.run_forever() 

3 Comments

But where will I make a call to the function ?
@VimalanE check my post, i updated it
Thanks this also works fine. psowa001