Skip to content

Python Threading

Thread-based concurrency for I/O-bound tasks.

When to Use Threading

  • I/O-bound tasks — Network, file, database operations
  • Blocking calls — Waiting for external resources
  • Legacy code — Where async rewrite isn't feasible

Not for: CPU-bound tasks (use multiprocessing instead).

Basic Threading

Creating Threads

import threading

def worker(name, count):
    """Print `count` numbered progress messages tagged with `name`."""
    for step in range(count):
        print(f"{name}: {step}")

# Spin up two workers that run concurrently
t1 = threading.Thread(target=worker, args=("Thread-1", 5))
t2 = threading.Thread(target=worker, args=("Thread-2", 5))

for t in (t1, t2):
    t.start()

# Block until both workers have finished
for t in (t1, t2):
    t.join()

print("Done")

Thread Subclass

import threading

class DownloadThread(threading.Thread):
    """Thread that fetches `url` in run() and stores the payload on `result`."""

    def __init__(self, url):
        super().__init__()
        # Populated by run(); stays None until the download completes.
        self.result = None
        self.url = url

    def run(self):
        # `download` is a placeholder for any blocking fetch function.
        self.result = download(self.url)

# Usage
thread = DownloadThread("https://example.com")
thread.start()
thread.join()
print(thread.result)

ThreadPoolExecutor

The modern, preferred way to use threads.

Basic Usage

from concurrent.futures import ThreadPoolExecutor

def fetch_url(url):
    """GET `url` and decode the JSON body (requires the `requests` package)."""
    return requests.get(url).json()

urls = ["https://api.example.com/1", "https://api.example.com/2"]

# Ten pooled threads fetch all URLs concurrently; map() yields
# results in input order, regardless of completion order.
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fetch_url, urls))

Submit and Futures

from concurrent.futures import ThreadPoolExecutor, as_completed

def process(item):
    """Double `item` (stand-in for real work)."""
    return item * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to the item it was created from
    futures = {executor.submit(process, i): i for i in range(10)}

    # Handle results in completion order, not submission order
    for future in as_completed(futures):
        item = futures[future]
        try:
            result = future.result()
            # FIX: the original printed f"{item}{result}" with no separator,
            # producing ambiguous output such as "36" for item 3, result 6.
            print(f"{item} -> {result}")
        except Exception as e:
            print(f"{item} failed: {e}")

Timeout Handling

import time  # FIX: original snippet called time.sleep without importing time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow_task():
    """Simulate long-running work (10 seconds)."""
    time.sleep(10)
    return "done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)

    try:
        # Give up waiting after 2 seconds
        result = future.result(timeout=2.0)
    except TimeoutError:
        print("Task timed out")
        # cancel() only prevents tasks that have not started yet;
        # a task that is already running keeps running to completion.
        future.cancel()

Thread Synchronization

Lock

import threading

counter = 0
lock = threading.Lock()

def increment():
    """Bump the shared counter under the lock so the update is never lost."""
    global counter
    with lock:
        counter += 1

threads = [threading.Thread(target=increment) for _ in range(1000)]
for worker in threads:
    worker.start()
for worker in threads:
    worker.join()

print(counter)  # Always 1000

RLock (Reentrant Lock)

import threading

lock = threading.RLock()

def inner():
    """Re-acquires `lock`; safe because an RLock is reentrant per thread."""
    with lock:
        pass

def outer():
    """Takes `lock`, then calls inner(), which takes it a second time."""
    with lock:
        inner()

Semaphore

import threading

# Cap concurrency at three simultaneous connections
semaphore = threading.Semaphore(3)

def use_connection():
    """Run the connect/work/disconnect cycle under the concurrency cap."""
    # A fourth caller blocks here until one of the three inside releases.
    with semaphore:
        connect()
        do_work()
        disconnect()

Event

import threading
import time  # FIX: original snippet called time.sleep without importing time

ready = threading.Event()

def worker():
    """Block until the main thread signals `ready`, then continue."""
    print("Worker waiting...")
    ready.wait()  # Blocks until ready.set() is called
    print("Worker running!")

t = threading.Thread(target=worker)
t.start()

time.sleep(1)
ready.set()  # Signal worker to proceed

Condition

import threading

queue = []
condition = threading.Condition()

def producer():
    """Append one item and wake a waiting consumer, if any."""
    with condition:
        queue.append("item")
        condition.notify()

def consumer():
    """Wait until an item is available, then pop and return it."""
    with condition:
        # wait_for releases the lock while sleeping and re-checks the
        # predicate on every wakeup - same as the while/not/wait idiom.
        condition.wait_for(lambda: queue)
        return queue.pop(0)

Barrier

import threading

barrier = threading.Barrier(3)  # Trips once 3 threads are waiting

def worker(worker_id):
    """Run phase 1, rendezvous with the other threads, then run phase 2."""
    print(f"Worker {worker_id} phase 1")
    barrier.wait()  # No thread proceeds until all 3 arrive here
    print(f"Worker {worker_id} phase 2")

threads = [threading.Thread(target=worker, args=(n,)) for n in range(3)]
for t in threads:
    t.start()

Thread-Safe Data Structures

Queue

import threading  # FIX: original snippet used threading.Thread without importing it
from queue import Queue

q = Queue(maxsize=10)

def producer():
    """Produce 20 items, then a None sentinel telling the consumer to stop."""
    for i in range(20):
        q.put(i)  # Blocks while the queue is full
    q.put(None)  # Sentinel
def consumer():
    """Consume items until the sentinel arrives."""
    while True:
        item = q.get()  # Blocks while the queue is empty
        if item is None:
            # FIX: the sentinel is a queued item too. Without this
            # task_done() the unfinished-task count never reaches zero
            # and q.join() below blocks forever.
            q.task_done()
            break
        process(item)  # `process` is a placeholder for real work
        q.task_done()

# Start threads
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

q.join()  # Wait until every queued item has been marked done

Other Queues

from queue import Queue, LifoQueue, PriorityQueue

# FIFO: items come out in insertion order (the default queue type)
fifo = Queue()

# LIFO: behaves like a stack - last in, first out
lifo = LifoQueue()

# Priority: smallest entry first, so lower numbers are served earlier
priority = PriorityQueue()
priority.put((10, "low priority"))
priority.put((1, "high priority"))

Daemon Threads

Daemon threads are abruptly terminated when the main program exits; they get no chance to run cleanup code, so never use them for work that must finish.

import threading

def background_task():
    """Loop forever doing periodic work; only viable as a daemon thread."""
    while True:
        do_something()  # placeholder for the real periodic job
        time.sleep(1)

# daemon=True means the interpreter will not wait for this thread at exit
t = threading.Thread(target=background_task, daemon=True)
t.start()

# Main program can exit, daemon will be killed

# Main program can exit, daemon will be killed

Thread-Local Data

Each thread has its own copy of the variable.

import threading
import time  # FIX: original snippet called time.sleep without importing time

local_data = threading.local()

def worker():
    """Stash this thread's name in thread-local storage, then read it back."""
    local_data.value = threading.current_thread().name
    time.sleep(0.1)  # give the other workers a chance to run in between
    print(f"{threading.current_thread().name}: {local_data.value}")

# Each thread sees only its own local_data.value
threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Common Patterns

Parallel Map

from concurrent.futures import ThreadPoolExecutor

def parallel_map(func, items, max_workers=10):
    """Apply `func` to every item concurrently; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(func, items))

# Usage — assumes fetch_url and a urls list are defined (e.g. as in the earlier examples)
results = parallel_map(fetch_url, urls)

Worker Pool

from concurrent.futures import ThreadPoolExecutor
from queue import Queue

class WorkerPool:
    """Thin wrapper around ThreadPoolExecutor that collects submitted futures."""

    def __init__(self, num_workers=4):
        self.executor = ThreadPoolExecutor(max_workers=num_workers)
        self.futures = []

    def submit(self, fn, *args, **kwargs):
        """Schedule fn(*args, **kwargs) on the pool and remember its future."""
        fut = self.executor.submit(fn, *args, **kwargs)
        self.futures.append(fut)
        return fut

    def wait_all(self):
        """Return one entry per pending future, in submission order.

        Each entry is the future's result, or the exception it raised.
        Clears the pending list afterwards.
        """
        collected = []
        for fut in self.futures:
            try:
                collected.append(fut.result())
            except Exception as exc:
                collected.append(exc)
        self.futures = []
        return collected

    def shutdown(self):
        """Block until queued work finishes and release the pool's threads."""
        self.executor.shutdown(wait=True)

Bounded Parallel Execution

from concurrent.futures import ThreadPoolExecutor, as_completed

def bounded_parallel(func, items, max_concurrent=10):
    """Process items with bounded concurrency.

    Yields (item, result) pairs in completion order; if a call raised,
    the exception object is yielded in place of the result.
    """
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        pending = {pool.submit(func, item): item for item in items}

        for done in as_completed(pending):
            source = pending[done]
            try:
                yield source, done.result()
            except Exception as exc:
                yield source, exc

Debugging Threads

Current Thread Info

import threading

def worker():
    """Print identifying details of the thread executing this function."""
    me = threading.current_thread()
    print(f"Name: {me.name}")
    print(f"ID: {me.ident}")
    print(f"Alive: {me.is_alive()}")

List All Threads

# List every live thread with its daemon status
for live_thread in threading.enumerate():
    kind = 'daemon' if live_thread.daemon else 'normal'
    print(f"{live_thread.name}: {kind}")

Stack Traces

import sys
import traceback

def print_thread_stacks():
    """Dump the current call stack of every running thread (debug aid)."""
    for tid, frame in sys._current_frames().items():
        print(f"\n--- Thread {tid} ---")
        traceback.print_stack(frame)

Best Practices

  1. Use ThreadPoolExecutor — Cleaner than manual thread management
  2. Use context managers — For locks and executors
  3. Keep critical sections short — Minimize time holding locks
  4. Prefer queues — Over shared state with locks
  5. Handle exceptions — Threads fail silently by default
  6. Set reasonable pool sizes — Not too many, not too few
  7. Use timeouts — Avoid indefinite blocking

Common Pitfalls

# Bad: Race condition -- `counter += 1` is a read-modify-write sequence,
# so two threads can both read the same old value and one update is lost.
# NOTE: these snippets assume `import threading` has already been done.
counter = 0
def bad_increment():
    global counter
    counter += 1  # Not atomic: load, add, and store are separate steps

# Good: Use lock
lock = threading.Lock()
def good_increment():
    global counter
    with lock:  # Only one thread may run the increment at a time
        counter += 1

# Bad: Deadlock
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1():
    with lock_a:
        with lock_b:  # Deadlock if thread_2 holds lock_b and wants lock_a
            pass

# Good: Consistent lock ordering
def safe_thread():
    with lock_a:
        with lock_b:  # Always acquire in same order (a before b) in every thread
            pass