Python Threading¶
Thread-based concurrency for I/O-bound tasks.
When to Use Threading¶
- I/O-bound tasks — Network, file, database operations
- Blocking calls — Waiting for external resources
- Legacy code — Where async rewrite isn't feasible
Not for: CPU-bound tasks (use multiprocessing instead).
Basic Threading¶
Creating Threads¶
import threading


def worker(name, count):
    """Print `count` numbered messages tagged with this worker's name."""
    for i in range(count):
        print(f"{name}: {i}")


# Create and start threads
t1 = threading.Thread(target=worker, args=("Thread-1", 5))
t2 = threading.Thread(target=worker, args=("Thread-2", 5))
t1.start()
t2.start()

# Wait for completion
t1.join()
t2.join()
print("Done")
Thread Subclass¶
import threading


class DownloadThread(threading.Thread):
    """Thread that downloads one URL and stores the result on itself."""

    def __init__(self, url):
        super().__init__()
        self.url = url
        self.result = None  # populated by run()

    def run(self):
        # NOTE: `download` is a placeholder for a real fetch helper —
        # supply one before running this example.
        self.result = download(self.url)


# Usage
thread = DownloadThread("https://example.com")
thread.start()
thread.join()
print(thread.result)
ThreadPoolExecutor¶
The modern, preferred way to use threads.
Basic Usage¶
from concurrent.futures import ThreadPoolExecutor

# NOTE: this example also requires `import requests` (third-party HTTP client).


def fetch_url(url):
    """Fetch one URL and return its decoded JSON body."""
    response = requests.get(url)
    return response.json()


urls = ["https://api.example.com/1", "https://api.example.com/2"]

# map() preserves input order and re-raises any worker exception.
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fetch_url, urls))
Submit and Futures¶
from concurrent.futures import ThreadPoolExecutor, as_completed


def process(item):
    """Example unit of work: double the input."""
    return item * 2


with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit tasks; map each future back to its input item
    futures = {executor.submit(process, i): i for i in range(10)}

    # Process as they complete (not in submission order)
    for future in as_completed(futures):
        item = futures[future]
        try:
            result = future.result()
            print(f"{item} → {result}")
        except Exception as e:
            print(f"{item} failed: {e}")
Timeout Handling¶
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_task():
    """Simulate a long-running job."""
    time.sleep(10)
    return "done"


with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)
    try:
        # Give up waiting after 2 seconds; the worker thread keeps running.
        result = future.result(timeout=2.0)
    except TimeoutError:
        print("Task timed out")
        future.cancel()  # May not stop the task immediately
    # NOTE: leaving the `with` block still waits for slow_task to finish,
    # because executor shutdown joins its worker threads.
Thread Synchronization¶
Lock¶
import threading

counter = 0
lock = threading.Lock()


def increment():
    """Atomically add 1 to the shared counter."""
    global counter
    with lock:  # Only one thread at a time
        counter += 1


threads = [threading.Thread(target=increment) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # Always 1000
RLock (Reentrant Lock)¶
import threading

lock = threading.RLock()


def outer():
    """Acquire the lock, then call inner() which re-acquires it."""
    with lock:
        inner()  # Same thread can reacquire


def inner():
    with lock:  # Works with RLock, deadlocks with Lock
        pass
Semaphore¶
import threading

# Allow max 3 concurrent connections
semaphore = threading.Semaphore(3)


def use_connection():
    """Use one of at most 3 concurrent connections."""
    with semaphore:  # Blocks if 3 threads already inside
        # NOTE: connect/do_work/disconnect are placeholders for real
        # connection-handling code.
        connect()
        do_work()
        disconnect()
Event¶
import threading
import time

ready = threading.Event()


def worker():
    """Wait until the main thread signals `ready`, then proceed."""
    print("Worker waiting...")
    ready.wait()  # Block until set
    print("Worker running!")


t = threading.Thread(target=worker)
t.start()
time.sleep(1)
ready.set()  # Signal worker to proceed
Condition¶
import threading

queue = []  # shared buffer, protected by `condition`
condition = threading.Condition()


def producer():
    """Append one item and wake a waiting consumer."""
    with condition:
        queue.append("item")
        condition.notify()  # Wake one consumer


def consumer():
    """Block until an item is available, then pop and return it."""
    with condition:
        # Loop (not `if`): guards against spurious wakeups and races.
        while not queue:
            condition.wait()  # Release lock and wait
        item = queue.pop(0)
        return item
Barrier¶
import threading

barrier = threading.Barrier(3)  # Wait for 3 threads


def worker(id):
    """Run phase 1, sync with the other workers, then run phase 2."""
    print(f"Worker {id} phase 1")
    barrier.wait()  # All threads sync here
    print(f"Worker {id} phase 2")


threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
Thread-Safe Data Structures¶
Queue¶
import threading
from queue import Queue

q = Queue(maxsize=10)


def process(item):
    """Placeholder for real per-item work."""
    print("processing", item)


def producer():
    for i in range(20):
        q.put(i)  # Blocks if full
    q.put(None)  # Sentinel


def consumer():
    while True:
        item = q.get()  # Blocks if empty
        if item is None:
            # Mark the sentinel done too, otherwise q.join() never returns.
            q.task_done()
            break
        process(item)
        q.task_done()


# Start threads
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
q.join()  # Wait until all items processed
Other Queues¶
from queue import Queue, LifoQueue, PriorityQueue

# FIFO (default): items come out in insertion order.
fifo = Queue()

# LIFO: behaves like a stack — last in, first out.
lifo = LifoQueue()

# Priority queue: entries are (priority, item); lowest priority comes out first.
priority = PriorityQueue()
for entry in [(1, "high priority"), (10, "low priority")]:
    priority.put(entry)
Daemon Threads¶
Daemon threads are abruptly terminated when the last non-daemon thread exits; they never run cleanup code, so don't let them hold resources that need orderly release.
import threading
import time


def background_task():
    """Loop forever doing periodic work; lives until the process exits."""
    while True:
        # NOTE: `do_something` is a placeholder for real periodic work.
        do_something()
        time.sleep(1)


# Daemon thread
t = threading.Thread(target=background_task, daemon=True)
t.start()
# Main program can exit, daemon will be killed
Thread-Local Data¶
Attributes set on a `threading.local` object are independent per thread: each thread sees only the values it assigned itself.
import threading
import time

local_data = threading.local()


def worker():
    """Store this thread's name in thread-local storage, then read it back."""
    local_data.value = threading.current_thread().name
    time.sleep(0.1)
    print(f"{threading.current_thread().name}: {local_data.value}")


# Each thread has its own local_data.value
threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
Common Patterns¶
Parallel Map¶
from concurrent.futures import ThreadPoolExecutor


def parallel_map(func, items, max_workers=10):
    """Like map(func, items) but runs calls in a thread pool.

    Returns a list in input order; an exception raised by any call
    propagates to the caller.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(func, items))
# Usage (fetch_url and urls as defined in the earlier ThreadPoolExecutor example)
results = parallel_map(fetch_url, urls)
Worker Pool¶
from concurrent.futures import ThreadPoolExecutor


class WorkerPool:
    """Thin wrapper around ThreadPoolExecutor that collects futures."""

    def __init__(self, num_workers=4):
        self.executor = ThreadPoolExecutor(max_workers=num_workers)
        self.futures = []  # futures outstanding since the last wait_all()

    def submit(self, fn, *args, **kwargs):
        """Schedule fn(*args, **kwargs) and remember its future."""
        future = self.executor.submit(fn, *args, **kwargs)
        self.futures.append(future)
        return future

    def wait_all(self):
        """Return results for all pending futures, in submission order.

        A failed task contributes its exception object instead of raising,
        so one failure doesn't hide the other results.
        """
        results = []
        for future in self.futures:
            try:
                results.append(future.result())
            except Exception as e:
                results.append(e)
        self.futures = []
        return results

    def shutdown(self):
        """Block until queued work finishes and release the pool's threads."""
        self.executor.shutdown(wait=True)
Bounded Parallel Execution¶
from concurrent.futures import ThreadPoolExecutor, as_completed


def bounded_parallel(func, items, max_concurrent=10):
    """Process items with bounded concurrency.

    Yields (item, result) pairs in completion order; a failing call
    yields (item, exception) instead of raising.
    """
    with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        futures = {}
        for item in items:
            future = executor.submit(func, item)
            futures[future] = item

        for future in as_completed(futures):
            item = futures[future]
            try:
                yield item, future.result()
            except Exception as e:
                yield item, e
Debugging Threads¶
Current Thread Info¶
import threading


def worker():
    """Print identifying info about the thread executing this function."""
    thread = threading.current_thread()
    print(f"Name: {thread.name}")
    print(f"ID: {thread.ident}")
    print(f"Alive: {thread.is_alive()}")
List All Threads¶
import threading

# enumerate() returns every thread currently alive, including MainThread.
for thread in threading.enumerate():
    print(f"{thread.name}: {'daemon' if thread.daemon else 'normal'}")
Stack Traces¶
import sys
import traceback


def print_thread_stacks():
    """Dump the current stack of every running thread.

    Uses sys._current_frames(), a CPython implementation detail —
    handy for debugging hangs, not for production logic.
    """
    for thread_id, frame in sys._current_frames().items():
        print(f"\n--- Thread {thread_id} ---")
        traceback.print_stack(frame)
Best Practices¶
- Use ThreadPoolExecutor — Cleaner than manual thread management
- Use context managers — For locks and executors
- Keep critical sections short — Minimize time holding locks
- Prefer queues — Over shared state with locks
- Handle exceptions — Threads fail silently by default
- Set reasonable pool sizes — Not too many, not too few
- Use timeouts — Avoid indefinite blocking
Common Pitfalls¶
# Bad: Race condition
counter = 0


def bad_increment():
    global counter
    counter += 1  # Not atomic! read-modify-write can interleave


# Good: Use lock
lock = threading.Lock()


def good_increment():
    global counter
    with lock:
        counter += 1


# Bad: Deadlock
lock_a = threading.Lock()
lock_b = threading.Lock()


def thread_1():
    with lock_a:
        with lock_b:  # Deadlock if thread_2 has lock_b
            pass


# Good: Consistent lock ordering
def safe_thread():
    with lock_a:
        with lock_b:  # Always acquire in same order
            pass