
Python Concurrency

Python-specific approaches to concurrent programming.

Overview

Python offers three main concurrency models:

| Model           | Module            | Best For             | Parallelism      |
|-----------------|-------------------|----------------------|------------------|
| Threading       | threading         | I/O-bound tasks      | Limited by GIL   |
| Multiprocessing | multiprocessing   | CPU-bound tasks      | True parallelism |
| Async/Await     | asyncio           | High-concurrency I/O | Single-threaded  |

The Big Picture

# I/O-bound: Use asyncio (assumes import asyncio, aiohttp and a fetch_user coroutine)
async def fetch_users():
    async with aiohttp.ClientSession() as session:
        users = await asyncio.gather(
            fetch_user(session, 1),
            fetch_user(session, 2),
            fetch_user(session, 3),
        )
    return users

# CPU-bound: Use multiprocessing (assumes from concurrent.futures import ProcessPoolExecutor)
def process_images(paths):
    with ProcessPoolExecutor() as executor:
        results = executor.map(process_image, paths)
    return list(results)

# Mixed: Combine both
async def process_batch(items):
    # Fetch data concurrently
    data = await fetch_all_data(items)

    # Offload CPU-intensive work to a process pool
    loop = asyncio.get_running_loop()  # get_event_loop() is deprecated inside coroutines
    with ProcessPoolExecutor() as pool:
        results = await loop.run_in_executor(pool, process_data, data)

    return results

Quick Decision Guide

Is it CPU-intensive?
├── Yes → multiprocessing (ProcessPoolExecutor)
└── No → Is it I/O-bound?
    ├── Yes → asyncio (async/await)
    └── No → Regular synchronous code is fine

Topics

| Topic                   | Description                 |
|-------------------------|-----------------------------|
| The GIL Explained       | What it is, when it matters |
| Threading               | Thread-based concurrency    |
| Multiprocessing         | Process-based parallelism   |
| Asyncio Deep Dive       | Async/await in depth        |
| Choosing the Right Tool | Decision framework          |

Common Patterns

Concurrent I/O Requests

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
results = asyncio.run(fetch_all([
    "https://api.example.com/users",
    "https://api.example.com/posts",
    "https://api.example.com/comments",
]))
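By default, gather() propagates the first exception to the awaiter and the other results are lost. When partial results are acceptable, pass return_exceptions=True so failures come back in-place. A minimal sketch (the fetch coroutine and fetch_all_tolerant are hypothetical stand-ins that simulate I/O rather than using aiohttp):

```python
import asyncio

async def fetch(url: str) -> str:
    # Simulated fetch: one URL fails to show the error handling
    if "bad" in url:
        raise ValueError(f"cannot reach {url}")
    await asyncio.sleep(0.01)
    return f"data from {url}"

async def fetch_all_tolerant(urls):
    # return_exceptions=True: a failure appears as an exception object
    # in the results list instead of aborting the whole batch
    return await asyncio.gather(*(fetch(u) for u in urls), return_exceptions=True)

results = asyncio.run(fetch_all_tolerant(["https://a", "https://bad", "https://c"]))
```

Results keep the input order, so callers can match each outcome back to its URL and decide per-item whether to retry.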

Parallel CPU Work

from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def cpu_intensive(data):
    # Heavy computation
    return sum(x * x for x in data)

def process_parallel(datasets):
    workers = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(cpu_intensive, datasets))
    return results

Background Tasks

import asyncio
from collections.abc import Coroutine

class BackgroundTasks:
    def __init__(self):
        self.tasks: set[asyncio.Task] = set()

    def add(self, coro: Coroutine):
        # Keep a strong reference: the event loop holds only weak
        # references to tasks, so an otherwise-unreferenced task can
        # be garbage-collected before it finishes
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

# Usage
bg_tasks = BackgroundTasks()
bg_tasks.add(send_email(user.email, "Welcome!"))
bg_tasks.add(log_signup(user.id))
# Tasks run in the background and don't block the response.
# Note: add() must be called while an event loop is running.
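A self-contained demo of the same pattern (send_email here is a hypothetical stand-in for real work): the done-callback keeps the task set from growing forever, while the set itself holds the strong references that stop tasks from being garbage-collected mid-flight.

```python
import asyncio

class BackgroundTasks:
    def __init__(self):
        self.tasks: set[asyncio.Task] = set()

    def add(self, coro):
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

completed = []

async def send_email(to):  # Hypothetical stand-in for real work
    await asyncio.sleep(0.01)
    completed.append(to)

async def main():
    bg = BackgroundTasks()
    bg.add(send_email("user@example.com"))
    bg.add(send_email("admin@example.com"))
    await asyncio.sleep(0.05)  # The "response" returns; tasks finish on their own
    return len(bg.tasks)

remaining = asyncio.run(main())  # 0: both tasks finished and were discarded
```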

Producer-Consumer Queue

import asyncio

async def producer(queue: asyncio.Queue, items, num_consumers: int):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    for _ in range(num_consumers):
        await queue.put(None)  # One sentinel per consumer; a single
                               # sentinel would stop only one consumer
                               # and leave the others waiting forever

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(0.1)  # Simulate work
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)

    # Start producer and consumers
    await asyncio.gather(
        producer(queue, range(20), num_consumers=2),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())

Thread Safety in Python

Thread-Safe Types

from queue import Queue  # Thread-safe
from collections import deque  # append/popleft are thread-safe; other ops may not be

# Safe
q = Queue()
q.put(item)    # Atomic
q.get()        # Atomic

# Atomic in CPython (a side effect of the GIL, not a language guarantee)
my_list.append(item)  # Atomic
my_dict[key] = value  # Atomic

# NOT safe: read-modify-write spans several bytecode operations
counter += 1          # Load, add, store; another thread can interleave
my_list[0] += 1       # Same problem

Using Locks

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.value += 1

    def get(self):
        with self.lock:
            return self.value
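Exercising the Counter above from several threads shows the lock doing its job: with N threads each incrementing M times, the final value is exactly N * M. The thread and iteration counts below are arbitrary.

```python
import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.value += 1

    def get(self):
        with self.lock:
            return self.value

def worker(counter: Counter, n: int):
    for _ in range(n):
        counter.increment()

counter = Counter()
threads = [threading.Thread(target=worker, args=(counter, 10_000)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = counter.get()  # Exactly 80_000: no lost updates
```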

Performance Comparison

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_bound_task():
    time.sleep(0.1)  # Simulate I/O
    return "done"

def cpu_bound_task(n):
    # CPU-bound work: threads won't speed this up because of the GIL
    return sum(i * i for i in range(n))

# Sequential
start = time.time()
[io_bound_task() for _ in range(10)]
print(f"Sequential: {time.time() - start:.2f}s")  # ~1.0s

# Threaded
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(lambda _: io_bound_task(), range(10)))
print(f"Threaded: {time.time() - start:.2f}s")  # ~0.1s

# Async
async def async_io():
    await asyncio.sleep(0.1)
    return "done"

async def run_all():
    return await asyncio.gather(*[async_io() for _ in range(10)])

start = time.time()
asyncio.run(run_all())  # gather() needs a running loop; don't call it at top level
print(f"Async: {time.time() - start:.2f}s")  # ~0.1s
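The CPU-bound counterpart is why cpu_bound_task is defined above: threads add overhead without speeding up pure computation, since the GIL lets only one thread execute Python bytecode at a time. A sketch (timings are machine-dependent, so no exact numbers are claimed):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound_task(n):
    return sum(i * i for i in range(n))

N = 200_000

# Sequential baseline
start = time.time()
seq = [cpu_bound_task(N) for _ in range(4)]
seq_time = time.time() - start

# Threaded: same work, but the GIL serializes bytecode execution
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(cpu_bound_task, [N] * 4))
thread_time = time.time() - start

# Expect thread_time to be roughly seq_time, or slightly worse
```

A ProcessPoolExecutor would show a real speedup here, at the cost of pickling the arguments and starting worker processes.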

Best Practices

  1. Use asyncio for I/O — Network, database, file operations
  2. Use multiprocessing for CPU — Computation, data processing
  3. Avoid shared state — Use queues, message passing
  4. Keep the GIL in mind — Threading doesn't parallelize CPU work
  5. Profile before optimizing — Measure actual bottlenecks
  6. Use high-level abstractions — concurrent.futures, asyncio.gather