My application reads data from a slow I/O source, does some processing, and then writes the result to a local file. I've implemented this with generators like so:
    import time

    def io_task(x):
        print("requesting data for input %s" % x)
        time.sleep(1)  # this simulates a blocking I/O task
        return 2*x

    def producer(xs):
        for x in xs:
            yield io_task(x)

    def consumer(xs):
        with open('output.txt', 'w') as fp:
            for x in xs:
                print("writing %s" % x)
                fp.write(str(x) + '\n')

    data = [1, 2, 3, 4, 5]
    consumer(producer(data))

Now I'd like to parallelize this task with the help of asyncio, but I can't seem to figure out how. The main issue for me is how to feed data directly from the producer to the consumer through a generator while letting asyncio make multiple parallel requests to io_task(x). Also, this whole async def vs. @asyncio.coroutine thing is confusing me.
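From what I've read, these two declarations should be equivalent ways of defining a coroutine (the decorator being the older, generator-based style), but I'd appreciate confirmation:

    import asyncio

    # Older, generator-based coroutine style
    @asyncio.coroutine
    def io_task_old(x):
        yield from asyncio.sleep(1)
        return 2 * x

    # Native coroutine syntax (Python 3.5+); as far as I understand,
    # this does the same thing as the decorated version above
    async def io_task_new(x):
        await asyncio.sleep(1)
        return 2 * x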
Can someone show me how to turn this sample code into a minimal working example that uses asyncio?
(Note: it is not OK to just call io_task() for every input, buffer the results, and then write them to a file. I need a solution that works on data sets larger than main memory, which is why I've been using generators so far. It is, however, safe to assume that the consumer is always faster than all producers combined.)
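For reference, here is roughly the direction I've been experimenting with: run the I/O requests in small batches with asyncio.gather and hand the results to the consumer through a bounded asyncio.Queue (the batch_size and maxsize values here are arbitrary). I can't tell whether this is idiomatic asyncio, or whether it actually gives me the concurrency I'm after:

    import asyncio
    import itertools

    async def io_task(x):
        print("requesting data for input %s" % x)
        await asyncio.sleep(1)  # non-blocking stand-in for the slow I/O call
        return 2 * x

    async def producer(xs, queue, batch_size=3):
        # Pull a small batch from the (potentially huge) input iterable,
        # run that batch's I/O tasks concurrently, then pass the results
        # to the consumer through a bounded queue.
        it = iter(xs)
        while True:
            batch = list(itertools.islice(it, batch_size))
            if not batch:
                break
            results = await asyncio.gather(*(io_task(x) for x in batch))
            for r in results:
                await queue.put(r)
        await queue.put(None)  # sentinel: no more data

    async def consumer(queue):
        with open('output.txt', 'w') as fp:
            while True:
                x = await queue.get()
                if x is None:
                    break
                print("writing %s" % x)
                fp.write(str(x) + '\n')

    async def main(xs):
        queue = asyncio.Queue(maxsize=10)  # bounded buffer to keep memory limited
        await asyncio.gather(producer(xs, queue), consumer(queue))

    data = [1, 2, 3, 4, 5]
    asyncio.get_event_loop().run_until_complete(main(data))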