5 votes

I have a simple class that leverages an async generator to retrieve responses from a list of URLs:

import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """

        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()

                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """

        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()

When I execute this main part of the code:

if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()

The log prints out:

[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]

Since responses is an async generator, I expect it to yield one response from the generator (which should only send the request when it is actually yielded), send a separate request to the endpoint with no x parameter, and then yield the next response from the generator. The log should flip back and forth between a request with an x parameter and a request with no parameters. Instead, it yields all of the responses with an x parameter first, followed by all of the requests that have no parameters.

Something similar happens when I do:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()

And the log prints:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

Instead, what I want is:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

There are times when I want to retrieve all of the responses before doing anything else. However, there are also times when I want to interject and make intermediate requests before yielding the next item from the generator (e.g., the generator yields paginated search results, and I want to process further links from each page before moving on to the next page).
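For the second case, the usage I have in mind is roughly this sketch, where extract_links and process are hypothetical helpers standing in for my page-handling code:

responses = ag.get_routes(page_urls)
for page in responses:
    # Handle the links found on this page before asking the
    # generator for the next page (hypothetical helpers).
    links = extract_links(page['read'])
    for link_response in ag.get_routes(links):
        process(link_response)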

What do I need to change to achieve the required result?

responses is not an async generator; it's a regular, normal, synchronous generator function. To be async, you'd have to use async def responses(...): - Martijn Pieters
I don't think I follow. Would you mind providing a code snippet? Would async def responses(...): sit between get_routes and _get_url? Or do I use this inside __main__? And what would I be awaiting inside async def responses(...):? Thank you for your patience as I am still learning this complicated async stuff! - slaw
@MartijnPieters The trouble is that the OP is using as_completed, which is itself an ordinary generator, but designed (strangely) for use with asyncio. Thus it comes naturally to wrap it using another ordinary generator. Using run_until_complete on coroutines yielded (synchronously) by as_completed is a legitimate way of using it, though not something I'd recommend. - user4815162342
@user4815162342 I am flexible and willing to learn the best way to go about this if you don't mind offering an answer with the right solution? I cobbled this together using limited knowledge scraped from disparate examples from the web so I'd appreciate any help that I can get. Hopefully, it is clear what I am trying to achieve? - slaw
@slaw: sorry, that was too brief a comment and I had to step away. The term async generator has a very specific meaning in Python; your generator function pushes tasks into the asyncio loop but is not an async generator. You are firing off a series of tasks and then waiting for the next one to be complete; all those tasks are executing cooperatively. - Martijn Pieters

1 Answer

7 votes

Leaving aside the technical question of whether responses is an async generator (it's not, as Python uses the term), your problem lies in as_completed. as_completed starts a bunch of coroutines in parallel and provides the means to obtain their results as they complete. That the futures run in parallel is not exactly obvious from the documentation (improved in later versions), but it makes sense if you consider that the original concurrent.futures.as_completed works on thread-based futures, which revolve around parallel execution. Conceptually, the same is true of asyncio futures.
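To see this behavior in isolation, here is a minimal, self-contained sketch (not part of the question's code) in which three coroutines are scheduled up front and as_completed hands back results in completion order, not submission order:

import asyncio

async def sleepy(n):
    await asyncio.sleep(n)  # stands in for network latency
    return n

async def demo():
    coros = [sleepy(n) for n in (3, 1, 2)]
    # as_completed schedules every coroutine as a task right away;
    # results then arrive as 1, 2, 3 regardless of submission order.
    for future in asyncio.as_completed(coros):
        print(await future)

asyncio.get_event_loop().run_until_complete(demo())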

Your code obtains only the first (fastest-arriving) result and then starts doing something else, also using asyncio. The remaining coroutines passed to as_completed are not frozen merely because no one is collecting their results - they are doing their jobs in the background, and once done they are ready to be awaited (in your case, by the code inside as_completed, which you access with loop.run_until_complete()). I would venture to guess that the URL without parameters takes longer to retrieve than the URLs with the x parameter, which is why it gets printed after all the other coroutines.

In other words, those log lines being printed means that asyncio is doing its job and providing the parallel execution you requested! If you don't want parallel execution, don't ask for it; execute the coroutines serially:

def get_routes(self, urls):
    # Drive each request to completion before yielding, so the next
    # request does not even start until the caller asks for it.
    for url in urls:
        yield self.loop.run_until_complete(self._get_url(url))
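With this version, each request is issued only when the caller advances the generator, so the original __main__ loop should produce the strictly alternating log you expected.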

But this is a poor way of using asyncio - its main loop is non-reentrant, so to ensure composability, you almost certainly want the loop to be spun up just once, at top level. This is typically done with a construct like loop.run_until_complete(main()) or loop.run_forever(). As Martijn pointed out, you could achieve that, while retaining the nice generator API, by making get_routes an actual async generator:

async def get_routes(self, urls):
    # Awaiting inside an async generator suspends only this generator;
    # each request is issued on demand as the consumer iterates.
    for url in urls:
        result = await self._get_url(url)
        yield result

Now you can have a main() coroutine that looks like this:

async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

LOOP.run_until_complete(main())
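One last detail: the original close() reaches into aiohttp's private _connector attribute. Assuming a recent aiohttp (3.x), where ClientSession.close() is a coroutine, a cleaner shutdown is to close the session itself:

async def close(self):
    # Closing the session also closes its underlying connector,
    # so there is no need to touch the private _connector attribute.
    await self.session.close()

and then end main() with await ag.close() instead of ag.close().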