7

I'm having trouble combining async generators and actually running them. This is because the only way I 've found to run them is through an event loop which returns an iterable and not a generator. Let me illustrate this with a simple example:

Let's say I have a function google_search that searches google by scraping it (I'm not using the API on purpose). It takes in a search string and returns a generator of search results. This generator doesn't end when the page is over, the function continues by going on to the next page. Therefore the google_search function returns a possibly nearly endless generator (it will technically always end but often you can get millions of hits for a search on google)

def google_search(search_string): # Basically uses requests/aiohttp and beautifulsoup # to parse the resulting html and yield search results # Assume this function works ...... 

Okay, so now I want to make a function that allows me to iterate over multiple google_search generators. I'd like something like this:

def google_searches(*search_strings): for results in zip(google_search(query) for query in search_strings): yield results 

This way I can use a simple for loop to unwind google_searches and get my results. And the above code works well but is very slow for any reasonably big number of searches. The code is sending a request for the first search, then the second and so forth until finally, it yields results. I would like to speed this up (a lot). My first idea is to change google_searches to an async function (I am using python 3.6.3 and can use await/async etc). This then creates an async generator which is fine but I can only run it in another async function or an event loop. And running it in an event loop with run_until_complete(loop.gather(...)) returns a list of results instead of a normal generator, which defeats the purpose as there's probably way too many search results to hold in a list.

How can I make the google_searches function faster (using preferably async code but anything is welcomed) by executing requests asynchronously while still having it be a vanilla generator? Thanks in advance!

1
  • 2
    Why the downvote? Is my question not properly asked or not specific? I don't mind the downvote but I'd like to know what's wrong so I can improve upon it. Thanks. Commented Dec 31, 2017 at 18:45

3 Answers 3

6

The accepted answer waits for one result from EACH async generator before calling the generators again. If data doesn't come at the same exact same pace, that may be a problem. The solution below takes multiple async iterables (generators or not) and iterates all of them simultaneously in multiple coroutines. Each coroutine puts the results in a asyncio.Queue, which is then iterated by the client code:

Iterator code:

import asyncio from async_timeout import timeout class MergeAsyncIterator: def __init__(self, *it, timeout=60, maxsize=0): self._it = [self.iter_coro(i) for i in it] self.timeout = timeout self._futures = [] self._queue = asyncio.Queue(maxsize=maxsize) def __aiter__(self): for it in self._it: f = asyncio.ensure_future(it) self._futures.append(f) return self async def __anext__(self): if all(f.done() for f in self._futures) and self._queue.empty(): raise StopAsyncIteration with timeout(self.timeout): try: return await self._queue.get() except asyncio.CancelledError: raise StopAsyncIteration def iter_coro(self, it): if not hasattr(it, '__aiter__'): raise ValueError('Object passed must be an AsyncIterable') return self.aiter_to_queue(it) async def aiter_to_queue(self, ait): async for i in ait: await self._queue.put(i) await asyncio.sleep(0) 

Sample client code:

import random import asyncio from datetime import datetime async def myaiter(name): for i in range(5): n = random.randint(0, 3) await asyncio.sleep(0.1 + n) yield (name, n) yield (name, 'DONE') async def main(): aiters = [myaiter(i) for i in 'abc'] async for i in MergeAsyncIterator(*aiters, timeout=3): print(datetime.now().strftime('%H:%M:%S.%f'), i) loop = asyncio.get_event_loop() loop.run_until_complete(main()) 

Output:

14:48:28.638975 ('a', 1) 14:48:29.638822 ('b', 2) 14:48:29.741651 ('b', 0) 14:48:29.742013 ('a', 1) 14:48:30.639588 ('c', 3) 14:48:31.742705 ('c', 1) 14:48:31.847440 ('b', 2) 14:48:31.847828 ('a', 2) 14:48:31.847960 ('c', 0) 14:48:32.950166 ('c', 1) 14:48:33.948791 ('a', 2) 14:48:34.949339 ('b', 3) 14:48:35.055487 ('c', 2) 14:48:35.055928 ('c', 'DONE') 14:48:36.049977 ('a', 2) 14:48:36.050481 ('a', 'DONE') 14:48:37.050415 ('b', 2) 14:48:37.050966 ('b', 'DONE') 

PS: The code above uses the async_timeout third-party library.
PS2: The aiostream library does the same as the above code and much more.

Sign up to request clarification or add additional context in comments.

2 Comments

Awesome! I'm still surprised there's no easier/more pythonic way of doing this...
By the way, this solution is roughly equivalent to the aiostream merge operator
5
def google_search(search_string): # Basically uses requests/aiohttp and beautifulsoup 

This is plain synchronous generator. You would be able to use requests inside it, but if you want to use asynchronous aiohttp, you would need asynchronous generator defined with async def.

What comes to iterating over multiple async generators it's more interesting. You can't use plain zip since it works with plain iterables, not async iterables. So you should implement your own (that would also support iterating concurrently).

I made a little prototype that I think does what you want:

import asyncio import aiohttp import time # async versions of some builtins: async def anext(aiterator): try: return await aiterator.__anext__() except StopAsyncIteration as exc: raise exc def aiter(aiterable): return aiterable.__aiter__() async def azip(*iterables): iterators = [aiter(it) for it in iterables] while iterators: results = await asyncio.gather( *[anext(it) for it in iterators], return_exceptions=True, ) yield tuple(results) # emulating grabbing: async def request(url): async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.text() async def google_search(search_string): for i in range(999): # big async generator url = 'http://httpbin.org/delay/{}'.format(i) # increase delay to better see concurency j = await request(url) yield search_string + ' ' + str(i) async def google_searches(*search_strings): async for results in azip(*[google_search(s) for s in search_strings]): for result in results: yield result # test it works: async def main(): async for result in google_searches('first', 'second', 'third'): print(result, int(time.time())) loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) loop.run_until_complete(loop.shutdown_asyncgens()) finally: loop.close() 

Output:

first 0 1514759561 second 0 1514759561 third 0 1514759561 first 1 1514759562 second 1 1514759562 third 1 1514759562 first 2 1514759564 second 2 1514759564 third 2 1514759564 first 3 1514759567 second 3 1514759567 third 3 1514759567 

Time shows that different searches run concurrently.

3 Comments

This is beautiful. Exactly what I was looking for. Thank you! And happy New Year!
By the way, this solution is roughly equivalent to the aiostream zip operator
I didn't know about aiostream, so thx! But aiostream's merge is even better as I wanted iterators to run until completion.
0

I am just going to paste here the solution I coded a while ago because I always end up in this question just to remember I already solved this problem before.

async def iterator_merge(iterators: typing.Dict[typing.AsyncIterator, typing.Optional[asyncio.Future]]): while iterators: for iterator, value in list(iterators.items()): if not value: iterators[iterator] = asyncio.ensure_future(iterator.__anext__()) tasks, _ = await asyncio.wait(iterators.values(), return_when=asyncio.FIRST_COMPLETED) for task in tasks: # We send the result up try: res = task.result() yield res except StopAsyncIteration: # We remove the task from the list for it, old_next in list(iterators.items()): if task is old_next: logger.debug(f'Iterator {it} finished consuming') iterators.pop(it) else: # We remove the task from the key for it, old_next in list(iterators.items()): if task is old_next: iterators[it] = None 

It has typing annotations, but I think it's a good solution this one. It's meant to be called with your async generators as keys, and a future if you have any to wait.

iterators = { k8s_stream_pod_log(name=name): None, k8s_stream_pod_events(name=name): None, } 

You can find it how I use it in github.com/txomon/abot.

1 Comment

Thanks! Would you mind explaining a bit more though? I'm not sure I fully understand your code.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.