Concurrency Cookbook¶
Ready-to-use recipes for common concurrent programming tasks.
When to Use What¶
| Scenario | Recommended Approach | Why |
|---|---|---|
| I/O-bound tasks (HTTP calls, DB queries, file I/O) | asyncio | Non-blocking, single-threaded, most efficient for I/O waits |
| CPU-bound tasks (image processing, data crunching) | ProcessPoolExecutor / multiprocessing | Bypasses the GIL with separate processes |
| Legacy/C-extension code that releases the GIL | threading | Simpler than multiprocessing when GIL isn't the bottleneck |
| Mixed I/O + CPU in the same app | asyncio + ProcessPoolExecutor | Use asyncio for orchestration, offload CPU work to processes |
Rule of Thumb
Start with asyncio. If profiling shows CPU is the bottleneck, add ProcessPoolExecutor. Use threading only for legacy compatibility or C extensions that release the GIL.
Python Recipes¶
Parallel HTTP Requests (Beginner)¶
import asyncio
import aiohttp
async def fetch_all(urls: list[str]) -> list[dict]:
    """Concurrently GET every URL and return the decoded JSON bodies.

    Results come back in the same order as *urls*.  Because the gather
    uses ``return_exceptions=True``, a failed request contributes its
    exception object to the list instead of aborting the whole batch.
    """
    async with aiohttp.ClientSession() as session:
        pending = (fetch_json(session, target) for target in urls)
        return await asyncio.gather(*pending, return_exceptions=True)
async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    """GET *url* through *session* and decode the JSON body.

    Raises ``aiohttp.ClientResponseError`` on a 4xx/5xx status.
    """
    async with session.get(url) as response:
        response.raise_for_status()  # surface HTTP errors as exceptions
        payload = await response.json()
        return payload
# Usage
# asyncio.run() drives fetch_all to completion on a fresh event loop and
# returns one entry per URL (a dict, or an exception object on failure,
# because fetch_all gathers with return_exceptions=True).
results = asyncio.run(fetch_all([
    "https://api.example.com/users",
    "https://api.example.com/posts",
    "https://api.example.com/comments",
]))
Rate-Limited Requests (Intermediate)¶
import asyncio
import aiohttp
class RateLimitedClient:
    """Throttle JSON fetches through a shared semaphore.

    NOTE(review): the semaphore caps how many requests are *in flight*
    simultaneously; together with the short sleep this only approximates
    a true requests-per-``period`` rate limit.
    """

    def __init__(self, rate: int = 10, period: float = 1.0):
        # At most ``rate`` coroutines may hold the semaphore at once.
        self.semaphore = asyncio.Semaphore(rate)
        self.period = period

    async def fetch(self, session: aiohttp.ClientSession, url: str) -> dict:
        """Fetch *url* as JSON while holding a semaphore slot."""
        async with self.semaphore:
            async with session.get(url) as response:
                body = await response.json()
            await asyncio.sleep(self.period / 10)  # spread requests out
            return body
async def fetch_with_rate_limit(urls: list[str], rate: int = 10):
    """Fetch *urls* through one shared RateLimitedClient and gather results."""
    limiter = RateLimitedClient(rate=rate)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(limiter.fetch(session, url) for url in urls)
        )
Parallel File Processing (Beginner)¶
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
from pathlib import Path
def process_file(path: Path) -> dict:
    """Process a single file (stand-in for CPU-intensive work).

    Returns a dict with the file's path (as a string) and its line count.
    """
    text = path.read_text()
    # Heavy processing would go here...
    line_count = len(text.splitlines())
    return {"path": str(path), "lines": line_count}
def process_files_parallel(paths: list[Path]) -> list[dict]:
"""Process multiple files in parallel."""
workers = multiprocessing.cpu_count()
with ProcessPoolExecutor(max_workers=workers) as executor:
return list(executor.map(process_file, paths))
# Usage
# Collect every .txt file directly under data/ and process them in parallel.
paths = list(Path("data").glob("*.txt"))
results = process_files_parallel(paths)
Async Database Queries (Beginner)¶
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
async def fetch_user_data(db: AsyncSession, user_id: int) -> dict:
    """Fetch a user plus their posts and comments.

    NOTE: a SQLAlchemy ``AsyncSession`` is a stateful object that must not
    be used from multiple tasks concurrently, so the three queries are
    awaited sequentially on the shared session (the original gathered them
    concurrently, which can raise ``InvalidRequestError`` at runtime).  To
    truly parallelize, give each query its own session from an
    ``async_sessionmaker`` and gather those tasks instead.

    Args:
        db: The async session to query with.
        user_id: Primary key of the user to load.

    Raises:
        sqlalchemy.exc.NoResultFound: if no user row matches *user_id*
            (from ``scalar_one()``).
    """
    user_query = select(User).where(User.id == user_id)
    posts_query = select(Post).where(Post.user_id == user_id)
    comments_query = select(Comment).where(Comment.user_id == user_id)
    # One session == one connection/transaction: run the queries one after
    # another rather than concurrently on the same session.
    user_result = await db.execute(user_query)
    posts_result = await db.execute(posts_query)
    comments_result = await db.execute(comments_query)
    return {
        "user": user_result.scalar_one(),
        "posts": posts_result.scalars().all(),
        "comments": comments_result.scalars().all(),
    }
Timeout Wrapper (Beginner)¶
import asyncio
from typing import TypeVar, Awaitable
T = TypeVar('T')
async def with_timeout(
coro: Awaitable[T],
timeout: float,
default: T = None
) -> T:
"""Execute coroutine with timeout, return default on timeout."""
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
return default
# Usage
# (Inside an async function.)  Yields {} if slow_operation() takes
# longer than 5 seconds; otherwise its result.
result = await with_timeout(slow_operation(), timeout=5.0, default={})
Retry with Exponential Backoff (Intermediate)¶
import asyncio
import random
from typing import TypeVar, Callable, Awaitable
T = TypeVar('T')

async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
) -> T:
    """Call *fn* until it succeeds, sleeping exponentially longer between tries.

    Args:
        fn: Zero-argument callable returning an awaitable (e.g. a lambda
            wrapping the real call).
        max_retries: Total number of attempts; must be >= 1.
        base_delay: Delay after the first failure, in seconds; doubles
            after each subsequent failure.
        max_delay: Upper bound on any single delay.
        jitter: Scale each delay by a random factor in [0.5, 1.5) to
            avoid thundering-herd retries.

    Returns:
        The first successful result of ``await fn()``.

    Raises:
        ValueError: If ``max_retries`` is less than 1.  (The original
            silently returned ``None`` in that case.)
        Exception: The final attempt's exception is re-raised once all
            attempts are exhausted.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception:
            # Out of attempts: propagate the failure to the caller.
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay *= (0.5 + random.random())  # 0.5x-1.5x spread
            await asyncio.sleep(delay)
# Usage
# (Inside an async function.)  Up to 5 attempts, with jittered delays of
# roughly 1, 2, 4, 8 seconds between failures.
result = await retry_with_backoff(
    lambda: fetch_data(url),
    max_retries=5,
    base_delay=1.0,
)
Batch Processing (Intermediate)¶
import asyncio
from typing import TypeVar, List, Callable, Awaitable
T = TypeVar('T')
R = TypeVar('R')

async def process_in_batches(
    items: List[T],
    batch_size: int,
    process_fn: Callable[[T], Awaitable[R]],
    max_concurrent: int = 10,
) -> List[R]:
    """Apply *process_fn* to *items* batch by batch, capping concurrency.

    Batches are gathered in order, and a semaphore additionally bounds how
    many calls run at once within a batch.  Results preserve input order.
    """
    gate = asyncio.Semaphore(max_concurrent)

    async def bounded(item: T) -> R:
        # Hold one semaphore slot for the duration of a single call.
        async with gate:
            return await process_fn(item)

    collected: List[R] = []
    cursor = 0
    while cursor < len(items):
        batch = items[cursor:cursor + batch_size]
        collected.extend(await asyncio.gather(*(bounded(x) for x in batch)))
        cursor += batch_size
    return collected
# Usage
# (Inside an async function.)  100 URLs per batch, but never more than
# 10 fetches in flight at once.
results = await process_in_batches(
    items=urls,
    batch_size=100,
    process_fn=fetch_url,
    max_concurrent=10,
)
Background Task Manager (Intermediate)¶
import asyncio
from typing import Coroutine, Any
class BackgroundTaskManager:
    """Track fire-and-forget asyncio tasks so they can be cancelled at shutdown.

    Holding a strong reference to each task also prevents it from being
    garbage-collected mid-flight (the event loop itself only keeps weak
    references to running tasks).
    """

    def __init__(self):
        # Strong references to in-flight tasks; each entry removes itself
        # on completion via the done-callback added in create_task.
        self.tasks: set[asyncio.Task] = set()

    def create_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Schedule *coro* as a task, track it, and return the task."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task

    async def shutdown(self, timeout: float = 5.0):
        """Cancel all tracked tasks and wait up to *timeout* for them to finish.

        Snapshot the set first: done-callbacks discard entries from
        ``self.tasks`` while we cancel/await, so iterating and waiting on
        the live set is fragile.
        """
        pending = list(self.tasks)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.wait(pending, timeout=timeout)
# Usage in FastAPI
from fastapi import FastAPI

app = FastAPI()
bg_tasks = BackgroundTaskManager()

# NOTE(review): @app.on_event is deprecated in recent FastAPI releases in
# favor of lifespan handlers -- confirm against the FastAPI version in use.
@app.on_event("shutdown")
async def shutdown():
    # Cancel any in-flight background tasks and wait briefly for them.
    await bg_tasks.shutdown()

@app.post("/send-email")
async def send_email(email: str):
    # Queue the send and return immediately; the manager keeps a reference
    # to the task so it isn't dropped mid-send.
    bg_tasks.create_task(send_email_async(email))
    return {"status": "queued"}
JavaScript Recipes¶
Parallel Fetches with Limit (Intermediate)¶
/**
 * Fetch every URL with at most `limit` requests in flight.
 *
 * Fixes two bugs in the naive pool pattern:
 * - settled promises now remove themselves from the in-flight set (the
 *   original spliced out the *newest* promise after Promise.race, which
 *   may still be pending, so the limit drifted);
 * - results are stored by input index, so the returned array matches the
 *   order of `urls` rather than completion order.
 */
async function fetchAllWithLimit<T>(
  urls: string[],
  limit: number = 5
): Promise<T[]> {
  const results: T[] = [];
  const executing = new Set<Promise<void>>();
  for (const [index, url] of urls.entries()) {
    const promise = fetch(url)
      .then(r => r.json())
      .then(data => {
        results[index] = data;
      })
      // Self-remove on settle so `executing` only holds pending work.
      .finally(() => {
        executing.delete(promise);
      });
    executing.add(promise);
    if (executing.size >= limit) {
      // Block until at least one in-flight request settles.
      await Promise.race(executing);
    }
  }
  await Promise.all(executing);
  return results;
}
Debounced Async Function (Advanced)¶
/**
 * Debounce an async function: rapid calls share one pending promise that
 * settles with the result of the *last* call's arguments, `delay` ms
 * after calls stop arriving.
 *
 * Fixes the original's hang-on-error: if `fn` rejected, the pending
 * promise was never settled, so every awaiting caller hung forever.
 * The shared promise now rejects with `fn`'s error instead.
 */
function debounceAsync<T extends (...args: any[]) => Promise<any>>(
  fn: T,
  delay: number
): (...args: Parameters<T>) => Promise<ReturnType<T>> {
  let timeoutId: NodeJS.Timeout | null = null;
  let pendingPromise: Promise<ReturnType<T>> | null = null;
  let pendingResolve: ((value: ReturnType<T>) => void) | null = null;
  let pendingReject: ((reason: unknown) => void) | null = null;
  return (...args: Parameters<T>): Promise<ReturnType<T>> => {
    if (timeoutId) {
      clearTimeout(timeoutId);
    }
    if (!pendingPromise) {
      pendingPromise = new Promise((resolve, reject) => {
        pendingResolve = resolve;
        pendingReject = reject;
      });
    }
    const returned = pendingPromise;
    timeoutId = setTimeout(async () => {
      // Reset shared state *before* running fn, so calls arriving while
      // fn is executing start a fresh debounce window.
      const resolve = pendingResolve!;
      const reject = pendingReject!;
      pendingPromise = null;
      pendingResolve = null;
      pendingReject = null;
      try {
        resolve(await fn(...args));
      } catch (err) {
        reject(err);
      }
    }, delay);
    return returned;
  };
}
// Usage
// Rapid keystrokes collapse into a single searchAPI call fired 300ms
// after the last keystroke.
const debouncedSearch = debounceAsync(searchAPI, 300);
Sequential Promise Execution (Beginner)¶
/**
 * Run `fn` over `items` strictly one at a time (never concurrently),
 * collecting results in input order.
 */
async function executeSequentially<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const collected: R[] = [];
  for (let i = 0; i < items.length; i++) {
    collected.push(await fn(items[i]));
  }
  return collected;
}
// With reduce
/**
 * Same sequential behavior expressed as a promise-chaining reduce.
 *
 * The accumulator is mutated with push() instead of re-spread each step:
 * the original `[...acc, result]` copied the whole array every iteration,
 * making the reduce O(n^2) in the number of items.
 */
async function executeSequentially2<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  return items.reduce(async (accPromise, item) => {
    const acc = await accPromise;
    acc.push(await fn(item));
    return acc;
  }, Promise.resolve([] as R[]));
}
Promise Pool (Intermediate)¶
/**
 * Caps how many async operations run concurrently.
 *
 * The admission check is a `while` loop (the original used `if`): a
 * caller woken from the queue re-checks the running count before taking
 * a slot, so a fresh add() that slipped in while the waiter was being
 * resumed cannot push concurrency above the limit.
 */
class PromisePool {
  private running = 0;
  private queue: (() => void)[] = [];

  constructor(private concurrency: number) {}

  async add<T>(fn: () => Promise<T>): Promise<T> {
    // Wait (possibly repeatedly) until a slot is actually free.
    while (this.running >= this.concurrency) {
      await new Promise<void>(resolve => this.queue.push(resolve));
    }
    this.running++;
    try {
      return await fn();
    } finally {
      this.running--;
      // Wake exactly one queued waiter, if any.
      const next = this.queue.shift();
      if (next) next();
    }
  }
}
// Usage
// At most 5 of the fetches run at any moment; Promise.all still resolves
// with one JSON payload per URL.
const pool = new PromisePool(5);
const results = await Promise.all(
  urls.map(url => pool.add(() => fetch(url).then(r => r.json())))
);
Cached Async Function (Intermediate)¶
function cacheAsync<T extends (...args: any[]) => Promise<any>>(
fn: T,
keyFn: (...args: Parameters<T>) => string = (...args) => JSON.stringify(args),
ttl: number = 60000
): T {
const cache = new Map<string, { value: ReturnType<T>; expires: number }>();
return (async (...args: Parameters<T>): Promise<ReturnType<T>> => {
const key = keyFn(...args);
const now = Date.now();
const cached = cache.get(key);
if (cached && cached.expires > now) {
return cached.value;
}
const value = await fn(...args);
cache.set(key, { value, expires: now + ttl });
return value;
}) as T;
}
// Usage
// Key on the URL itself and keep entries for 5 minutes.
const cachedFetch = cacheAsync(
  (url: string) => fetch(url).then(r => r.json()),
  (url) => url,
  5 * 60 * 1000 // 5 minutes
);
Abort Controller Wrapper (Beginner)¶
/**
 * Run an abortable operation, optionally auto-aborting after `timeout` ms.
 * Returns the operation's promise plus a manual abort() handle.
 */
function withAbort<T>(
  fn: (signal: AbortSignal) => Promise<T>,
  timeout?: number
): { promise: Promise<T>; abort: () => void } {
  const controller = new AbortController();
  const timer: NodeJS.Timeout | undefined = timeout
    ? setTimeout(() => controller.abort(), timeout)
    : undefined;
  const promise = fn(controller.signal).finally(() => {
    // Don't fire a stale abort after the operation has settled.
    if (timer) clearTimeout(timer);
  });
  return { promise, abort: () => controller.abort() };
}
// Usage
// Aborts automatically after 5 seconds; abort() cancels sooner.
const { promise, abort } = withAbort(
  (signal) => fetch('/api/data', { signal }).then(r => r.json()),
  5000
);
// Can call abort() to cancel
Mixed Language Patterns¶
API with Concurrent Backend Calls (Python → JS) (Beginner)¶
# Python backend
from fastapi import FastAPI
import asyncio
import aiohttp
app = FastAPI()

@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int):
    """Aggregate the dashboard payload in a single round-trip per source.

    The three fetches share one ClientSession and run concurrently via
    asyncio.gather, so total latency is roughly the slowest call rather
    than the sum of all three.
    """
    # Fetch all data concurrently
    async with aiohttp.ClientSession() as session:
        user, metrics, notifications = await asyncio.gather(
            fetch_user(session, user_id),
            fetch_metrics(session, user_id),
            fetch_notifications(session, user_id),
        )
        return {
            "user": user,
            "metrics": metrics,
            "notifications": notifications,
        }
// JavaScript frontend
/**
 * Load the dashboard with one backend call; the server does the
 * concurrent fan-out, so the client stays simple.
 */
async function loadDashboard(userId: number) {
  const res = await fetch(`/api/dashboard/${userId}`);
  return res.json();
}
Parallel Processing with Progress (Intermediate)¶
import asyncio
from typing import AsyncIterator, Awaitable, Callable, TypeVar
T = TypeVar('T')
R = TypeVar('R')

async def process_with_progress(
    items: list[T],
    process_fn: Callable[[T], Awaitable[R]],
    on_progress: Callable[[int, int], None],
) -> list[R]:
    """Run *process_fn* over *items* (at most 10 at a time) with progress.

    Args:
        items: Inputs to process; results come back in the same order.
        process_fn: Async callable applied to each item.  (The original
            annotation said ``Callable[[T], R]``, but the result is
            awaited, so it must return an awaitable.)
        on_progress: Called as ``on_progress(completed, total)`` each time
            an item finishes -- in completion order, not input order.

    Returns:
        The results of ``process_fn`` for every item, in input order
        (``asyncio.gather`` preserves argument order).
    """
    total = len(items)
    completed = 0
    limiter = asyncio.Semaphore(10)

    async def process_one(item: T) -> R:
        nonlocal completed
        async with limiter:
            result = await process_fn(item)
            completed += 1
            on_progress(completed, total)
            return result

    # The original also built an unused `results` list; gather already
    # collects everything.
    return await asyncio.gather(*(process_one(item) for item in items))
# Usage
# (Inside an async function.)  Prints "1/3", "2/3", ... as items finish.
await process_with_progress(
    items=urls,
    process_fn=fetch_url,
    on_progress=lambda done, total: print(f"{done}/{total}"),
)
// JavaScript with streaming progress
/**
 * Process items with at most `limit` concurrent calls, invoking
 * `onProgress(completed, total)` as each one finishes.
 *
 * Fixes the original's broken limiter: settled promises were never
 * removed from `executing`, so once `limit` was reached, Promise.race
 * resolved instantly on every iteration and the cap stopped holding.
 * Results are also stored by input index to preserve order.
 */
async function processWithProgress<T, R>(
  items: T[],
  processFn: (item: T) => Promise<R>,
  onProgress: (completed: number, total: number) => void
): Promise<R[]> {
  const total = items.length;
  let completed = 0;
  const results: R[] = [];
  const executing = new Set<Promise<void>>();
  const limit = 10;
  for (const [index, item] of items.entries()) {
    const promise = processFn(item)
      .then(result => {
        results[index] = result;
        completed++;
        onProgress(completed, total);
      })
      // Self-remove on settle so the in-flight set stays accurate.
      .finally(() => {
        executing.delete(promise);
      });
    executing.add(promise);
    if (executing.size >= limit) {
      await Promise.race(executing);
    }
  }
  await Promise.all(executing);
  return results;
}
See Also¶
- Concurrency Deep Dive -- Fundamentals, GIL, asyncio internals, and advanced concurrency patterns
- Background Tasks -- FastAPI BackgroundTasks and Celery task queue patterns