9

My application reads data from a slow i/o source, does some processing and then writes it to a local file. I've implemented this with generators like so:

import time def io_task(x): print("requesting data for input %s" % x) time.sleep(1) # this simulates a blocking I/O task return 2*x def producer(xs): for x in xs: yield io_task(x) def consumer(xs): with open('output.txt', 'w') as fp: for x in xs: print("writing %s" % x) fp.write(str(x) + '\n') data = [1,2,3,4,5] consumer(producer(data)) 

Now I'd like to parallelize this task with the help of asyncio, but I can't seem to figure out how. The main issue for me is to directly feed data through a generator from the producer to the consumer while letting asyncio make multiple parallel requests to io_task(x). Also, this whole async def vs. @asyncio.coroutine thing is confusing me.

Can someone show me how to build a minimal working example that uses asyncio from this sample code?

(Note: It is not ok to just make calls to io_task(), buffer the results and then write them to a file. I need a solution that works on large data sets that can exceed the main memory, that's why I've been using generators so far. It is however safe to assume that the consumer is always faster than all producers combined)

1 Answer 1

10

Since python 3.6 and asynchronous generators, very few changes need be applied to make your code compatible with asyncio.

The io_task function becomes a coroutine:

async def io_task(x): await asyncio.sleep(1) return 2*x 

The producer generator becomes an asynchronous generator:

async def producer(xs): for x in xs: yield await io_task(x) 

The consumer function becomes a coroutine and uses aiofiles, asynchronous context management and asynchronous iteration:

async def consumer(xs): async with aiofiles.open('output.txt', 'w') as fp: async for x in xs: await fp.write(str(x) + '\n') 

And the main coroutine runs in an event loop:

data = [1,2,3,4,5] main = consumer(producer(data)) loop = asyncio.get_event_loop() loop.run_until_complete(main) loop.close() 

Also, you may consider using aiostream to pipeline some processing operations between the producer and the consumer.


EDIT: The different I/O tasks can easily be run concurrently on the producer side by using as_completed:

async def producer(xs): coros = [io_task(x) for x in xs] for future in asyncio.as_completed(coros): yield await future 
Sign up to request clarification or add additional context in comments.

11 Comments

this is awesome, now I definitely have to upgrade to python 3.6. thanks :)
@Klamann Yes, your example brought most of the cool asynchronous features of python 3.5 and 3.6. It almost looks like a tutorial ;)
I've tried your code now, but it doesn't seem to run asynchronously. When I add print statements before await asyncio.sleep and await fp.write, all of them are executed sequentially and the total run time of the program is 5 seconds. Is this an issue on my platform or do you see the same behaviour?
@Klamann Yes, that's the expected behavior. The point of converting this program to asyncio is that you could potentially run other tasks at the same time. For instance, you could have another producer/consumer pair working on another dataset. If you also wish to separate the producer from the consumer to run them concurrently, you could use a queue. However, the total run time will still be 5 seconds since the producer itself is sequential. Hope that helps.
Hi, I have two questions about this example: 1. what if io_task is an async_generator, what's the replacement of "yield await io_task(x)" in producer, so that it can act like "yield from"? 2. If I have a very large in data and I have to use generator, I want to avoid building a super long coros list, can it also be a generator?
|

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.