# Python Concurrency

Python-specific approaches to concurrent programming.

## Overview

Python offers three main concurrency models:
| Model | Module | Best For | Parallelism |
|---|---|---|---|
| Threading | `threading` | I/O-bound tasks | Limited by GIL |
| Multiprocessing | `multiprocessing` | CPU-bound tasks | True parallelism |
| Async/Await | `asyncio` | High-concurrency I/O | Single-threaded |
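The "Limited by GIL" entry can be seen directly: a minimal sketch (illustrative function names, small workload for brevity) comparing the same pure-Python CPU work run sequentially and across threads. The results match, but threads give no CPU speedup because the GIL serializes bytecode execution.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def count(n: int) -> int:
    # Pure-Python CPU work: the GIL lets only one thread execute this at a time
    total = 0
    for i in range(n):
        total += i
    return total

N = 200_000

start = time.perf_counter()
sequential = [count(N) for _ in range(4)]
seq_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(count, [N] * 4))
thr_time = time.perf_counter() - start

# Same answers; the threaded run is typically no faster for CPU-bound work
print(f"sequential: {seq_time:.3f}s, threaded: {thr_time:.3f}s")
```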
## The Big Picture

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp

# I/O-bound: use asyncio
async def fetch_users():
    async with aiohttp.ClientSession() as session:
        users = await asyncio.gather(
            fetch_user(session, 1),
            fetch_user(session, 2),
            fetch_user(session, 3),
        )
    return users

# CPU-bound: use multiprocessing
def process_images(paths):
    with ProcessPoolExecutor() as executor:
        return list(executor.map(process_image, paths))

# Mixed: combine both
async def process_batch(items):
    # Fetch data concurrently
    data = await fetch_all_data(items)

    # Run CPU-intensive work in a process pool
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        results = await loop.run_in_executor(pool, process_data, data)
    return results
```
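A runnable miniature of the mixed pattern above, with stub functions standing in for the real I/O and CPU work. It passes `None` as the executor, which selects the default thread pool so the snippet runs anywhere; for genuinely CPU-bound work you would pass a `ProcessPoolExecutor` instead, as in the sketch above.

```python
import asyncio

def blocking_compute(data):
    # Stand-in for CPU-heavy work; use a ProcessPoolExecutor for real loads
    return sum(x * x for x in data)

async def fetch_data():
    await asyncio.sleep(0.01)  # pretend network I/O
    return [1, 2, 3, 4]

async def process_batch():
    data = await fetch_data()
    loop = asyncio.get_running_loop()
    # None selects the event loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_compute, data)

result = asyncio.run(process_batch())
print(result)  # 30
```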
## Quick Decision Guide

```text
Is it CPU-intensive?
├── Yes → multiprocessing (ProcessPoolExecutor)
└── No → Is it I/O-bound?
    ├── Yes → asyncio (async/await)
    └── No → Regular synchronous code is fine
```
## Topics

| Topic | Description |
|---|---|
| The GIL Explained | What it is, when it matters |
| Threading | Thread-based concurrency |
| Multiprocessing | Process-based parallelism |
| Asyncio Deep Dive | Async/await in depth |
| Choosing the Right Tool | Decision framework |
## Common Patterns

### Concurrent I/O Requests

```python
import asyncio

import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
results = asyncio.run(fetch_all([
    "https://api.example.com/users",
    "https://api.example.com/posts",
    "https://api.example.com/comments",
]))
```
### Parallel CPU Work

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(data):
    # Heavy computation
    return sum(x * x for x in data)

def process_parallel(datasets):
    workers = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(cpu_intensive, datasets))
```
### Background Tasks

```python
import asyncio
from collections.abc import Coroutine

class BackgroundTasks:
    def __init__(self):
        self.tasks: set[asyncio.Task] = set()

    def add(self, coro: Coroutine):
        task = asyncio.create_task(coro)
        # Keep a strong reference so the task isn't garbage-collected
        # mid-flight, then drop it once the task finishes
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

# Usage
bg_tasks = BackgroundTasks()
bg_tasks.add(send_email(user.email, "Welcome!"))
bg_tasks.add(log_signup(user.id))
# Tasks run in the background and don't block the response
```
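A self-contained version of this pattern, with a stub coroutine standing in for `send_email` (the real mailer is outside this sketch). Draining `bg.tasks` before the loop shuts down ensures pending work actually completes.

```python
import asyncio

class BackgroundTasks:
    def __init__(self):
        self.tasks: set[asyncio.Task] = set()

    def add(self, coro):
        task = asyncio.create_task(coro)
        # Strong reference prevents premature garbage collection
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

sent = []

async def send_email(addr: str, body: str):
    # Stand-in for a real mailer
    await asyncio.sleep(0.01)
    sent.append((addr, body))

async def main():
    bg = BackgroundTasks()
    bg.add(send_email("user@example.com", "Welcome!"))
    # The caller returns immediately; drain pending tasks before shutdown
    await asyncio.gather(*bg.tasks)

asyncio.run(main())
print(sent)
```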
### Producer-Consumer Queue

```python
import asyncio

NUM_CONSUMERS = 2

async def producer(queue: asyncio.Queue, items):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    # One sentinel per consumer, so every consumer receives a stop signal
    for _ in range(NUM_CONSUMERS):
        await queue.put(None)

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(0.1)  # Simulate work
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)

    # Run the producer and consumers concurrently
    await asyncio.gather(
        producer(queue, range(20)),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())
```
## Thread Safety in Python

### Thread-Safe Types

```python
from queue import Queue        # Thread-safe
from collections import deque  # append/popleft are atomic, but not all ops

# Safe
q = Queue()
q.put(item)   # Atomic
q.get()       # Atomic

# Also safe (single operations are atomic under the GIL, but be careful)
my_list.append(item)   # Atomic
my_dict[key] = value   # Atomic

# NOT safe: read-modify-write is a compound operation
counter += 1           # load, add, store; another thread can interleave
my_dict[key] += 1      # separate get and set, not atomic
```
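A small runnable illustration of `queue.Queue` absorbing concurrent writes: three producer threads push into one queue, and every item arrives exactly once with no lock in user code.

```python
import threading
from queue import Queue

q = Queue()

def producer(start: int) -> None:
    # Queue.put is safe to call from many threads at once
    for i in range(start, start + 100):
        q.put(i)

threads = [threading.Thread(target=producer, args=(s,)) for s in (0, 100, 200)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Drain the queue; all 300 items made it through, none lost or duplicated
items = sorted(q.get() for _ in range(q.qsize()))
print(len(items))  # 300
```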
### Using Locks

```python
import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.value += 1

    def get(self):
        with self.lock:
            return self.value
```
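Exercising the counter under contention shows why the lock matters: with it, eight threads doing 10,000 increments each always land on exactly 80,000, even though `self.value += 1` alone is a non-atomic read-modify-write. (The class is repeated here so the snippet stands alone.)

```python
import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:  # serializes the read-modify-write
            self.value += 1

counter = Counter()

def worker():
    for _ in range(10_000):
        counter.increment()

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)  # 80000, deterministically
```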
## Performance Comparison

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def io_bound_task():
    time.sleep(0.1)  # Simulate I/O
    return "done"

# Sequential
start = time.time()
[io_bound_task() for _ in range(10)]
print(f"Sequential: {time.time() - start:.2f}s")  # ~1.0s

# Threaded
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(lambda _: io_bound_task(), range(10)))
print(f"Threaded: {time.time() - start:.2f}s")  # ~0.1s

# Async (gather must run inside an event loop, so wrap it in a coroutine)
async def async_io():
    await asyncio.sleep(0.1)
    return "done"

async def run_async():
    await asyncio.gather(*[async_io() for _ in range(10)])

start = time.time()
asyncio.run(run_async())
print(f"Async: {time.time() - start:.2f}s")  # ~0.1s
```
## Best Practices

- Use asyncio for I/O — network, database, and file operations
- Use multiprocessing for CPU — computation, data processing
- Avoid shared state — use queues and message passing
- Keep the GIL in mind — threading doesn't parallelize CPU-bound work
- Profile before optimizing — measure actual bottlenecks
- Use high-level abstractions — `concurrent.futures`, `asyncio.gather`
## Related Documentation

- Asyncio Deep Dive — detailed async patterns
- Performance & Profiling — measuring performance
- Backend Standards — async patterns in FastAPI