In the previous tutorial, we learned about testing with pytest. Now let’s learn about async/await: Python’s way to make progress on many tasks concurrently within a single thread.

Async programming is one of the most powerful features in modern Python. It is used in web frameworks (FastAPI), HTTP clients (httpx), database drivers, and many other libraries. By the end of this tutorial, you will understand how async works and when to use it.

The Problem: Blocking Code

Imagine you need to fetch data from three different APIs. Each call takes 2 seconds:

import time

def fetch_data(name: str, delay: float) -> str:
    """Simulate a slow API call."""
    time.sleep(delay)
    return f"Data from {name}"

start = time.perf_counter()
result1 = fetch_data("API-1", 2.0)
result2 = fetch_data("API-2", 2.0)
result3 = fetch_data("API-3", 2.0)
elapsed = time.perf_counter() - start

print(f"Total time: {elapsed:.1f}s")  # Total time: 6.0s

The total time is 6 seconds because each call blocks the program. The second call waits for the first to finish. The third waits for the second.

But these calls are independent. There is no reason to wait. With async, you can run all three at the same time:

import asyncio
import time

async def fetch_data(name: str, delay: float) -> str:
    """Simulate a slow API call — non-blocking."""
    await asyncio.sleep(delay)
    return f"Data from {name}"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch_data("API-1", 2.0),
        fetch_data("API-2", 2.0),
        fetch_data("API-3", 2.0),
    )
    elapsed = time.perf_counter() - start
    print(f"Total time: {elapsed:.1f}s")  # Total time: 2.0s

asyncio.run(main())

Same work, but only 2 seconds. That is the power of async.

How Async Works (Simple Explanation)

Think of a restaurant with one waiter. The waiter takes an order from table 1, then waits at the kitchen until the food is ready. Only then does the waiter go to table 2. This is synchronous (blocking).

A smart waiter takes the order from table 1, sends it to the kitchen, and immediately goes to table 2 while the kitchen cooks. The waiter checks back when the food is ready. This is asynchronous (non-blocking).

In Python:

  • async def creates an async function (called a coroutine)
  • await pauses the current function and lets other tasks run
  • asyncio.run() starts the event loop — the “restaurant manager” that coordinates everything

The key idea: while one task waits for I/O (network, disk, database), other tasks can run.
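
To make the waiter analogy concrete, here is a minimal sketch that records the order in which two coroutines take their steps. The serve_table function, the events list, and the delays are made up for illustration:

```python
import asyncio

events: list[str] = []  # records the order in which steps happen

async def serve_table(table: str, cooking_time: float) -> None:
    """Take an order, wait for the kitchen, then serve."""
    events.append(f"{table}: order taken")
    await asyncio.sleep(cooking_time)  # the "waiter" is free during this wait
    events.append(f"{table}: food served")

async def main() -> None:
    await asyncio.gather(
        serve_table("table 1", 0.2),
        serve_table("table 2", 0.1),
    )

asyncio.run(main())
for event in events:
    print(event)
# table 1: order taken
# table 2: order taken
# table 2: food served
# table 1: food served
```

Both orders are taken before any food is served, and table 2 is served first because its wait is shorter. Nothing runs in parallel here: the event loop simply switches to another task whenever the current one reaches an await that has to wait.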

async def and await

An async def function is a coroutine function. Calling it does not run its body; it returns a coroutine object, which you must await or pass to asyncio.run():

import asyncio

async def greet(name: str) -> str:
    """A simple async function."""
    await asyncio.sleep(0.1)  # Simulate some async work
    return f"Hello, {name}!"

# WRONG — this returns a coroutine object, not the result
# result = greet("Alex")  # <coroutine object greet at 0x...>

# RIGHT — use asyncio.run() to execute from sync code
result = asyncio.run(greet("Alex"))
print(result)  # Hello, Alex!

Inside an async function, you can await other async functions:

async def get_full_greeting(name: str) -> str:
    """Call another async function."""
    greeting = await greet(name)
    return f"{greeting} Welcome!"

result = asyncio.run(get_full_greeting("Alex"))
print(result)  # Hello, Alex! Welcome!

Important: You can only use await inside an async def function. Using it in a regular function causes a SyntaxError.

asyncio.run() — The Entry Point

asyncio.run() is how you start async code from regular (synchronous) Python:

import asyncio

async def main():
    print("Starting...")
    await asyncio.sleep(1)
    print("Done!")

# This is the entry point — call it once at the top level
asyncio.run(main())

Rules for asyncio.run():

  • Call it once at the top level of your program
  • Do not call it inside another async function — use await instead
  • It creates an event loop, runs the coroutine, and cleans up

asyncio.gather() — Run Tasks Concurrently

asyncio.gather() runs multiple coroutines at the same time and collects all the results:

import asyncio

async def process_item(item: str) -> str:
    """Process a single item with simulated delay."""
    await asyncio.sleep(0.1)
    return f"Processed: {item}"

async def main():
    items = ["item_1", "item_2", "item_3", "item_4"]
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks)

    for result in results:
        print(result)

asyncio.run(main())

Output:

Processed: item_1
Processed: item_2
Processed: item_3
Processed: item_4

All four items are processed at the same time. The total time is ~0.1 seconds, not ~0.4 seconds.

Note: asyncio.gather() returns results in the same order as the input — even if tasks finish in different order.
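
A small sketch of that ordering guarantee, assuming a variant of process_item that takes a delay argument (added here only for the demonstration). The slowest item is listed first, so it finishes last but still comes first in the results:

```python
import asyncio

finish_order: list[str] = []  # filled in as each coroutine completes

async def process_item(item: str, delay: float) -> str:
    await asyncio.sleep(delay)
    finish_order.append(item)
    return f"Processed: {item}"

async def main() -> list[str]:
    return await asyncio.gather(
        process_item("slow", 0.3),
        process_item("medium", 0.2),
        process_item("fast", 0.1),
    )

results = asyncio.run(main())
print(finish_order)  # ['fast', 'medium', 'slow']
print(results)       # ['Processed: slow', 'Processed: medium', 'Processed: fast']
```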

asyncio.create_task() — Fire and Collect Later

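asyncio.create_task() schedules a coroutine to run in the background and returns a Task object right away, without waiting for the result. The task starts running the next time your code reaches an await, and you collect the result later: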

import asyncio
from typing import Any

async def download_file(filename: str, size_mb: int) -> dict[str, Any]:
    """Simulate downloading a file."""
    await asyncio.sleep(size_mb * 0.01)
    return {"filename": filename, "size_mb": size_mb, "status": "done"}

async def main():
    # Start all downloads immediately
    task1 = asyncio.create_task(download_file("report.pdf", 5))
    task2 = asyncio.create_task(download_file("photo.jpg", 2))
    task3 = asyncio.create_task(download_file("video.mp4", 50))

    # Do other work while files download
    print("Downloads started, doing other work...")

    # Collect results when you need them
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"{result1['filename']}: {result1['status']}")
    print(f"{result2['filename']}: {result2['status']}")
    print(f"{result3['filename']}: {result3['status']}")

asyncio.run(main())

When to use create_task vs gather:

  • Use gather() when you have a list of tasks and want all results at once
  • Use create_task() when you want to start a task and do other work before collecting the result
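
One detail that is easy to miss: create_task() only schedules the coroutine. The sketch below (the background function and log list are illustrative names) shows that the task's body does not begin until main() yields control at an await:

```python
import asyncio

log: list[str] = []

async def background() -> str:
    log.append("background started")
    await asyncio.sleep(0.05)
    log.append("background finished")
    return "done"

async def main() -> str:
    task = asyncio.create_task(background())
    log.append("task created")   # background() has not run yet
    await asyncio.sleep(0)       # yield once so the event loop can start the task
    log.append("main resumed")
    return await task            # wait for the result

result = asyncio.run(main())
print(log)
# ['task created', 'background started', 'main resumed', 'background finished']
```

It is also a good habit to keep a reference to every task you create (as the variables above do), because the event loop itself only holds weak references to tasks.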

Error Handling in Async Code

Try/Except in Async Functions

Error handling works the same way as in sync code:

import asyncio

async def risky_fetch(url: str) -> str:
    """A fetch that might fail."""
    await asyncio.sleep(0.05)
    if "bad" in url:
        raise ConnectionError(f"Failed to connect to {url}")
    return f"Response from {url}"

async def safe_fetch(url: str) -> str | None:
    """Fetch with error handling."""
    try:
        return await risky_fetch(url)
    except ConnectionError as e:
        print(f"Error: {e}")
        return None

result = asyncio.run(safe_fetch("https://bad.example.com"))
print(result)  # None

gather with return_exceptions

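By default, asyncio.gather() raises the first exception to the caller as soon as it occurs. The other coroutines are not cancelled and keep running, but you lose access to their results. Use return_exceptions=True to collect all results, including errors: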

import asyncio

async def main():
    urls = ["https://good.com", "https://bad.com", "https://ok.com"]
    tasks = [risky_fetch(url) for url in urls]

    # Without return_exceptions — raises on first error
    # results = await asyncio.gather(*tasks)  # Raises ConnectionError!

    # With return_exceptions — collects errors as values
    results = await asyncio.gather(*tasks, return_exceptions=True)

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"{url}: FAILED — {result}")
        else:
            print(f"{url}: {result}")

asyncio.run(main())

Output:

https://good.com: Response from https://good.com
https://bad.com: FAILED — Failed to connect to https://bad.com
https://ok.com: Response from https://ok.com

This is useful when you want to continue even if some tasks fail.
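
For comparison, here is a sketch of what happens without return_exceptions. This version of risky_fetch takes an extra delay parameter, and the completed list is added purely to observe the behavior. When gather() raises, the remaining coroutine is not cancelled and still runs to completion:

```python
import asyncio

completed: list[str] = []  # URLs whose fetch finished successfully

async def risky_fetch(url: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if "bad" in url:
        raise ConnectionError(f"Failed to connect to {url}")
    completed.append(url)
    return f"Response from {url}"

async def main() -> str:
    outcome = "no error"
    try:
        await asyncio.gather(
            risky_fetch("https://bad.com", 0.05),
            risky_fetch("https://slow.com", 0.15),
        )
    except ConnectionError as e:
        outcome = f"gather raised: {e}"
    # The slow fetch is still running in the background; give it time to finish.
    await asyncio.sleep(0.2)
    return outcome

outcome = asyncio.run(main())
print(outcome)    # gather raised: Failed to connect to https://bad.com
print(completed)  # ['https://slow.com']
```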

async for — Async Iterators

Sometimes you need to iterate over data that arrives asynchronously (like streaming from an API or reading from a database cursor). Use async for:

import asyncio

class AsyncCounter:
    """An async iterator that counts with a delay."""

    def __init__(self, start: int, stop: int) -> None:
        self.current = start
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current >= self.stop:
            raise StopAsyncIteration
        value = self.current
        self.current += 1
        await asyncio.sleep(0.01)
        return value

async def main():
    total = 0
    async for value in AsyncCounter(1, 6):
        total += value
        print(f"Got: {value}")
    print(f"Total: {total}")  # Total: 15

asyncio.run(main())

An async iterator implements __aiter__ and __anext__ (the async versions of __iter__ and __next__). You will encounter async iterators when working with database cursors, streaming HTTP responses, and message queues.
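
For simple cases you often do not need to write a class. An async generator (an async def function that uses yield) implements both methods for you. A minimal sketch equivalent to AsyncCounter, where count_with_delay is an illustrative name:

```python
import asyncio
from collections.abc import AsyncIterator

async def count_with_delay(start: int, stop: int) -> AsyncIterator[int]:
    """Async generator: yields start..stop-1, pausing briefly before each value."""
    for value in range(start, stop):
        await asyncio.sleep(0.01)
        yield value

async def main() -> int:
    total = 0
    async for value in count_with_delay(1, 6):
        total += value
    return total

total = asyncio.run(main())
print(f"Total: {total}")  # Total: 15
```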

async with — Async Context Managers

Some resources need async setup and cleanup. Use async with for those:

import asyncio

class AsyncConnection:
    """Simulate an async database connection."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.connected = False

    async def __aenter__(self):
        await asyncio.sleep(0.01)  # Simulate connection time
        self.connected = True
        print(f"Connected to {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.01)  # Simulate cleanup
        self.connected = False
        print(f"Disconnected from {self.name}")
        return False

    async def execute(self, query: str) -> str:
        if not self.connected:
            raise RuntimeError("Not connected")
        await asyncio.sleep(0.01)
        return f"Result of: {query}"

async def main():
    async with AsyncConnection("my_database") as conn:
        result = await conn.execute("SELECT * FROM users")
        print(result)
    # Connection is automatically closed here

asyncio.run(main())

Output:

Connected to my_database
Result of: SELECT * FROM users
Disconnected from my_database

This is exactly like the regular with statement from Tutorial #15, but with async support. You will see async with everywhere in async database libraries, HTTP client sessions, and file operations.
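
If writing __aenter__ and __aexit__ by hand feels heavy, the standard library's contextlib.asynccontextmanager decorator can build an async context manager from an async generator. A minimal sketch, where open_connection and the log list are made up for the example:

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

log: list[str] = []

@asynccontextmanager
async def open_connection(name: str) -> AsyncIterator[str]:
    await asyncio.sleep(0.01)        # simulate connection time
    log.append(f"Connected to {name}")
    try:
        yield name                   # the value bound by "as"
    finally:
        await asyncio.sleep(0.01)    # simulate cleanup, runs even on error
        log.append(f"Disconnected from {name}")

async def main() -> None:
    async with open_connection("my_database") as conn:
        log.append(f"Using {conn}")

asyncio.run(main())
print(log)
# ['Connected to my_database', 'Using my_database', 'Disconnected from my_database']
```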

Timeouts with asyncio.wait_for()

What if a task takes too long? Use asyncio.wait_for() to set a timeout:

import asyncio

async def slow_operation() -> str:
    """An operation that takes 10 seconds."""
    await asyncio.sleep(10.0)
    return "Finally done!"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
# Output: Operation timed out!

Always set timeouts on network operations. Without a timeout, a stuck connection can hang your entire program.
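
It helps to know what happens to the slow operation itself when the timeout fires: wait_for() cancels it, which raises asyncio.CancelledError inside the coroutine. The sketch below uses a shorter timeout and an illustrative log list to make that visible:

```python
import asyncio

log: list[str] = []

async def slow_operation() -> str:
    try:
        await asyncio.sleep(10.0)
        return "Finally done!"
    except asyncio.CancelledError:
        log.append("slow_operation was cancelled")  # a place for cleanup
        raise  # re-raise so the cancellation can complete

async def main() -> str:
    try:
        return await asyncio.wait_for(slow_operation(), timeout=0.1)
    except asyncio.TimeoutError:
        return "Operation timed out!"

outcome = asyncio.run(main())
print(log)      # ['slow_operation was cancelled']
print(outcome)  # Operation timed out!
```

If a coroutine catches CancelledError for cleanup, it should re-raise it, as shown here. Swallowing the exception interferes with cancellation.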

Rate Limiting with Semaphore

When making many concurrent requests, you might overwhelm a server. Use asyncio.Semaphore to limit how many tasks run at the same time:

import asyncio

async def fetch_url(url: str, semaphore: asyncio.Semaphore) -> str:
    """Fetch a URL with rate limiting."""
    async with semaphore:
        # Only N tasks can be inside this block at once
        await asyncio.sleep(0.05)  # Simulate network request
        return f"Response from {url}"

async def main():
    urls = [f"https://api.example.com/page/{i}" for i in range(10)]

    # Allow max 3 concurrent requests
    semaphore = asyncio.Semaphore(3)

    tasks = [fetch_url(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)

    print(f"Fetched {len(results)} pages")

asyncio.run(main())

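The semaphore acts like a bouncer at a club: only 3 tasks can be inside the “fetch” section at a time. When one leaves, the next one enters. This prevents you from sending 1000 requests at once and getting blocked by the server. Note that a semaphore caps how many requests are in flight at the same moment; it does not limit requests per second.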
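
One way to convince yourself the limit holds is to count how many tasks are inside the block at once. This sketch adds two counters, active and peak, which are instrumentation for the demo only and not part of the pattern:

```python
import asyncio

active = 0  # tasks currently inside the semaphore block
peak = 0    # highest value of `active` observed

async def fetch_url(url: str, semaphore: asyncio.Semaphore) -> str:
    global active, peak
    async with semaphore:
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.05)  # simulate network request
        active -= 1
        return f"Response from {url}"

async def main() -> list[str]:
    semaphore = asyncio.Semaphore(3)
    urls = [f"https://api.example.com/page/{i}" for i in range(10)]
    return await asyncio.gather(*(fetch_url(url, semaphore) for url in urls))

results = asyncio.run(main())
print(f"Fetched {len(results)} pages, peak concurrency: {peak}")
# Fetched 10 pages, peak concurrency: 3
```

The counters need no lock because all tasks run on one thread and only switch at await points.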

Async HTTP with httpx

In real projects, you will use the httpx library for async HTTP requests. Here is a preview (we will cover httpx in detail in Tutorial #19):

import asyncio
import httpx

async def fetch_todos():
    """Fetch data from an API using httpx."""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://jsonplaceholder.typicode.com/todos/1"
        )
        return response.json()

result = asyncio.run(fetch_todos())
print(result)
# {'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}

The httpx.AsyncClient is an async context manager (uses async with). It manages connection pooling and cleanup automatically.

When to Use Async vs Threading vs Multiprocessing

This is a common question. Here is a simple decision guide:

Use async when:

  • You have I/O-bound tasks (HTTP requests, database queries, file operations)
  • The libraries you use support async (httpx, aiofiles, asyncpg)
  • You need to handle many connections (web servers, chat apps)

Use threading when:

  • You have I/O-bound tasks but the libraries only support sync
  • You need to run background tasks alongside a GUI
  • Example: concurrent.futures.ThreadPoolExecutor

Use multiprocessing when:

  • You have CPU-bound tasks (math calculations, image processing, data crunching)
  • You need to use multiple CPU cores
  • Example: concurrent.futures.ProcessPoolExecutor

Quick rule: If your code waits a lot (for network, disk, database), use async. If your code computes a lot (math, encoding, parsing), use multiprocessing.
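
As a rough illustration of the threading option, here is the blocking fetch_data function from the start of this tutorial run through concurrent.futures.ThreadPoolExecutor. The delays are shortened to 0.2 seconds to keep the demo quick:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch_data(name: str, delay: float) -> str:
    """Simulate a slow, blocking API call (a sync-only library)."""
    time.sleep(delay)
    return f"Data from {name}"

names = ["API-1", "API-2", "API-3"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    # map() returns results in input order, similar to asyncio.gather()
    results = list(pool.map(fetch_data, names, [0.2] * len(names)))
elapsed = time.perf_counter() - start

print(results)  # ['Data from API-1', 'Data from API-2', 'Data from API-3']
print(f"Total time: {elapsed:.1f}s")
```

The three calls overlap, so the total should be close to 0.2 seconds rather than 0.6. The function itself stays synchronous; only the way it is called changes.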

Common Mistakes

1. Calling an Async Function Without await

# WRONG — returns a coroutine object, not the result
async def main():
    result = greet("Alex")  # Missing await!
    print(result)  # <coroutine object greet at 0x...>

# RIGHT
async def main():
    result = await greet("Alex")
    print(result)  # Hello, Alex!

Python will show a warning: RuntimeWarning: coroutine 'greet' was never awaited.

2. Blocking the Event Loop

Never use time.sleep() in async code — it blocks the entire event loop:

# WRONG — blocks everything for 5 seconds
async def bad_example():
    time.sleep(5)  # Blocks the event loop!

# RIGHT — lets other tasks run during the wait
async def good_example():
    await asyncio.sleep(5)  # Non-blocking

The same applies to synchronous HTTP calls, file operations, and database queries: use an async version where one exists, or move the blocking call off the event loop thread.
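
When no async version exists, one option (Python 3.9+) is asyncio.to_thread(), which runs a blocking function in a worker thread so the event loop stays free. A minimal sketch, where blocking_read is a hypothetical stand-in for a sync-only library call:

```python
import asyncio
import time

def blocking_read(name: str) -> str:
    """Stand-in for a sync-only call (hypothetical helper)."""
    time.sleep(0.2)  # blocks its own worker thread, not the event loop
    return f"Data from {name}"

async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, "A"),
        asyncio.to_thread(blocking_read, "B"),
        asyncio.to_thread(blocking_read, "C"),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results)  # ['Data from A', 'Data from B', 'Data from C']
print(f"Total time: {elapsed:.1f}s")
```

The three calls overlap, so the total should be close to 0.2 seconds instead of 0.6. This suits blocking I/O; CPU-heavy work is still better served by multiprocessing.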

3. Using asyncio.run() Inside Async Code

# WRONG — cannot call asyncio.run() from within async code
async def bad_example():
    result = asyncio.run(greet("Alex"))  # RuntimeError!

# RIGHT — use await
async def good_example():
    result = await greet("Alex")

asyncio.run() is only for the top level of your program.

Source Code

You can find all the code from this tutorial on GitHub:

GitHub: python-tutorial/tutorial-18-async

What’s Next?

In the next tutorial, we will learn about HTTP requests and APIs — how to call REST APIs, handle JSON responses, and build API clients using httpx (both sync and async).