Asyncio: Event Loop, Tasks, and Async Patterns¶
Modern async programming in Python.
Core Concepts¶
Event Loop¶
The event loop is the central execution mechanism for async code.
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# Run the event loop
asyncio.run(main())
What happens:
1. asyncio.run() creates an event loop
2. Schedules main() coroutine
3. await asyncio.sleep(1) suspends coroutine
4. Event loop can run other tasks
5. After 1 second, resumes main()
6. Loop exits when main() completes
Coroutines¶
Functions defined with async def that can be suspended.
async def fetch_data():
# This is a coroutine
await asyncio.sleep(1)
return "data"
# Calling a coroutine returns a coroutine object
coro = fetch_data() # Nothing runs yet!
# Must await to execute
result = await fetch_data() # Now it runs
Tasks¶
Wrap coroutines for concurrent execution.
async def main():
# Create tasks for concurrent execution
task1 = asyncio.create_task(fetch_data())
task2 = asyncio.create_task(fetch_data())
# Both run concurrently
result1 = await task1
result2 = await task2
Running Async Code¶
asyncio.run()¶
The main entry point.
import asyncio
async def main():
return "result"
# Standard way to run async code
result = asyncio.run(main())
From Sync to Async¶
# In a synchronous context
def sync_function():
# Run async code
result = asyncio.run(async_function())
return result
# In an existing event loop (e.g., Jupyter)
import nest_asyncio
nest_asyncio.apply()
asyncio.run(main()) # Now works in Jupyter
Getting the Event Loop¶
# Inside async function
async def inside_async():
loop = asyncio.get_running_loop()
# Outside async function (deprecated pattern)
loop = asyncio.get_event_loop() # Don't use in new code
Concurrent Execution¶
asyncio.gather()¶
Run multiple coroutines concurrently.
async def fetch(url):
await asyncio.sleep(1)
return f"Result from {url}"
async def main():
# All run concurrently
results = await asyncio.gather(
fetch("url1"),
fetch("url2"),
fetch("url3"),
)
# results = ["Result from url1", "Result from url2", "Result from url3"]
return results
Error Handling in gather¶
async def may_fail(x):
if x == 2:
raise ValueError("Failed!")
return x
async def main():
# With return_exceptions=True, exceptions are returned as results
results = await asyncio.gather(
may_fail(1),
may_fail(2),
may_fail(3),
return_exceptions=True
)
# results = [1, ValueError("Failed!"), 3]
for r in results:
if isinstance(r, Exception):
print(f"Error: {r}")
else:
print(f"Success: {r}")
asyncio.wait()¶
More control over task completion.
async def main():
tasks = [
asyncio.create_task(fetch(url))
for url in urls
]
# Wait for first to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Wait with timeout
done, pending = await asyncio.wait(
tasks,
timeout=5.0
)
# Cancel pending tasks
for task in pending:
task.cancel()
as_completed()¶
Process results as they complete.
async def main():
tasks = [asyncio.create_task(fetch(url)) for url in urls]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Got: {result}")
Task Management¶
Creating Tasks¶
async def background_work():
while True:
await asyncio.sleep(60)
print("Background tick")
async def main():
# Create task (starts immediately)
task = asyncio.create_task(background_work())
# Do other work
await asyncio.sleep(10)
# Cancel task
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
Task Groups (Python 3.11+)¶
Structured concurrency with automatic cleanup.
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch("url1"))
task2 = tg.create_task(fetch("url2"))
# All tasks complete when exiting context
# If any task fails, others are cancelled
print(task1.result())
print(task2.result())
Background Tasks Pattern¶
class BackgroundTasks:
def __init__(self):
self.tasks: set[asyncio.Task] = set()
def add(self, coro):
task = asyncio.create_task(coro)
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
async def shutdown(self):
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)
# Usage in FastAPI
bg_tasks = BackgroundTasks()
@app.on_event("shutdown")
async def shutdown():
await bg_tasks.shutdown()
Timeouts¶
asyncio.timeout() (Python 3.11+)¶
asyncio.wait_for()¶
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
print("Operation timed out")
timeout with Cleanup¶
async def main():
try:
async with asyncio.timeout(5.0):
async with aiohttp.ClientSession() as session:
return await session.get(url)
except asyncio.TimeoutError:
# Session is properly closed due to context manager
print("Request timed out")
Queues¶
Basic Queue¶
async def producer(queue: asyncio.Queue):
for i in range(10):
await queue.put(i)
print(f"Produced: {i}")
await queue.put(None) # Sentinel
async def consumer(queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
print(f"Consumed: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
producer(queue),
consumer(queue),
)
Multiple Consumers¶
async def main():
queue = asyncio.Queue()
# Start workers
workers = [
asyncio.create_task(consumer(queue, f"Worker-{i}"))
for i in range(3)
]
# Add work
for i in range(20):
await queue.put(i)
# Signal workers to stop
for _ in workers:
await queue.put(None)
await asyncio.gather(*workers)
Synchronization¶
Lock¶
lock = asyncio.Lock()
async def critical_section():
async with lock:
# Only one coroutine at a time
await update_shared_resource()
Semaphore¶
# Limit concurrent operations
semaphore = asyncio.Semaphore(10)
async def limited_operation(url):
async with semaphore: # Max 10 concurrent
return await fetch(url)
async def main():
tasks = [limited_operation(url) for url in urls]
results = await asyncio.gather(*tasks)
Event¶
event = asyncio.Event()
async def waiter():
print("Waiting for event...")
await event.wait()
print("Event received!")
async def setter():
await asyncio.sleep(1)
event.set()
async def main():
await asyncio.gather(waiter(), setter())
Condition¶
condition = asyncio.Condition()
data = []
async def consumer():
async with condition:
while not data:
await condition.wait()
item = data.pop(0)
return item
async def producer(item):
async with condition:
data.append(item)
condition.notify()
Running Blocking Code¶
run_in_executor¶
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
# Synchronous I/O operation
with open("large_file.txt") as f:
return f.read()
def cpu_bound():
# CPU-intensive operation
return sum(i * i for i in range(10_000_000))
async def main():
loop = asyncio.get_running_loop()
# Run in thread pool (for I/O)
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
# Run in process pool (for CPU)
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound)
# Use default executor
result = await loop.run_in_executor(None, blocking_io)
to_thread (Python 3.9+)¶
async def main():
# Simpler syntax for thread pool
result = await asyncio.to_thread(blocking_function)
Streams¶
TCP Client¶
async def tcp_client():
reader, writer = await asyncio.open_connection('localhost', 8888)
writer.write(b'Hello, server!')
await writer.drain()
data = await reader.read(100)
print(f"Received: {data.decode()}")
writer.close()
await writer.wait_closed()
TCP Server¶
async def handle_client(reader, writer):
data = await reader.read(100)
message = data.decode()
print(f"Received: {message}")
writer.write(data)
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle_client, 'localhost', 8888)
async with server:
await server.serve_forever()
Debugging¶
Debug Mode¶
# Enable debug mode
asyncio.run(main(), debug=True)
# Or via environment variable
# PYTHONASYNCIODEBUG=1 python script.py
Common Issues¶
Coroutine never awaited:
# Wrong
async def main():
fetch_data() # Returns coroutine, doesn't run!
# Right
async def main():
await fetch_data()
Blocking the event loop:
# Wrong
async def main():
time.sleep(1) # Blocks entire loop!
# Right
async def main():
await asyncio.sleep(1) # Non-blocking
Using sync libraries:
# Wrong
async def main():
requests.get(url) # Blocks!
# Right
async def main():
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
Best Practices¶
- Use asyncio.run() — Proper event loop management
- Use TaskGroup — Python 3.11+ for structured concurrency
- Handle cancellation — Tasks can be cancelled anytime
- Use timeouts — Prevent hanging operations
- Limit concurrency — Use semaphores to avoid overwhelming resources
- Don't block the loop — Use
run_in_executorfor blocking calls - Close resources — Use async context managers
Common Patterns¶
Rate Limiting¶
class RateLimiter:
def __init__(self, rate: int, period: float = 1.0):
self.rate = rate
self.period = period
self.semaphore = asyncio.Semaphore(rate)
self.release_times: list[float] = []
async def acquire(self):
await self.semaphore.acquire()
asyncio.get_running_loop().call_later(
self.period,
self.semaphore.release
)
limiter = RateLimiter(10, 1.0) # 10 requests per second
async def rate_limited_fetch(url):
await limiter.acquire()
return await fetch(url)
Retry with Backoff¶
async def retry_with_backoff(coro_func, max_retries=3, base_delay=1.0):
for attempt in range(max_retries):
try:
return await coro_func()
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
await asyncio.sleep(delay)