Synchronization¶
Coordinating access to shared resources between concurrent tasks.
Why Synchronization?¶
Without synchronization, concurrent access to shared state causes bugs.
# Unsafe: Two threads incrementing counter
counter = 0
def increment():
global counter
temp = counter # Thread A reads 0
temp = temp + 1 # Thread A computes 1
# Context switch!
counter = temp # Thread A writes 1, but B already wrote 1
# Result: counter = 1 (should be 2)
Mutex (Mutual Exclusion)¶
Ensures only one thread accesses a resource at a time.
Thread A: ─request─[=== locked: working ===]─release─
Thread B: ─request─[waiting...]───────────[locked: working]─release─
Python Example¶
import threading
counter = 0
lock = threading.Lock()
def safe_increment():
global counter
with lock: # Acquire lock
counter += 1
# Lock automatically released
# Multiple threads can safely call safe_increment()
Mutex Properties¶
- Mutual exclusion: Only one holder at a time
- Progress: If no one holds it, someone waiting gets it
- Bounded waiting: Requests are eventually granted
Semaphore¶
Allows N threads to access a resource (generalized mutex).
Semaphore(3) — up to 3 concurrent holders
Thread A: ─acquire(count=3→2)─[working]─release(count=2→3)─
Thread B: ─acquire(count=2→1)─[working]─release(count=1→2)─
Thread C: ─acquire(count=1→0)─[working]─release(count=0→1)─
Thread D: ─acquire(count=0)[waiting...wait...wait]─[working]─
Python Example¶
import threading
# Allow 3 concurrent database connections
db_semaphore = threading.Semaphore(3)
def query_database():
with db_semaphore: # Blocks if 3 threads already inside
connection = get_connection()
result = connection.execute(query)
return result
Use Cases¶
- Connection pools
- Rate limiting
- Resource pools
Condition Variables¶
Wait for a specific condition, not just lock availability.
import threading
queue = []
lock = threading.Lock()
not_empty = threading.Condition(lock)
not_full = threading.Condition(lock)
MAX_SIZE = 10
def producer(item):
with not_full:
while len(queue) >= MAX_SIZE:
not_full.wait() # Release lock and wait
queue.append(item)
not_empty.notify() # Wake a consumer
def consumer():
with not_empty:
while len(queue) == 0:
not_empty.wait() # Release lock and wait
item = queue.pop(0)
not_full.notify() # Wake a producer
return item
Key Operations¶
- wait(): Release lock, sleep until notified, reacquire lock
- notify(): Wake one waiting thread
- notify_all(): Wake all waiting threads
Read-Write Locks¶
Multiple readers OR one writer (not both).
Python Example¶
import threading
class ReadWriteLock:
def __init__(self):
self._read_ready = threading.Condition(threading.Lock())
self._readers = 0
def acquire_read(self):
with self._read_ready:
self._readers += 1
def release_read(self):
with self._read_ready:
self._readers -= 1
if self._readers == 0:
self._read_ready.notify_all()
def acquire_write(self):
self._read_ready.acquire()
while self._readers > 0:
self._read_ready.wait()
def release_write(self):
self._read_ready.release()
Use Cases¶
- Caches (many reads, few writes)
- Configuration (read often, update rarely)
- Databases (read replicas)
Barriers¶
Synchronize threads at a common point.
Thread A: ─work─work─│barrier│─work─work─
Thread B: ─work─────│barrier│─work─
Thread C: ─work─work─work─│barrier│─work─
All threads wait until everyone reaches barrier
Python Example¶
import threading
# All 3 threads must reach barrier before any continues
barrier = threading.Barrier(3)
def worker(id):
print(f"Worker {id}: Computing...")
result = compute()
barrier.wait() # Wait for all workers
print(f"Worker {id}: All done, proceeding...")
Use Cases¶
- Parallel algorithms with phases
- Test synchronization
- Multi-stage pipelines
Atomic Operations¶
Operations that complete without interruption.
# Non-atomic (can be interrupted)
counter = counter + 1
# Actually: read → add → write (3 operations)
# Atomic (cannot be interrupted)
counter.increment() # Single operation
Python Atomics¶
import threading
# Using Lock for atomicity
lock = threading.Lock()
def atomic_increment():
with lock:
global counter
counter += 1
# Or use queue.Queue (internally synchronized)
from queue import Queue
q = Queue()
q.put(item) # Atomic
item = q.get() # Atomic
Hardware Atomics¶
Modern CPUs provide atomic instructions: - Compare-and-swap (CAS) - Fetch-and-add - Test-and-set
Lock-Free Data Structures¶
Use atomics instead of locks for better performance.
# Conceptual lock-free stack (Python doesn't have native CAS)
class LockFreeStack:
def __init__(self):
self.head = None
def push(self, value):
while True:
old_head = self.head
new_node = Node(value, old_head)
if compare_and_swap(self.head, old_head, new_node):
return # Success
def pop(self):
while True:
old_head = self.head
if old_head is None:
return None
new_head = old_head.next
if compare_and_swap(self.head, old_head, new_head):
return old_head.value
Trade-offs¶
| Approach | Pros | Cons |
|---|---|---|
| Locks | Simple, general | Contention, deadlock risk |
| Lock-free | No deadlocks, better scalability | Complex, harder to debug |
Synchronization Guidelines¶
Do¶
- Keep critical sections short — Hold locks briefly
- Lock at the right granularity — Not too coarse, not too fine
- Use RAII/context managers — Ensure locks are released
- Prefer higher-level abstractions — Queues, pools, actors
Don't¶
- Hold locks during I/O — Blocks other threads
- Nest locks carelessly — Risk of deadlock
- Over-synchronize — Kills parallelism
- Share mutable state when you can avoid it
Summary Table¶
| Primitive | Purpose | Python |
|---|---|---|
| Mutex | One-at-a-time access | threading.Lock() |
| Semaphore | N-at-a-time access | threading.Semaphore(n) |
| Condition | Wait for condition | threading.Condition() |
| RWLock | Multiple readers OR one writer | Custom or libraries |
| Barrier | Synchronize at point | threading.Barrier(n) |
| Event | Signal occurrence | threading.Event() |