
I'm looking to be able to yield from a number of async coroutines. asyncio's as_completed is close to what I'm looking for (i.e. I want any of the coroutines to be able to yield back to the caller at any time and then continue), but it only seems to accept regular coroutines with a single return, not async generators.

Here's what I have so far:

import asyncio

async def test(id_):
    print(f'{id_} sleeping')
    await asyncio.sleep(id_)
    return id_

async def test_gen(id_):
    count = 0
    while True:
        print(f'{id_} sleeping')
        await asyncio.sleep(id_)
        yield id_
        count += 1
        if count > 5:
            return

async def main():
    runs = [test(i) for i in range(3)]
    for i in asyncio.as_completed(runs):
        i = await i
        print(f'{i} yielded')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

Replacing runs = [test(i) for i in range(3)] with runs = [test_gen(i) for i in range(3)], and having for i in asyncio.as_completed(runs) iterate over each yielded value, is what I'm after.
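I imagine this could be hand-rolled by draining each generator into a shared queue, along the lines of the rough sketch below (merge_gens is just a hypothetical helper I made up; I'm not sure the cleanup is right):

import asyncio

async def merge_gens(*gens):
    # Hypothetical helper: yield items from several async generators
    # as soon as any of them produces one.
    queue = asyncio.Queue()
    done = object()  # sentinel marking an exhausted generator

    async def drain(gen):
        try:
            async for item in gen:
                await queue.put(item)
        finally:
            queue.put_nowait(done)

    tasks = [asyncio.ensure_future(drain(g)) for g in gens]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        for t in tasks:
            t.cancel()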

Is this possible to express in Python, and are there any third-party libraries that give you more options than the standard library for this kind of coroutine control flow?

Thanks

1 Answer


You can use aiostream.stream.merge:

from aiostream import stream

async def main():
    runs = [test_gen(i) for i in range(3)]
    async for x in stream.merge(*runs):
        print(f'{x} yielded')

Run it in a safe context to make sure the generators are cleaned up properly after the iteration:

async def main():
    runs = [test_gen(i) for i in range(3)]
    merged = stream.merge(*runs)
    async with merged.stream() as streamer:
        async for x in streamer:
            print(f'{x} yielded')

Or make it more compact using pipes:

from aiostream import stream, pipe

async def main():
    runs = [test_gen(i) for i in range(3)]
    await (stream.merge(*runs) | pipe.print('{} yielded'))
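Awaiting the stream runs it to completion, so no explicit async for loop is needed in this version.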

More examples in the documentation.


Addressing @nirvana-msu's comment

It is possible to identify the generator that yielded a given value by preparing sources accordingly:

async def main():
    runs = [test_gen(i) for i in range(3)]
    # Bind i as a default argument so each lambda keeps its own index
    # (a bare lambda x: (i, x) would late-bind i to its final value).
    sources = [stream.map(xs, lambda x, i=i: (i, x))
               for i, xs in enumerate(runs)]
    async for i, x in stream.merge(*sources):
        print(f'ID {i}: {x}')
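For completeness, here is a sketch of a full runnable script combining the tagged sources with the safe iteration context shown above (reusing test_gen from the question):

import asyncio
from aiostream import stream

async def test_gen(id_):
    count = 0
    while True:
        print(f'{id_} sleeping')
        await asyncio.sleep(id_)
        yield id_
        count += 1
        if count > 5:
            return

async def main():
    runs = [test_gen(i) for i in range(3)]
    # Tag each value with the index of its source generator.
    sources = [stream.map(xs, lambda x, i=i: (i, x))
               for i, xs in enumerate(runs)]
    merged = stream.merge(*sources)
    # Iterate inside the safe context so pending tasks are cleaned up.
    async with merged.stream() as streamer:
        async for i, x in streamer:
            print(f'ID {i}: {x}')

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())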

7 Comments

This looks spot on, thank you. I'll give it a try later this evening and award you the answer.
@Vincent: This is exactly what I was looking for! I considered using aioreactive as well, but aiostream's source code is much more compact, and it seems a lot easier to use and understand. Very good documentation as well! Thanks a lot :)
Is there a way to know which generator the current value was yielded from?
If you want to avoid warnings such as AsyncIteratorContext is iterated outside of its context, you must also wrap the async for inside an async with aiostream.stream.merge(*sources).stream() as streamer block. See github.com/vxgmichel/aiostream/issues/46 and github.com/dabeaz/curio/issues/176 for context.
@RewanthTammana The main downside of not using the async context is that you might still have tasks running in the background when leaving the iteration. The async context ensures that all the started tasks are either finished or cancelled. I would not expect a huge performance difference between the two approaches, outside of obvious cases like leaving the for-loop before all items are produced. Feel free to report an issue on the github repository with the timing you measured.
