
Asyncio: Event Loop, Tasks, and Async Patterns

Modern async programming in Python.

Core Concepts

Event Loop

The event loop is the central execution mechanism for async code.

import asyncio

async def main():
    # Entry-point coroutine: print, yield control for 1 s, print again.
    print("Hello")
    await asyncio.sleep(1)  # suspends here; the loop may run other tasks
    print("World")

# Run the event loop (creates it, runs main() to completion, then closes it)
asyncio.run(main())

What happens:
1. asyncio.run() creates an event loop.
2. It schedules the main() coroutine.
3. await asyncio.sleep(1) suspends the coroutine.
4. The event loop is free to run other tasks.
5. After 1 second, main() resumes.
6. The loop exits when main() completes.

Coroutines

Functions defined with async def that can be suspended.

async def fetch_data():
    # This is a coroutine
    await asyncio.sleep(1)  # stand-in for real async I/O
    return "data"

# Calling a coroutine returns a coroutine object
coro = fetch_data()  # Nothing runs yet!

# Must await to execute
# NOTE: `await` is only legal inside an async function (or a REPL with
# top-level await support); this line is shown for illustration.
result = await fetch_data()  # Now it runs

Tasks

Tasks wrap coroutines so they can be scheduled and run concurrently.

async def main():
    # Create tasks for concurrent execution
    # create_task() schedules the coroutine on the running loop immediately.
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(fetch_data())

    # Both run concurrently
    # Awaiting them in sequence still takes ~1 s total, not ~2 s,
    # because both tasks were already started above.
    result1 = await task1
    result2 = await task2

Running Async Code

asyncio.run()

The main entry point.

import asyncio

async def main():
    """Coroutine that simply hands a value back to its caller."""
    outcome = "result"
    return outcome

# Standard way to run async code: asyncio.run() owns the loop's full lifecycle.
result = asyncio.run(main())

From Sync to Async

# In a synchronous context
def sync_function():
    # Run async code
    # NOTE: asyncio.run() raises if a loop is already running in this thread.
    result = asyncio.run(async_function())
    return result

# In an existing event loop (e.g., Jupyter)
# nest_asyncio is a third-party package that patches the running loop
# so that re-entrant asyncio.run() calls are allowed.
import nest_asyncio
nest_asyncio.apply()
asyncio.run(main())  # Now works in Jupyter

Getting the Event Loop

# Inside async function
async def inside_async():
    # Preferred accessor: only valid while a loop is actually running.
    loop = asyncio.get_running_loop()

# Outside async function (deprecated pattern)
loop = asyncio.get_event_loop()  # Don't use in new code

Concurrent Execution

asyncio.gather()

Run multiple coroutines concurrently.

async def fetch(url):
    # Simulated network call; real code would use an async HTTP client.
    await asyncio.sleep(1)
    return f"Result from {url}"

async def main():
    # All run concurrently
    # gather() preserves argument order in its results list.
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3"),
    )
    # results = ["Result from url1", "Result from url2", "Result from url3"]
    return results

Error Handling in gather

async def may_fail(x):
    """Echo *x* back, except raise ValueError when x == 2."""
    if x != 2:
        return x
    raise ValueError("Failed!")

async def main():
    """Gather results, capturing exceptions as values instead of propagating."""
    # return_exceptions=True turns raised exceptions into list entries.
    outcomes = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True,
    )
    # outcomes == [1, ValueError("Failed!"), 3]

    for outcome in outcomes:
        if not isinstance(outcome, Exception):
            print(f"Success: {outcome}")
        else:
            print(f"Error: {outcome}")

asyncio.wait()

More control over task completion.

async def main():
    # assumes fetch() is defined as above; `urls` is presumably an
    # iterable of URL strings — not defined in this document.
    tasks = [
        asyncio.create_task(fetch(url))
        for url in urls
    ]

    # Wait for first to complete
    # wait() returns two sets of Tasks: finished and still-running.
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Wait with timeout
    # NOTE: re-waiting the same task list; already-finished tasks land in `done`.
    done, pending = await asyncio.wait(
        tasks,
        timeout=5.0
    )

    # Cancel pending tasks
    for task in pending:
        task.cancel()

as_completed()

Process results as they complete.

async def main():
    # assumes fetch() and urls are defined as in the examples above
    tasks = [asyncio.create_task(fetch(url)) for url in urls]

    # as_completed yields awaitables in completion order, not submission order.
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Got: {result}")

Task Management

Creating Tasks

async def background_work():
    # Infinite periodic loop; runs until the task is cancelled.
    while True:
        await asyncio.sleep(60)
        print("Background tick")

async def main():
    # Create task (starts immediately)
    task = asyncio.create_task(background_work())

    # Do other work
    await asyncio.sleep(10)

    # Cancel task
    task.cancel()

    try:
        # Awaiting a cancelled task raises CancelledError once it unwinds.
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

Task Groups (Python 3.11+)

Structured concurrency with automatic cleanup.

async def main():
    # TaskGroup (3.11+) awaits all child tasks before leaving the block.
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch("url1"))
        task2 = tg.create_task(fetch("url2"))

    # All tasks complete when exiting context
    # If any task fails, others are cancelled
    # (failures are then re-raised together as an ExceptionGroup)

    print(task1.result())
    print(task2.result())

Background Tasks Pattern

class BackgroundTasks:
    """Holds strong references to fire-and-forget tasks and can cancel them all."""

    def __init__(self):
        # Strong references keep the tasks from being garbage-collected mid-run.
        self.tasks: set[asyncio.Task] = set()

    def add(self, coro):
        """Schedule *coro* as a task and track it until it finishes."""
        new_task = asyncio.create_task(coro)
        self.tasks.add(new_task)
        # Drop the reference automatically once the task completes.
        new_task.add_done_callback(self.tasks.discard)

    async def shutdown(self):
        """Cancel every tracked task and wait for all of them to unwind."""
        for pending in self.tasks:
            pending.cancel()
        # return_exceptions=True swallows the resulting CancelledErrors.
        await asyncio.gather(*self.tasks, return_exceptions=True)

# Usage in FastAPI
bg_tasks = BackgroundTasks()

# NOTE(review): @app.on_event is deprecated in newer FastAPI versions;
# lifespan handlers are the recommended replacement. Shown in classic form.
@app.on_event("shutdown")
async def shutdown():
    await bg_tasks.shutdown()

Timeouts

asyncio.timeout() (Python 3.11+)

async def main():
    # asyncio.timeout() (3.11+) cancels the enclosed work and raises
    # TimeoutError if it runs longer than 5 seconds.
    async with asyncio.timeout(5.0):
        result = await slow_operation()

asyncio.wait_for()

async def main():
    try:
        # wait_for() cancels slow_operation() if the deadline passes.
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        # Since 3.11, asyncio.TimeoutError is an alias of the builtin TimeoutError.
        print("Operation timed out")

timeout with Cleanup

async def main():
    try:
        async with asyncio.timeout(5.0):
            async with aiohttp.ClientSession() as session:
                # NOTE(review): the returned response outlives the closed
                # session here; real code should read the body before
                # leaving the `async with` block — verify against aiohttp docs.
                return await session.get(url)
    except asyncio.TimeoutError:
        # Session is properly closed due to context manager
        print("Request timed out")

Queues

Basic Queue

async def producer(queue: asyncio.Queue):
    # put() suspends when the queue is at maxsize, providing backpressure.
    for i in range(10):
        await queue.put(i)
        print(f"Produced: {i}")
    await queue.put(None)  # Sentinel

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break  # sentinel: producer is done
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    # maxsize=5 forces the producer to pause once 5 items are waiting.
    queue = asyncio.Queue(maxsize=5)

    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

Multiple Consumers

async def main():
    queue = asyncio.Queue()

    # Start workers
    # NOTE(review): this consumer takes (queue, name) — a different
    # signature from the single-argument consumer in the basic example above.
    workers = [
        asyncio.create_task(consumer(queue, f"Worker-{i}"))
        for i in range(3)
    ]

    # Add work
    for i in range(20):
        await queue.put(i)

    # Signal workers to stop
    # One sentinel per worker so each one sees its own None.
    for _ in workers:
        await queue.put(None)

    await asyncio.gather(*workers)

Synchronization

Lock

lock = asyncio.Lock()

async def critical_section():
    # `async with` acquires on entry and always releases on exit.
    async with lock:
        # Only one coroutine at a time
        await update_shared_resource()

Semaphore

# Limit concurrent operations
semaphore = asyncio.Semaphore(10)

async def limited_operation(url):
    async with semaphore:  # Max 10 concurrent
        return await fetch(url)

async def main():
    # All coroutines are gathered, but at most 10 are inside fetch() at once.
    tasks = [limited_operation(url) for url in urls]
    results = await asyncio.gather(*tasks)

Event

event = asyncio.Event()

async def waiter():
    print("Waiting for event...")
    # wait() returns immediately if the event is already set.
    await event.wait()
    print("Event received!")

async def setter():
    await asyncio.sleep(1)
    event.set()  # wakes every coroutine blocked in wait()

async def main():
    await asyncio.gather(waiter(), setter())

Condition

condition = asyncio.Condition()
data = []

async def consumer():
    async with condition:
        # A loop (not `if`) re-checks the predicate after every wakeup,
        # guarding against racing consumers.
        while not data:
            await condition.wait()
        item = data.pop(0)
        return item

async def producer(item):
    async with condition:
        data.append(item)
        condition.notify()  # wake one waiting consumer

Running Blocking Code

run_in_executor

import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    """Synchronous file read — would stall the event loop if called directly."""
    # Synchronous I/O operation
    with open("large_file.txt") as f:
        return f.read()

def cpu_bound():
    """Pure-Python number crunching — holds the GIL, so prefer a process pool."""
    # CPU-intensive operation
    return sum(i * i for i in range(10_000_000))

async def main():
    """Offload blocking work to executors so the event loop stays responsive."""
    # BUGFIX: ProcessPoolExecutor was used below but never imported;
    # only ThreadPoolExecutor is imported at the top of this example.
    from concurrent.futures import ProcessPoolExecutor

    loop = asyncio.get_running_loop()

    # Run in thread pool (for I/O)
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)

    # Run in process pool (for CPU)
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)

    # Use default executor (a shared ThreadPoolExecutor)
    result = await loop.run_in_executor(None, blocking_io)

to_thread (Python 3.9+)

async def main():
    # Simpler syntax for thread pool
    # Equivalent to loop.run_in_executor(None, blocking_function).
    result = await asyncio.to_thread(blocking_function)

Streams

TCP Client

async def tcp_client():
    # open_connection returns a (StreamReader, StreamWriter) pair.
    reader, writer = await asyncio.open_connection('localhost', 8888)

    writer.write(b'Hello, server!')
    await writer.drain()  # backpressure: wait until the send buffer flushes

    data = await reader.read(100)
    print(f"Received: {data.decode()}")

    writer.close()
    await writer.wait_closed()

TCP Server

async def handle_client(reader, writer):
    # Invoked by the server once per client connection.
    data = await reader.read(100)
    message = data.decode()
    print(f"Received: {message}")

    # Echo the raw payload back to the client.
    writer.write(data)
    await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_client, 'localhost', 8888)

    async with server:
        await server.serve_forever()  # runs until cancelled

Debugging

Debug Mode

# Enable debug mode
# Debug mode logs never-awaited coroutines and slow callbacks.
asyncio.run(main(), debug=True)

# Or via environment variable
# PYTHONASYNCIODEBUG=1 python script.py

Common Issues

Coroutine never awaited:

# Wrong
async def main():
    fetch_data()  # Returns coroutine, doesn't run!
    # (debug mode warns: "coroutine ... was never awaited")

# Right
async def main():
    await fetch_data()

Blocking the event loop:

# Wrong
async def main():
    time.sleep(1)  # Blocks entire loop!
    # No other task can run during a synchronous sleep.

# Right
async def main():
    await asyncio.sleep(1)  # Non-blocking

Using sync libraries:

# Wrong
async def main():
    requests.get(url)  # Blocks!

# Right
# aiohttp is a third-party async HTTP client; its session and request
# objects are both async context managers.
async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

Best Practices

  1. Use asyncio.run() — Proper event loop management
  2. Use TaskGroup — Python 3.11+ for structured concurrency
  3. Handle cancellation — Tasks can be cancelled anytime
  4. Use timeouts — Prevent hanging operations
  5. Limit concurrency — Use semaphores to avoid overwhelming resources
  6. Don't block the loop — Use run_in_executor for blocking calls
  7. Close resources — Use async context managers

Common Patterns

Rate Limiting

class RateLimiter:
    """Allow at most *rate* acquisitions per *period* seconds.

    Each acquire() consumes a semaphore slot that is handed back
    automatically *period* seconds later via loop.call_later().
    """

    def __init__(self, rate: int, period: float = 1.0):
        self.rate = rate
        self.period = period
        self.semaphore = asyncio.Semaphore(rate)
        # (removed: a write-only `release_times` list that was never read)

    async def acquire(self):
        """Block until a slot is free, then schedule its automatic release."""
        await self.semaphore.acquire()
        # Return the slot after `period` seconds; callers never release manually.
        asyncio.get_running_loop().call_later(
            self.period,
            self.semaphore.release
        )

limiter = RateLimiter(10, 1.0)  # 10 requests per second

async def rate_limited_fetch(url):
    # Waits for a rate slot before issuing the request.
    await limiter.acquire()
    return await fetch(url)

Retry with Backoff

async def retry_with_backoff(coro_func, max_retries=3, base_delay=1.0):
    """Call *coro_func* until it succeeds, sleeping exponentially between tries.

    Args:
        coro_func: zero-argument callable returning a fresh coroutine per attempt.
        max_retries: total number of attempts before giving up.
        base_delay: first sleep in seconds; doubles after every failure.

    Returns:
        Whatever the first successful call returns.

    Raises:
        The last exception, if every attempt fails.
    """
    for attempt in range(max_retries):
        try:
            return await coro_func()
        except Exception:
            # Out of attempts: propagate the original failure unchanged.
            if attempt == max_retries - 1:
                raise
            # Exponential backoff: base, 2*base, 4*base, ...
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay)

Graceful Shutdown

async def main():
    """Run a pool of workers; on cancellation, cancel them all and wait for cleanup."""
    # BUGFIX: the original left `tasks` empty (workers were created only
    # inside gather()), so shutdown() had nothing to cancel.
    tasks = {asyncio.create_task(create_worker()) for _ in range(5)}

    async def shutdown():
        # Cancel everything still running, then wait for each task to unwind.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    try:
        # Run until interrupted
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        await shutdown()