Skip to content

Concurrency Cookbook

Ready-to-use recipes for common concurrent programming tasks.

When to Use What

Scenario Recommended Approach Why
I/O-bound tasks (HTTP calls, DB queries, file I/O) asyncio Non-blocking, single-threaded, most efficient for I/O waits
CPU-bound tasks (image processing, data crunching) ProcessPoolExecutor / multiprocessing Bypasses the GIL with separate processes
Legacy/C-extension code that releases the GIL threading Simpler than multiprocessing when GIL isn't the bottleneck
Mixed I/O + CPU in the same app asyncio + ProcessPoolExecutor Use asyncio for orchestration, offload CPU work to processes

Rule of Thumb

Start with asyncio. If profiling shows CPU is the bottleneck, add ProcessPoolExecutor. Use threading only for legacy compatibility or C extensions that release the GIL.

Python Recipes

Parallel HTTP Requests (Beginner)

import asyncio
import aiohttp

async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()

# Usage
results = asyncio.run(fetch_all([
    "https://api.example.com/users",
    "https://api.example.com/posts",
    "https://api.example.com/comments",
]))

Rate-Limited Requests (Intermediate)

import asyncio
import aiohttp

class RateLimitedClient:
    def __init__(self, rate: int = 10, period: float = 1.0):
        self.semaphore = asyncio.Semaphore(rate)
        self.period = period

    async def fetch(self, session: aiohttp.ClientSession, url: str) -> dict:
        async with self.semaphore:
            async with session.get(url) as response:
                result = await response.json()
            await asyncio.sleep(self.period / 10)  # Spread requests
            return result

async def fetch_with_rate_limit(urls: list[str], rate: int = 10):
    client = RateLimitedClient(rate=rate)
    async with aiohttp.ClientSession() as session:
        tasks = [client.fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

Parallel File Processing (Beginner)

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
from pathlib import Path

def process_file(path: Path) -> dict:
    """Process a single file (CPU-intensive)."""
    content = path.read_text()
    # Heavy processing...
    return {"path": str(path), "lines": len(content.splitlines())}

def process_files_parallel(paths: list[Path]) -> list[dict]:
    """Process multiple files in parallel."""
    workers = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(process_file, paths))

# Usage
paths = list(Path("data").glob("*.txt"))
results = process_files_parallel(paths)

Async Database Queries (Beginner)

import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

async def fetch_user_data(db: AsyncSession, user_id: int) -> dict:
    """Fetch user with related data concurrently."""
    user_query = select(User).where(User.id == user_id)
    posts_query = select(Post).where(Post.user_id == user_id)
    comments_query = select(Comment).where(Comment.user_id == user_id)

    user_result, posts_result, comments_result = await asyncio.gather(
        db.execute(user_query),
        db.execute(posts_query),
        db.execute(comments_query),
    )

    return {
        "user": user_result.scalar_one(),
        "posts": posts_result.scalars().all(),
        "comments": comments_result.scalars().all(),
    }

Timeout Wrapper (Beginner)

import asyncio
from typing import TypeVar, Awaitable

T = TypeVar('T')

async def with_timeout(
    coro: Awaitable[T],
    timeout: float,
    default: T = None
) -> T:
    """Execute coroutine with timeout, return default on timeout."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return default

# Usage
result = await with_timeout(slow_operation(), timeout=5.0, default={})

Retry with Exponential Backoff (Intermediate)

import asyncio
import random
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')

async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
) -> T:
    """Retry async function with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception as e:
            if attempt == max_retries - 1:
                raise

            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay *= (0.5 + random.random())

            await asyncio.sleep(delay)

# Usage
result = await retry_with_backoff(
    lambda: fetch_data(url),
    max_retries=5,
    base_delay=1.0,
)

Batch Processing (Intermediate)

import asyncio
from typing import TypeVar, List, Callable, Awaitable

T = TypeVar('T')
R = TypeVar('R')

async def process_in_batches(
    items: List[T],
    batch_size: int,
    process_fn: Callable[[T], Awaitable[R]],
    max_concurrent: int = 10,
) -> List[R]:
    """Process items in batches with limited concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def process_with_semaphore(item: T) -> R:
        async with semaphore:
            return await process_fn(item)

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[process_with_semaphore(item) for item in batch]
        )
        results.extend(batch_results)

    return results

# Usage
results = await process_in_batches(
    items=urls,
    batch_size=100,
    process_fn=fetch_url,
    max_concurrent=10,
)

Background Task Manager (Intermediate)

import asyncio
from typing import Coroutine, Any

class BackgroundTaskManager:
    def __init__(self):
        self.tasks: set[asyncio.Task] = set()

    def create_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Create and track a background task."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task

    async def shutdown(self, timeout: float = 5.0):
        """Cancel all tasks and wait for completion."""
        for task in self.tasks:
            task.cancel()

        if self.tasks:
            await asyncio.wait(self.tasks, timeout=timeout)

# Usage in FastAPI
from fastapi import FastAPI

app = FastAPI()
bg_tasks = BackgroundTaskManager()

@app.on_event("shutdown")
async def shutdown():
    await bg_tasks.shutdown()

@app.post("/send-email")
async def send_email(email: str):
    bg_tasks.create_task(send_email_async(email))
    return {"status": "queued"}

JavaScript Recipes

Parallel Fetches with Limit (Intermediate)

async function fetchAllWithLimit<T>(
  urls: string[],
  limit: number = 5
): Promise<T[]> {
  const results: T[] = [];
  const executing: Promise<void>[] = [];

  for (const url of urls) {
    const promise = fetch(url)
      .then(r => r.json())
      .then(data => {
        results.push(data);
      });

    executing.push(promise);

    if (executing.length >= limit) {
      await Promise.race(executing);
      executing.splice(
        executing.findIndex(p => p === promise),
        1
      );
    }
  }

  await Promise.all(executing);
  return results;
}

Debounced Async Function (Advanced)

function debounceAsync<T extends (...args: any[]) => Promise<any>>(
  fn: T,
  delay: number
): (...args: Parameters<T>) => Promise<ReturnType<T>> {
  let timeoutId: NodeJS.Timeout | null = null;
  let pendingPromise: Promise<ReturnType<T>> | null = null;
  let pendingResolve: ((value: ReturnType<T>) => void) | null = null;

  return (...args: Parameters<T>): Promise<ReturnType<T>> => {
    if (timeoutId) {
      clearTimeout(timeoutId);
    }

    if (!pendingPromise) {
      pendingPromise = new Promise(resolve => {
        pendingResolve = resolve;
      });
    }

    timeoutId = setTimeout(async () => {
      const result = await fn(...args);
      pendingResolve!(result);
      pendingPromise = null;
      pendingResolve = null;
    }, delay);

    return pendingPromise;
  };
}

// Usage
const debouncedSearch = debounceAsync(searchAPI, 300);

Sequential Promise Execution (Beginner)

async function executeSequentially<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];

  for (const item of items) {
    const result = await fn(item);
    results.push(result);
  }

  return results;
}

// With reduce
async function executeSequentially2<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  return items.reduce(async (accPromise, item) => {
    const acc = await accPromise;
    const result = await fn(item);
    return [...acc, result];
  }, Promise.resolve([] as R[]));
}

Promise Pool (Intermediate)

class PromisePool {
  private running = 0;
  private queue: (() => void)[] = [];

  constructor(private concurrency: number) {}

  async add<T>(fn: () => Promise<T>): Promise<T> {
    if (this.running >= this.concurrency) {
      await new Promise<void>(resolve => this.queue.push(resolve));
    }

    this.running++;

    try {
      return await fn();
    } finally {
      this.running--;
      const next = this.queue.shift();
      if (next) next();
    }
  }
}

// Usage
const pool = new PromisePool(5);
const results = await Promise.all(
  urls.map(url => pool.add(() => fetch(url).then(r => r.json())))
);

Cached Async Function (Intermediate)

function cacheAsync<T extends (...args: any[]) => Promise<any>>(
  fn: T,
  keyFn: (...args: Parameters<T>) => string = (...args) => JSON.stringify(args),
  ttl: number = 60000
): T {
  const cache = new Map<string, { value: ReturnType<T>; expires: number }>();

  return (async (...args: Parameters<T>): Promise<ReturnType<T>> => {
    const key = keyFn(...args);
    const now = Date.now();

    const cached = cache.get(key);
    if (cached && cached.expires > now) {
      return cached.value;
    }

    const value = await fn(...args);
    cache.set(key, { value, expires: now + ttl });
    return value;
  }) as T;
}

// Usage
const cachedFetch = cacheAsync(
  (url: string) => fetch(url).then(r => r.json()),
  (url) => url,
  5 * 60 * 1000 // 5 minutes
);

Abort Controller Wrapper (Beginner)

function withAbort<T>(
  fn: (signal: AbortSignal) => Promise<T>,
  timeout?: number
): { promise: Promise<T>; abort: () => void } {
  const controller = new AbortController();
  let timeoutId: NodeJS.Timeout | undefined;

  if (timeout) {
    timeoutId = setTimeout(() => controller.abort(), timeout);
  }

  const promise = fn(controller.signal).finally(() => {
    if (timeoutId) clearTimeout(timeoutId);
  });

  return {
    promise,
    abort: () => controller.abort(),
  };
}

// Usage
const { promise, abort } = withAbort(
  (signal) => fetch('/api/data', { signal }).then(r => r.json()),
  5000
);

// Can call abort() to cancel

Mixed Language Patterns

API with Concurrent Backend Calls (Python → JS) (Beginner)

# Python backend
from fastapi import FastAPI
import asyncio
import aiohttp

app = FastAPI()

@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int):
    # Fetch all data concurrently
    async with aiohttp.ClientSession() as session:
        user, metrics, notifications = await asyncio.gather(
            fetch_user(session, user_id),
            fetch_metrics(session, user_id),
            fetch_notifications(session, user_id),
        )

    return {
        "user": user,
        "metrics": metrics,
        "notifications": notifications,
    }
// JavaScript frontend
async function loadDashboard(userId: number) {
  // Single request to backend that does concurrent fetching
  const response = await fetch(`/api/dashboard/${userId}`);
  return response.json();
}

Parallel Processing with Progress (Intermediate)

import asyncio
from typing import Callable, AsyncIterator, TypeVar

T = TypeVar('T')
R = TypeVar('R')

async def process_with_progress(
    items: list[T],
    process_fn: Callable[[T], R],
    on_progress: Callable[[int, int], None],
) -> list[R]:
    """Process items with progress callback."""
    total = len(items)
    completed = 0
    results = []

    semaphore = asyncio.Semaphore(10)

    async def process_one(item: T) -> R:
        nonlocal completed
        async with semaphore:
            result = await process_fn(item)
            completed += 1
            on_progress(completed, total)
            return result

    tasks = [process_one(item) for item in items]
    return await asyncio.gather(*tasks)

# Usage
await process_with_progress(
    items=urls,
    process_fn=fetch_url,
    on_progress=lambda done, total: print(f"{done}/{total}"),
)
// JavaScript with streaming progress
async function processWithProgress<T, R>(
  items: T[],
  processFn: (item: T) => Promise<R>,
  onProgress: (completed: number, total: number) => void
): Promise<R[]> {
  const total = items.length;
  let completed = 0;
  const results: R[] = [];

  const executing: Promise<void>[] = [];
  const limit = 10;

  for (const item of items) {
    const promise = processFn(item).then(result => {
      results.push(result);
      completed++;
      onProgress(completed, total);
    });

    executing.push(promise);

    if (executing.length >= limit) {
      await Promise.race(executing);
    }
  }

  await Promise.all(executing);
  return results;
}

See Also