One of the main appeals of using Python’s asyncio is being able to fire off many coroutines and run them concurrently. How many ways do you know to wait for their results?

There are quite a few of them! The different ways have different properties, and all of them deserve their place. However, I regularly have to look them up to find the right one[1].


Before I start, a few definitions that I will use throughout this post:

  • coroutine: A running asynchronous function. So if you define a function as async def f(): ... and call it as f(), you get back a coroutine in the sense that the term is used throughout this post.
  • awaitable: anything that works with await: coroutines, asyncio.Futures, asyncio.Tasks, objects that have a __await__ method.
  • I will be using two async functions f and g for my examples. It’s not important what they do, only that they are defined as async def f(): ... and async def g(): ... and that they terminate eventually.
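To make these definitions concrete, here is a minimal sketch. The body of f and the Ready class are assumptions made up for illustration; only the terms coroutine and awaitable come from the definitions above.

```python
import asyncio
import inspect


async def f():
    # Stand-in body (an assumption): any async def that terminates will do.
    await asyncio.sleep(0)
    return "f"


class Ready:
    """Hypothetical awaitable: an object that defines __await__."""

    def __await__(self):
        # Delegate to the iterator of a coroutine created from f().
        return f().__await__()


async def main():
    coro = f()  # calling f() only creates the coroutine object
    is_coro = inspect.iscoroutine(coro)
    result_coro = await coro      # awaiting a coroutine
    result_obj = await Ready()    # awaiting an object with __await__
    return is_coro, result_coro, result_obj


if __name__ == "__main__":
    print(asyncio.run(main()))
```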

await

The simplest case is to await your coroutines:

result_f = await f()
result_g = await g()

However:

  1. The coroutines do not run concurrently. g only starts executing after f has finished.
  2. You can’t cancel them once you’ve started awaiting them.

A naïve approach to the first problem might be something like this:

coro_f = f()
coro_g = g()

result_f = await coro_f
result_g = await coro_g

But the execution of g/coro_g doesn’t start before it is awaited, making it identical to the first example. For both problems you need to wrap your coroutines in tasks.
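If you want to convince yourself of this, the following sketch records when each function starts and ends. The bodies of f and g and the events list are assumptions for the sake of the demonstration.

```python
import asyncio

events = []  # records the order in which things happen


async def f():
    events.append("f start")
    await asyncio.sleep(0.01)
    events.append("f end")
    return "f"


async def g():
    events.append("g start")
    await asyncio.sleep(0.01)
    events.append("g end")
    return "g"


async def main():
    events.clear()
    coro_f = f()
    coro_g = g()

    # Yield to the event loop: nothing is scheduled, so nothing runs.
    await asyncio.sleep(0.02)
    before_await = list(events)

    await coro_f
    await coro_g
    return before_await, list(events)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The first list stays empty and the second one shows g starting only after f has ended.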

Tasks

asyncio.Tasks wrap your coroutines and get independently scheduled for execution by the event loop whenever you yield control to it[2]. You can create them using asyncio.create_task():

task_f = asyncio.create_task(f())
task_g = asyncio.create_task(g())

await asyncio.sleep(0.1) # <- f() and g() are already running!
result_f = await task_f
result_g = await task_g

Your tasks now run concurrently and if you decide that you don’t want to wait for task_f or task_g to finish, you can cancel them using task_f.cancel() or task_g.cancel() respectively. Please note that you must create both tasks before you await the first one – otherwise you gain nothing. However, the awaits are only needed to collect the results and to clean up resources[3].
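Cancellation could look roughly like the sketch below. The bodies of f and g are assumptions (g is made deliberately slow); awaiting the cancelled task is what surfaces the asyncio.CancelledError.

```python
import asyncio


async def f():
    await asyncio.sleep(0.01)
    return "f"


async def g():
    await asyncio.sleep(3600)  # assumed to take far too long
    return "g"


async def main():
    # Create BOTH tasks before awaiting either of them.
    task_f = asyncio.create_task(f())
    task_g = asyncio.create_task(g())

    result_f = await task_f

    # We decide not to wait for g() any longer.
    task_g.cancel()
    try:
        await task_g
    except asyncio.CancelledError:
        g_was_cancelled = True
    else:
        g_was_cancelled = False

    return result_f, g_was_cancelled, task_g.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```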


But waiting for each of them like this is not very practical. In real-life code, you often don’t even know how many awaitables you will need to wrangle. What we need is to gather the results of multiple awaitables.

asyncio.gather()

asyncio.gather() takes 1 or more awaitables as *args, wraps them in tasks if necessary, and waits for all of them to finish. Then it returns the results of all awaitables in the same order as you passed in the awaitables:

result_f, result_g = await asyncio.gather(f(), g())

If f() or g() raise an exception, gather() will raise it immediately, but the other tasks are not affected. However if gather() itself is canceled, all of the awaitables that it’s gathering – and that have not completed yet – are also canceled.

You can also pass return_exceptions=True and then exceptions are returned like normal results and you have to check yourself whether or not they were successful (e.g. using isinstance(result, BaseException)).
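A minimal sketch of that check, assuming an f that succeeds and a g that fails (both bodies are made up for the example):

```python
import asyncio


async def f():
    await asyncio.sleep(0.01)
    return "f"


async def g():
    await asyncio.sleep(0)
    raise ValueError("g failed")


async def main():
    results = await asyncio.gather(f(), g(), return_exceptions=True)

    # Exceptions come back in the same positions as regular results,
    # so we have to sort them out ourselves.
    succeeded = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return succeeded, failed


if __name__ == "__main__":
    print(asyncio.run(main()))
```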

Summary

  • Takes many awaitables as *args.
  • Wraps each awaitable in a task if necessary.
  • Returns the list of results in the same order.
    • Allows errors to be returned as results (by passing return_exceptions=True).
    • Otherwise if one of the awaitables raises an exception, gather() propagates it immediately to the caller. But the remaining tasks keep running.
  • If gather() itself is canceled, it cancels all unfinished tasks it’s gathering.

Now we can wait for many awaitables at once! However, well-behaved distributed systems need timeouts. Since gather() has no option for that, we need the next helper.

asyncio.wait_for()

asyncio.wait_for() takes two arguments: one awaitable and a timeout in seconds. If the awaitable is a coroutine, it will automatically be wrapped by a task. So the following construct is quite common:

try:
    result_f, result_g = await asyncio.wait_for(
        asyncio.gather(f(), g()),
        timeout=5.0,
    )
except asyncio.TimeoutError:
    print("oops took longer than 5s!")

If the timeout expires, the inner task gets canceled, which for gather() means that all tasks that it is gathering are canceled too: in this case f() and g().
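The following sketch makes that cancellation visible. The slow() coroutine and the cancelled list are assumptions for illustration; slow() stands in for f() and g() and notes when it receives the cancellation.

```python
import asyncio

cancelled = []  # names of the coroutines that saw a cancellation


async def slow(name):
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        cancelled.append(name)
        raise  # always re-raise so the task really ends up cancelled


async def main():
    cancelled.clear()
    try:
        await asyncio.wait_for(
            asyncio.gather(slow("f"), slow("g")),
            timeout=0.05,
        )
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return timed_out, sorted(cancelled)


if __name__ == "__main__":
    print(asyncio.run(main()))
```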


Please note that just replacing create_task() by wait_for() and calling it a day does not work. create_task() is a regular function that returns a task; wait_for() is an async function that returns a coroutine. That means it does not start executing until you await it:

# NOT concurrent!
cf = asyncio.wait_for(f(), timeout=0.5)
cg = asyncio.wait_for(g(), timeout=0.5)

# cf and cg are both COROUTINES, not tasks!
# At THIS point, there's NOTHING to be scheduled by the event loop.

await cf  # g() is NOT executing yet!
await cg  # wait_for creates the task for g() only HERE
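One way around this is to wrap each wait_for() call in a task of its own, so that both get scheduled right away. This is a sketch with assumed bodies for f and g; the events list only exists to show that g() has started before f() is done.

```python
import asyncio

events = []


async def f():
    events.append("f start")
    await asyncio.sleep(0.01)
    return "f"


async def g():
    events.append("g start")
    await asyncio.sleep(0.01)
    return "g"


async def main():
    events.clear()
    # create_task() schedules the wait_for() coroutines immediately.
    task_f = asyncio.create_task(asyncio.wait_for(f(), timeout=0.5))
    task_g = asyncio.create_task(asyncio.wait_for(g(), timeout=0.5))

    result_f = await task_f
    seen_when_f_finished = list(events)  # g() is already running here
    result_g = await task_g
    return result_f, result_g, seen_when_f_finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```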

If you now think that there would be no need for wait_for() if gather() had a timeout option, we’re thinking the same thing.

Summary

  • Takes one awaitable.
  • Wraps the awaitable in a task if necessary.
  • Takes a timeout that cancels the task if it expires.
  • Unlike create_task(), is a coroutine itself that doesn’t execute until awaited.

Interlude: async-timeout

A more elegant approach to timeouts is the async-timeout package on PyPI. It gives you an asynchronous context manager that allows you to apply a total timeout even if you need to execute the coroutines sequentially:

async with async_timeout.timeout(5.0):
    await f()
    await g()

Sometimes, you don’t want to wait until all awaitables are done. Maybe you want to process them as they finish and report some kind of progress to the user.

asyncio.as_completed()

asyncio.as_completed() takes an iterable[4] of awaitables and returns an iterator that yields asyncio.Futures in the order the awaitables are done:

for fut in asyncio.as_completed([task_f, task_g], timeout=5.0):
    try:
        await fut
        print("one task down!")
    except Exception:
        print("ouch")

There’s no way to find out which awaitable you’re awaiting though[5].
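A workaround that doesn’t rely on implementation details is to let each awaitable carry its own label in the result. The tagged() helper below is hypothetical and not part of asyncio, and the bodies of f and g are assumptions.

```python
import asyncio


async def f():
    await asyncio.sleep(0.1)
    return "result of f"


async def g():
    await asyncio.sleep(0.01)
    return "result of g"


async def tagged(name, awaitable):
    """Hypothetical helper: return the result together with a label."""
    return name, await awaitable


async def main():
    finished = []
    labeled = [tagged("f", f()), tagged("g", g())]
    for fut in asyncio.as_completed(labeled, timeout=5.0):
        name, result = await fut
        finished.append((name, result))
    return finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Since g() sleeps for a shorter time, its labeled result arrives first.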

Summary

  • Takes many awaitables in an iterable.
  • Yields Futures that you have to await as soon as something is done.
  • Does not guarantee to return the original awaitables that you passed in.
  • Does wrap the awaitables in tasks (it actually calls asyncio.ensure_future() on them).
  • Takes an optional timeout.

Finally, you may want more control over waiting and that takes us to the final waiting primitive.

asyncio.wait()

asyncio.wait() is the most unwieldy of the APIs but also the most powerful one. It is a little reminiscent of the venerable select() system call.

Like as_completed(), it takes awaitables in an iterable. It will return two sets: the awaitables that are done and those that are still pending. It’s up to you to await them[6] and to determine which result belongs to what:

done, pending = await asyncio.wait([task_f, task_g])

for t in done:
    try:
        if t is task_f:
            print(f"The result of f() is { await task_f }.")
    except Exception as e:
        print(f"f() failed with { repr(e) }.")

# ...and same for g()

This code would not work if you passed in a coroutine and wait() wrapped it in a task, because the returned awaitable would be different from the one that you passed in and the identity check would always fail[7]. At the time of writing, wait() will do it anyway, but it will warn you about it because it’s probably a bug. Since Python 3.11, passing a coroutine to wait() raises a TypeError instead.

How can an awaitable be still pending when wait() returns? There are two possibilities:

  1. You can pass a timeout after which wait() will return. Unlike with wait_for(), nothing is done to the awaitables when that timeout expires. The function just returns and sorts the tasks into the done and the pending buckets.
  2. You can tell wait() to not wait until all awaitables are done using the return_when argument. By default it’s set to asyncio.ALL_COMPLETED which does exactly what it sounds like. You can also set it to asyncio.FIRST_EXCEPTION, which waits for all awaitables to finish too, but returns immediately if one of them raises an exception. Finally, asyncio.FIRST_COMPLETED returns the moment any of the awaitables finishes.
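The first possibility can be sketched like this. The bodies of f and g are assumptions; the point is that task_g is still running after the timeout and that cleaning it up is left to us.

```python
import asyncio


async def f():
    await asyncio.sleep(0.01)
    return "f"


async def g():
    await asyncio.sleep(3600)  # assumed to outlive the timeout
    return "g"


async def main():
    task_f = asyncio.create_task(f())
    task_g = asyncio.create_task(g())

    done, pending = await asyncio.wait([task_f, task_g], timeout=0.2)

    # The timeout expired, but wait() did not touch task_g.
    g_still_running = not task_g.done()

    # Clean up ourselves: cancel what is left and collect the outcome.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    return task_f in done, task_g in pending, g_still_running


if __name__ == "__main__":
    print(asyncio.run(main()))
```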

All of this together is a bit complicated but allows you to build powerful dispatcher functions, often using a while loop that runs until all awaitables are done.
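Such a loop might look like the following sketch. The job() coroutine, its delays, and the task names are all assumptions; a real dispatcher would do something more useful than appending to a log.

```python
import asyncio


async def job(name, delay, fail=False):
    # Hypothetical unit of work that either returns or raises.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} done"


async def main():
    pending = {
        asyncio.create_task(job("a", 0.10), name="a"),
        asyncio.create_task(job("b", 0.02, fail=True), name="b"),
        asyncio.create_task(job("c", 0.06), name="c"),
    }
    log = []

    # Keep waiting until nothing is left in the pending bucket.
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            try:
                log.append((task.get_name(), await task))
            except Exception as e:
                log.append((task.get_name(), repr(e)))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Only tasks are passed in, so the names set at creation time are enough to tell which result belongs to what.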

Summary

  • Takes many awaitables in an iterable.
  • Will not return the results, but the passed awaitables sorted into two sets, that are returned as a tuple of (done, pending). It’s up to you to await and dispatch.
  • Will wrap coroutines in tasks, but will warn about it (Python 3.11 and later raise a TypeError instead), because that means you get different awaitables back than you put in. Avoid this and only pass tasks!
  • Gives you fine-grained control when it should sort the tasks into the buckets and return – it never cancels any of the tasks:
    • Pass a timeout to limit the maximum waiting time.
    • Pass a return_when with:
      • asyncio.ALL_COMPLETED: As soon as all awaitables are done.
      • asyncio.FIRST_EXCEPTION: As soon as all are done, or as soon as one raises an exception.
      • asyncio.FIRST_COMPLETED: As soon as any awaitable is done.

Next Steps

Of all the available asyncio material, I’m only aware of two sets that are comprehensive, accurate, and written by people with extensive practical asyncio experience:

  1. Łukasz Langa who did a lot of asyncio work at Facebook and Instagram and is now working for EdgeDB – a database that uses asyncio extensively – started a video series that begins with nothing and works itself up to asyncio. The third episode is particularly pertinent to this post, because it shows the practical use of asyncio.wait().
  2. For more advanced asyncio-in-production advice, Lynn Root of Spotify gave two talks on that topic: asyncio in Practice: We Did It Wrong (2018) and Advanced asyncio: Solving Real-world Production Problems (2019) with an extensive written tutorial.

Footnotes


  1. Yes, this blog post is actually a cheatsheet for myself.

  2. Usually by awaiting something.

  3. asyncio will complain if you don’t consume all your results and exceptions.

  4. For example a list, a tuple, or a set.

  5. There are loopholes but they are ugly and rely on undocumented implementation details.

  6. Since the ones from the done bucket are guaranteed to be done, you can also introspect their results using Task.result() and Task.exception(). But I find awaiting them more idiomatic.

  7. Since wait_for() returns a coroutine, this very much applies to it too! Those two functions don’t mix unless you wrap the call to wait_for() in a task.