One of the main appeals of using Python’s asyncio is being able to fire off many coroutines and run them concurrently. How many ways do you know for waiting for their results?
There are quite a few of them! The different ways have different properties and all of them deserve their place. Still, I regularly have to look them up to find the right one.
TL;DR:
- If you can, use asyncio.TaskGroup. It’s the most modern, user-friendly API with the fewest sharp edges.
- If you need more flexibility, use asyncio.wait(). It’s the most powerful API that doesn’t even try to be user-friendly, but also comes with no surprises.
Terminology
Before we start, a few definitions that I will use throughout this post:
- coroutine: A running asynchronous function. So if you define a function as async def f(): ... and call it as f(), you get back a coroutine in the sense that the term is used throughout this post.
- awaitable: anything that works with await: coroutines, asyncio.Futures, asyncio.Tasks, objects that have a __await__ method.
- I will be using two async functions f and g for my examples. It’s not important what they do, only that they are defined as async def f(): ... and async def g(): ... and that they terminate eventually.
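To make these terms concrete, here is a minimal sketch. The class Ready is a hypothetical name invented for illustration; the point is only that calling f() builds a coroutine object without running it, and that any object with a __await__ method works with await.

```python
import asyncio
import inspect


async def f():
    return "f"


class Ready:
    """Hypothetical awaitable: any object with __await__ works with await."""

    def __await__(self):
        # Delegate to the iterator of a fresh coroutine.
        return f().__await__()


async def main():
    coro = f()  # nothing runs yet; this only builds a coroutine object
    kinds = {
        "f() is a coroutine": inspect.iscoroutine(coro),
        "f() is awaitable": inspect.isawaitable(coro),
        "Ready() is awaitable": inspect.isawaitable(Ready()),
    }
    return kinds, await coro, await Ready()


if __name__ == "__main__":
    print(asyncio.run(main()))
```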
await
The simplest case is to await your coroutines:
result_f = await f()
result_g = await g()
However:
- The coroutines do not run concurrently. g only starts executing after f has finished.
- You can’t cancel them once you started awaiting.
A naïve approach to the first problem might be something like this:
coro_f = f()
coro_g = g()
result_f = await coro_f
result_g = await coro_g
But the execution of g/coro_g doesn’t start before it is awaited, making it identical to the first example. For both problems you need to wrap your coroutines in tasks.
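A small sketch can confirm this ordering. The bodies of f and g and the shared log list are assumptions made up for the demonstration; they only record when each coroutine starts and ends.

```python
import asyncio

log = []  # hypothetical event log, used only for this demonstration


async def f():
    log.append("f start")
    await asyncio.sleep(0.01)
    log.append("f end")


async def g():
    log.append("g start")
    await asyncio.sleep(0.01)
    log.append("g end")


async def main():
    log.clear()
    coro_f = f()
    coro_g = g()
    log.append("coroutines created")  # neither body has started yet
    await coro_f
    await coro_g
    return list(log)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The log shows g starting only after f has ended, exactly as in the plain await example.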
Tasks
asyncio.Tasks wrap your coroutines and get independently scheduled for execution by the event loop whenever you yield control to it [1]. You can create them using asyncio.create_task():
task_f = asyncio.create_task(f())
task_g = asyncio.create_task(g())
await asyncio.sleep(0.1) # <- f() and g() are already running!
result_f = await task_f
result_g = await task_g
Your tasks now run concurrently and if you decide that you don’t want to wait for task_f or task_g to finish, you can cancel them using task_f.cancel() or task_g.cancel() respectively.
Please note that you must create both tasks before you await the first one; otherwise you gain nothing. However, the awaits are only needed to collect the results and to clean up resources (asyncio will complain if you don’t consume all your results and exceptions). Also note that you must keep track of your tasks, otherwise they may get garbage-collected, gifting you a fun Heisenbug to deal with.
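One common way to keep track of tasks is to hold strong references in a set and drop them once the task is done. The sketch below assumes a hypothetical helper spawn() and a hypothetical coroutine work(); neither is part of asyncio.

```python
import asyncio

background_tasks = set()  # strong references keep the tasks alive


def spawn(coro):
    """Hypothetical helper: create a task and hold on to it until it is done."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def work(name, delay, log):
    log.append(f"{name} start")
    await asyncio.sleep(delay)
    log.append(f"{name} end")
    return name


async def main():
    log = []
    task_f = spawn(work("f", 0.02, log))
    task_g = spawn(work("g", 0.01, log))
    # Both tasks exist before the first await, so they run concurrently.
    results = [await task_f, await task_g]
    return log, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here g ends before f even though f is awaited first, which shows that the awaits only collect results.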
But waiting for each of them like this is not very practical. In real-life code you often enough don’t even know how many awaitables you will need to wrangle. What we need is to gather the results of multiple awaitables.
asyncio.gather()
asyncio.gather() takes 1 or more awaitables as *args, wraps them in tasks if necessary, and waits for all of them to finish. Then it returns the results of all awaitables in the same order as you passed in the awaitables:
result_f, result_g = await asyncio.gather(f(), g())
If f() or g() raises an exception, gather() will raise it immediately, but the other tasks are not affected. However, if gather() itself is cancelled, all of the awaitables that it’s gathering, and that have not completed yet, are also cancelled.
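The first half of that behavior can be sketched as follows. The bodies of f and g and the state dictionary are assumptions for the demonstration: f fails quickly, g takes longer and records that it finished.

```python
import asyncio


async def f():
    await asyncio.sleep(0.01)
    raise ValueError("f failed")


async def g(state):
    await asyncio.sleep(0.1)
    state["g finished"] = True


async def main():
    state = {"g finished": False}
    task_g = asyncio.create_task(g(state))
    try:
        await asyncio.gather(f(), task_g)
    except ValueError:
        # gather() has already raised, but g is still running.
        state["gather raised early"] = not task_g.done()
    await task_g  # g was not cancelled, so it can still be awaited
    return state


if __name__ == "__main__":
    print(asyncio.run(main()))
```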
You can also pass return_exceptions=True and then exceptions are returned like normal results and you have to check yourself whether or not they were successful (e.g. using isinstance(result, BaseException)).
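A minimal sketch of that check, assuming f succeeds and g fails (both bodies are made up for the example):

```python
import asyncio


async def f():
    return 1


async def g():
    raise RuntimeError("g failed")


async def main():
    results = await asyncio.gather(f(), g(), return_exceptions=True)
    # Exceptions come back as ordinary list entries, in argument order.
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed


if __name__ == "__main__":
    print(asyncio.run(main()))
```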
Summary
- Takes many awaitables as *args.
- Wraps each awaitable in a task if necessary.
- Returns the list of results in the same order.
  - Allows errors to be returned as results (by passing return_exceptions=True).
  - Otherwise if one of the awaitables raises an exception, gather() propagates it immediately to the caller. But the remaining tasks keep running.
- If gather() itself is cancelled, it cancels all unfinished tasks it’s gathering.
Now we can wait for many awaitables at once! However, well-behaved distributed systems need timeouts. Since gather() doesn’t have an option for that, we need the next helper.
asyncio.wait_for()
asyncio.wait_for() takes two arguments: one awaitable and a timeout in seconds. If the awaitable is a coroutine, it will automatically be wrapped by a task. So the following construct is quite common:
try:
    result_f, result_g = await asyncio.wait_for(
        asyncio.gather(f(), g()),
        timeout=5.0,
    )
except asyncio.TimeoutError:
    print("oops took longer than 5s!")
If the timeout expires, the inner task gets cancelled. For gather(), that means that all tasks that it is gathering are cancelled too: in this case f() and g().
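The cancellation chain can be observed directly. In this sketch the coroutine slow() is a hypothetical stand-in for f and g that never finishes within the timeout; explicit tasks are used so their state can be inspected afterwards.

```python
import asyncio


async def slow(name):
    await asyncio.sleep(10)
    return name


async def main():
    task_f = asyncio.create_task(slow("f"))
    task_g = asyncio.create_task(slow("g"))
    try:
        await asyncio.wait_for(asyncio.gather(task_f, task_g), timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    # The timeout cancelled gather(), which cancelled both tasks.
    return timed_out, task_f.cancelled(), task_g.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```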
Please note that just replacing create_task() by wait_for() and calling it a day does not work. create_task() is a regular function that returns a task; wait_for() is an async function that returns a coroutine. That means it does not start executing until you await it:
# NOT concurrent!
cf = asyncio.wait_for(f(), timeout=0.5)
cg = asyncio.wait_for(g(), timeout=0.5)
# cf and cg are both COROUTINES, not tasks!
# At THIS point, there's NOTHING to be scheduled by the event loop.
await cf # g() is NOT executing yet!
await cg # wait_for creates only HERE the task for g()
If you now think that there would be no need for wait_for() if gather() had a timeout option, we’re thinking the same thing.
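If you do want an individual timeout per awaitable and still need concurrency, one possible approach is to wrap each wait_for() call in a task. This is a sketch under that assumption; work() is a hypothetical coroutine that logs when it starts.

```python
import asyncio


async def work(name, delay, log):
    log.append(f"{name} start")
    await asyncio.sleep(delay)
    return name


async def main():
    log = []
    # Wrapping wait_for() in a task makes it eligible for scheduling right away.
    task_f = asyncio.create_task(
        asyncio.wait_for(work("f", 0.02, log), timeout=0.5)
    )
    task_g = asyncio.create_task(
        asyncio.wait_for(work("g", 0.01, log), timeout=0.5)
    )
    result_f = await task_f
    log.append("f awaited")
    result_g = await task_g
    return log, (result_f, result_g)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The log shows g starting before f has been fully awaited, so both run concurrently while each keeps its own timeout.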
Summary
- Takes one awaitable.
- Wraps the awaitable in a task if necessary.
- Takes a timeout that cancels the task if it expires.
- Unlike create_task(), is a coroutine itself that doesn’t execute until awaited.
Interlude: timeout context managers
Since Python 3.11, a more elegant approach to timeouts is offered by asyncio.timeout() and asyncio.timeout_at(). They give you asynchronous context managers that allow you to apply a total timeout even if you need to execute the coroutines sequentially:
async with asyncio.timeout(5.0):
    await f()
    await g()
For older Python versions, you can use the async-timeout package from PyPI.
Sometimes, you don’t want to wait until all awaitables are done. Maybe you want to process them as they finish and report some kind of progress to the user.
asyncio.as_completed()
asyncio.as_completed() takes an iterable of awaitables and returns an iterator that yields asyncio.Futures in the order the awaitables are done:
for fut in asyncio.as_completed([task_f, task_g], timeout=5.0):
    try:
        await fut
        print("one task down!")
    except Exception:
        print("ouch")
There’s no way to find out which awaitable you’re awaiting [2], but it can be useful if you want to provide the user with some kind of progress indicator.
Summary
- Takes many awaitables in an iterable.
- Yields Futures that you have to await as soon as something is done.
- Does not guarantee to return the original awaitables that you passed in.
- Does wrap the awaitables in tasks (it actually calls asyncio.ensure_future() on them).
- Takes an optional timeout.
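A progress indicator along those lines might look like the sketch below. The coroutine work() and its delays are assumptions chosen so that the completion order differs from the submission order.

```python
import asyncio


async def work(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    tasks = [
        asyncio.create_task(work("slow", 0.05)),
        asyncio.create_task(work("fast", 0.01)),
    ]
    finished = []
    for fut in asyncio.as_completed(tasks, timeout=5.0):
        finished.append(await fut)  # results arrive in completion order
        print(f"{len(finished)}/{len(tasks)} done")
    return finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```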
Finally, you may want more control over waiting and that takes us to the final waiting primitive.
asyncio.wait()
asyncio.wait() is the most unwieldy of the APIs but also the most powerful one. It is a little reminiscent of the venerable select() system call.
Like as_completed(), it takes awaitables in an iterable. It will return two sets: the awaitables that are done and those that are still pending. It’s up to you to await them [3] and to determine which result belongs to what:
done, pending = await asyncio.wait([task_f, task_g])
for t in done:
    try:
        if t is task_f:
            print(f"The result of f() is { await task_f }.")
    except Exception as e:
        print(f"f() failed with { repr(e) }.")
# ...and same for g()
This code would not work if you passed in a coroutine and wait() wrapped it in a task, because the returned awaitable would be different from the one that you passed in and the identity check would always fail [4]. Older Python versions did the wrapping anyway and warned you about it because it’s probably a bug; since Python 3.11, wait() rejects bare coroutines with a TypeError.
How can an awaitable be still pending when wait() returns? There are two possibilities:
- You can pass a timeout after which wait() will return. Unlike with wait_for(), nothing is done to the awaitables when that timeout expires. The function just returns and sorts the tasks into the done and pending buckets.
- You can tell wait() to not wait until all awaitables are done using the return_when argument. By default it’s set to asyncio.ALL_COMPLETED which does exactly what it sounds like. But you can also set it to asyncio.FIRST_EXCEPTION, which also waits for all awaitables to finish unless one of them raises an exception, in which case wait() returns immediately. Finally, asyncio.FIRST_COMPLETED returns the moment any of the awaitables finishes.
All of this together is a bit complicated but allows you to build powerful dispatcher functions, often using a while loop that runs until all awaitables are done.
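Such a dispatcher loop could be sketched like this. The coroutine work(), the task names, and the outcomes dictionary are all assumptions for illustration; the loop uses the task name to tell results apart.

```python
import asyncio


async def work(name, delay):
    await asyncio.sleep(delay)
    if name == "bad":
        raise ValueError(name)
    return name.upper()


async def main():
    jobs = [("f", 0.03), ("g", 0.01), ("bad", 0.02)]
    pending = {asyncio.create_task(work(n, d), name=n) for n, d in jobs}
    outcomes = {}
    while pending:
        # Wake up whenever at least one task has finished.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            if task.exception() is not None:
                outcomes[task.get_name()] = "error"
            else:
                outcomes[task.get_name()] = task.result()
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Since everything in done is guaranteed to be finished, the sketch inspects the tasks with Task.exception() and Task.result() instead of awaiting them.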
Summary
- Takes many awaitables in an iterable.
- Will not return the results, but the passed awaitables sorted into two sets that are returned as a tuple of (done, pending). It’s up to you to await and dispatch.
- Older Python versions wrap bare coroutines in tasks and warn about it, because that means you get different awaitables back than you put in; since Python 3.11 they are rejected with a TypeError. Only pass tasks!
- Gives you fine-grained control over when it should sort the tasks into the buckets and return; it never cancels any of the tasks:
  - Pass a timeout to limit the maximum waiting time.
  - Pass a return_when with:
    - asyncio.ALL_COMPLETED: As soon as all awaitables are done.
    - asyncio.FIRST_EXCEPTION: As soon as all are done, or as soon as one raises an exception.
    - asyncio.FIRST_COMPLETED: As soon as any awaitable is done.
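Because wait() never cancels anything, cleaning up after a timeout is your job. The following sketch assumes a hypothetical coroutine work() and shows one way to cancel whatever is left in the pending bucket.

```python
import asyncio


async def work(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    task_f = asyncio.create_task(work("f", 0.01))
    task_g = asyncio.create_task(work("g", 10))
    done, pending = await asyncio.wait([task_f, task_g], timeout=0.1)
    # The timeout only ended the waiting; pending tasks are still running.
    untouched = [not t.done() for t in pending]
    for t in pending:
        t.cancel()
    # Wait for the cancellations to be processed without raising.
    await asyncio.gather(*pending, return_exceptions=True)
    return task_f in done, task_g in pending, untouched, task_g.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```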
Task Groups
Trio has been a great source of new ideas that made their way into asyncio. Probably the most notable one is the concept of task groups that arrived with Python 3.11.
The idea is that you create an asynchronous context manager that lets you create new tasks and once the context manager is left, they are awaited:
async with asyncio.TaskGroup() as tg:
    tg.create_task(f())
    tg.create_task(g())
# f and g are guaranteed to be done here
You can add more tasks to the group while waiting for it to finish (e.g., if you pass tg into f, and f creates more tasks while it’s being awaited).
If one or more of the tasks crash, the rest are cancelled and an ExceptionGroup with all errors is raised by the context manager.
Summary
- Create as many tasks as you want.
- Once you leave the context manager, they’re all done.
- Any exception except asyncio.CancelledError in one task leads to the cancellation of all unfinished tasks and an ExceptionGroup being raised.
Next steps
Of all the available asyncio material, I’m only aware of two sets that are comprehensive, accurate, and written by people with extensive practical asyncio experience:
- Łukasz Langa, who did a lot of asyncio work at Facebook and Instagram and worked on EdgeDB (a database that uses asyncio extensively), started a video series that begins with nothing and works itself up to asyncio. The third episode is particularly pertinent to this post, because it shows the practical use of asyncio.wait().
- For more advanced asyncio-in-production advice, Lynn Root of Spotify gave two talks on that topic: asyncio in Practice: We Did It Wrong (2018) and Advanced asyncio: Solving Real-world Production Problems (2019) with an extensive written tutorial.
[1] Usually by awaiting something.
[2] There are loopholes but they are ugly and rely on undocumented implementation details.
[3] Since the ones from the done bucket are guaranteed to be done, you can also introspect their results using Task.result() and Task.exception(). But I find awaiting them more idiomatic.
[4] Since wait_for() returns a coroutine, this very much applies to it too! Those two functions don’t mix unless you wrap the call to wait_for() into a task.