I have the following code:

    import asyncio

    async def myfunc(i):
        print("hello", i)
        await asyncio.sleep(i)
        print("world", i)

    async def main():
        asyncio.create_task(myfunc(2))
        asyncio.create_task(myfunc(1))

    asyncio.run(main())

It outputs:

    hello 2
    hello 1

Notice that world isn't printed anywhere. Why is the output we see being produced? I was expecting:

    hello 2
    hello 1
    world 1
    world 2

Because I thought that the asyncio.sleep(i) calls would yield execution to the event loop, at which point the event loop would reschedule them after their respective wait times. Clearly I am misunderstanding. Can someone explain?

2 Answers

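The problem is that main() returns without waiting for the tasks to finish. When main() returns, asyncio.run() cancels any tasks that are still pending and closes the event loop, so the code after each asyncio.sleep(i) never runs. Use asyncio.gather() to run the coroutines concurrently and wait for all of them to complete.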

    import asyncio

    async def myfunc(i):
        print("hello", i)
        await asyncio.sleep(i)
        print("world", i)

    async def main():
        await asyncio.gather(myfunc(2), myfunc(1))

    asyncio.run(main())
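
If you would rather keep asyncio.create_task(), a minimal sketch of the same fix is to hold on to the Task objects and await them before main() returns. The delays are scaled down to i / 10 seconds only so the example runs quickly, and the return values are there just to make the result easy to check; both are my additions, not part of the question's code.

```python
import asyncio


async def myfunc(i):
    print("hello", i)
    # Assumption: delay scaled down (i / 10 s) so the sketch finishes quickly.
    await asyncio.sleep(i / 10)
    print("world", i)
    return i


async def main():
    # Keep a reference to each Task so it can be awaited later.
    t2 = asyncio.create_task(myfunc(2))
    t1 = asyncio.create_task(myfunc(1))
    # Awaiting the tasks keeps main() alive until both have finished.
    return [await t2, await t1]


if __name__ == "__main__":
    asyncio.run(main())
```

Run as a script, this prints hello 2, hello 1, world 1, world 2, which is the output the question expected.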

The logic you describe in the comments is more complicated, and there is no ready-made function that implements it, so you have to design it yourself. In this case, two conditions must hold before the application can finish:

  • All tasks must be launched.
  • There should be no active task.

With that in mind, I use a Future as a flag that signals when both conditions hold, and add_done_callback to be notified each time a task finishes.

    import asyncio
    from collections import deque
    import random


    async def f(identifier, seconds):
        print(f"Starting task: {identifier}, seconds: {seconds}s")
        await asyncio.sleep(seconds)
        print(f"Finished task: {identifier}")
        return identifier


    T = 3


    class Manager:
        def __init__(self, q):
            self._q = q
            # Must be created while the event loop is running (e.g. inside main()).
            self._future = asyncio.get_running_loop().create_future()
            self._active_tasks = set()
            self._status = False  # True while tasks are still being launched

        @property
        def q(self):
            return self._q

        def launch_task(self):
            try:
                identifier = self.q.pop()
            except IndexError:
                # Queue is empty: nothing left to launch.
                return False
            else:
                seconds = random.uniform(T - 1, T + 1)
                task = asyncio.create_task(f(identifier, seconds))
                self._active_tasks.add(identifier)
                task.add_done_callback(self._finished_callback)
                return True

        async def work(self):
            self._status = True
            while self.launch_task():
                await asyncio.sleep(T)
            self._status = False
            # If every task already finished while launching was still in
            # progress, no callback will fire again, so resolve the flag here.
            if not self._active_tasks and not self._future.done():
                self._future.set_result(None)
            await self._future

        def _finished_callback(self, c):
            self._active_tasks.remove(c.result())
            if not self._active_tasks and not self._status:
                self._future.set_result(None)


    async def main():
        identifiers = deque(range(10))
        manager = Manager(identifiers)
        await manager.work()


    if __name__ == "__main__":
        asyncio.run(main())

8 Comments

I see. How would I approach this in the case where I create_task every 2 seconds in a loop in main until a network I/O operation has completed? I'd like to do something like the equivalent of pthread_join after the loop, is there such an equivalent? I'd use gather but I need the loop to be non blocking
@CodeM4aster I don't understand your comment; please avoid talking about networks and pthreads. You want to launch a new task every T seconds, and each task can take N seconds. Am I right? If so, is there an event that should cause the program to terminate, or should it run forever?
Not quite. In a loop I create a new task every T seconds. Each consumes from a global job queue and loops until it is empty. The loop that is creating tasks is also looping until the global queue is empty. As soon as that loop finishes it ends the program even though some tasks haven't finished. I'd like to wait until they finish before exiting
@CodeM4aster From what I understand, you have N predefined tasks that are launched sequentially every T seconds, and you want the program to end when all the tasks finish executing. Am I correct?
Almost, the tasks are not predefined. When I create a task I need information that is only available at runtime. There is no upper bound on N, I keep creating new tasks every T seconds until the job queue empties, at which point I should make no more tasks but let the existing tasks complete

Found a much simpler solution than the one provided by @eyllanesc here. Turns out there is a function that implements it.
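
The linked answer is not reproduced here, so the following is only a guess at the kind of approach it means, not necessarily the function it refers to. One simple way to get a pthread_join-style wait is to collect every Task created in the launch loop and pass them all to asyncio.gather() once the loop ends. The names worker and launch_all, the interval and duration parameters, and the simplified one-job-per-task model are hypothetical and chosen for illustration.

```python
import asyncio
from collections import deque


async def worker(identifier, seconds):
    # Hypothetical job: stands in for whatever each task really does.
    await asyncio.sleep(seconds)
    return identifier


async def launch_all(jobs, interval, duration):
    tasks = []
    # Launch one task per job, pausing between launches; the loop itself
    # never waits for a task to finish.
    while jobs:
        identifier = jobs.popleft()
        tasks.append(asyncio.create_task(worker(identifier, duration)))
        await asyncio.sleep(interval)
    # "Join": wait for every launched task, including ones still running.
    # gather() returns results in the order the tasks were passed in.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    results = asyncio.run(launch_all(deque(range(5)), interval=0.2, duration=0.5))
    print(results)
```

Because the Task references are kept in a list, nothing is cancelled when the launch loop ends; the program exits only after gather() has collected every result.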
