Python Multiprocessing¶
Process-based parallelism for CPU-bound tasks.
When to Use Multiprocessing¶
- CPU-bound tasks — Computation, data processing, image processing
- True parallelism — Bypass the GIL
- Isolation needed — Process crash doesn't affect others
Trade-offs: higher memory usage (each process has its own memory space), slower communication (data must be serialized between processes), and process startup overhead.
Basic Multiprocessing¶
Creating Processes¶
import multiprocessing

def worker(name, count):
    """Print `count` numbered messages tagged with this worker's name."""
    for i in range(count):
        print(f"{name}: {i}")

if __name__ == "__main__":  # Required on Windows
    procs = [
        multiprocessing.Process(target=worker, args=("Process-1", 5)),
        multiprocessing.Process(target=worker, args=("Process-2", 5)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
Why if __name__ == "__main__"?¶
On Windows, processes are spawned by re-importing the module. Without the guard, you get infinite process spawning.
ProcessPoolExecutor¶
The modern, preferred way to use processes.
Basic Usage¶
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def cpu_intensive(n):
    """Sum of squares of 0..n-1 — a pure CPU-bound workload."""
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    data = [1_000_000] * 8
    # One worker process per CPU core
    workers = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(cpu_intensive, data))
    print(sum(results))
Submit and Futures¶
from concurrent.futures import ProcessPoolExecutor, as_completed

def process_item(item):
    """Square one work item."""
    return item ** 2

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # Map each future back to the item it was submitted with
        pending = {executor.submit(process_item, n): n for n in range(10)}
        for fut in as_completed(pending):
            submitted = pending[fut]
            print(f"{submitted} → {fut.result()}")
Pool (Classic API)¶
Lower-level than ProcessPoolExecutor, but offers more control.
map and map_async¶
from multiprocessing import Pool

def square(x):
    """Return x squared."""
    return x ** 2

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # Synchronous: blocks until every result is ready
        results = pool.map(square, range(10))
        # Asynchronous: returns immediately, collect results later
        pending = pool.map_async(square, range(10))
        results = pending.get(timeout=10)
apply and apply_async¶
from multiprocessing import Pool

def compute(x, y):
    """Return the sum of x and y."""
    return x + y

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # apply() blocks until the single call completes
        result = pool.apply(compute, args=(1, 2))
        # apply_async() returns an AsyncResult immediately
        pending = pool.apply_async(compute, args=(1, 2))
        result = pending.get()
starmap¶
For functions with multiple arguments.
from multiprocessing import Pool

def multiply(x, y):
    """Return the product of x and y."""
    return x * y

if __name__ == "__main__":
    with Pool() as pool:
        argument_pairs = [(1, 2), (3, 4), (5, 6)]
        # starmap unpacks each tuple into positional arguments
        results = pool.starmap(multiply, argument_pairs)  # [2, 12, 30]
imap for Memory Efficiency¶
from multiprocessing import Pool

def process(x):
    """Square a single value."""
    return x ** 2

if __name__ == "__main__":
    with Pool() as pool:
        # imap yields results lazily instead of materializing one huge list
        for result in pool.imap(process, range(1000000)):
            handle(result)
        # imap_unordered yields in completion order — faster when
        # result order doesn't matter
        for result in pool.imap_unordered(process, range(1000000)):
            handle(result)
Inter-Process Communication¶
Queue¶
from multiprocessing import Process, Queue

def producer(q):
    """Push ten integers onto the queue, then a None sentinel."""
    for i in range(10):
        q.put(i)
    q.put(None)  # Sentinel: tells the consumer to stop

def consumer(q):
    """Drain the queue until the None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Got: {item}")

if __name__ == "__main__":
    q = Queue()
    procs = [
        Process(target=producer, args=(q,)),
        Process(target=consumer, args=(q,)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
Pipe¶
from multiprocessing import Process, Pipe

def sender(conn):
    """Send one message, then close our end of the pipe."""
    conn.send("Hello from sender!")
    conn.close()

def receiver(conn):
    """Receive a single message and print it."""
    msg = conn.recv()
    print(f"Received: {msg}")

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    procs = [
        Process(target=sender, args=(child_conn,)),
        Process(target=receiver, args=(parent_conn,)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
Shared State¶
Value and Array¶
from multiprocessing import Process, Value, Array

def increment(counter, array):
    """Add 1 to the shared counter and to every slot of the shared array.

    Fix: `counter.value += 1` is a read-modify-write, NOT atomic across
    processes — two processes can read the same value and both write
    back value+1, losing an increment.  Value/Array created with the
    default lock=True expose that lock via get_lock(); hold it for the
    whole read-modify-write.
    """
    with counter.get_lock():
        counter.value += 1
    with array.get_lock():
        for i in range(len(array)):
            array[i] += 1

if __name__ == "__main__":
    counter = Value('i', 0)               # 'i' = C int
    array = Array('d', [0.0, 1.0, 2.0])   # 'd' = C double
    processes = []
    for _ in range(4):
        p = Process(target=increment, args=(counter, array))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print(counter.value)  # 4
    print(array[:])       # [4.0, 5.0, 6.0]
Manager (More Flexible)¶
from multiprocessing import Process, Manager

def worker(shared_dict, shared_list, key, value):
    """Record value under key in the dict and append it to the list."""
    shared_dict[key] = value
    shared_list.append(value)

if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()
        procs = []
        for i in range(4):
            p = Process(
                target=worker,
                args=(shared_dict, shared_list, f"key{i}", i),
            )
            p.start()
            procs.append(p)
        for p in procs:
            p.join()
        print(dict(shared_dict))  # {'key0': 0, 'key1': 1, ...}
        print(list(shared_list))  # [0, 1, 2, 3]
Synchronization¶
Lock¶
from multiprocessing import Process, Lock, Value

def increment(counter, lock):
    """Bump the shared counter 10000 times, holding the lock each time."""
    for _ in range(10000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)
    lock = Lock()
    procs = [
        Process(target=increment, args=(counter, lock))
        for _ in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 40000
Semaphore¶
from multiprocessing import Process, Semaphore
import time  # Fix: the original snippet called time.sleep without importing time

def limited_resource(semaphore, name, hold_time=1):
    """Hold the shared resource for `hold_time` seconds.

    The semaphore caps how many processes may be inside the
    with-block at once.  `hold_time` defaults to 1 so existing
    callers behave exactly as before.
    """
    with semaphore:  # Only N processes at a time
        print(f"{name} acquired resource")
        time.sleep(hold_time)
        print(f"{name} released resource")

if __name__ == "__main__":
    sem = Semaphore(2)  # Max 2 concurrent holders
    processes = [
        Process(target=limited_resource, args=(sem, f"P{i}"))
        for i in range(5)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
Chunking for Large Data¶
from concurrent.futures import ProcessPoolExecutor
import itertools

def process_chunk(chunk):
    """Square every element of one chunk."""
    return [x ** 2 for x in chunk]

def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while chunk := list(itertools.islice(it, size)):
        yield chunk

if __name__ == "__main__":
    data = range(1_000_000)
    chunk_size = 10000
    results = []
    with ProcessPoolExecutor() as executor:
        # Fewer, larger tasks means less serialization overhead
        # and better load balancing
        for chunk_result in executor.map(process_chunk, chunked(data, chunk_size)):
            results.extend(chunk_result)
Process Startup Methods¶
import multiprocessing

def worker():
    """Placeholder process target so the example is self-contained.

    Fix: the original referenced an undefined `worker`.
    """

if __name__ == "__main__":
    # Check the method currently in effect.  NOTE: get_start_method()
    # also *fixes* the default if none was set yet.
    print(multiprocessing.get_start_method())

    # Fix: set_start_method() may be applied at most ONCE per program —
    # the original called it three times in a row, which raises
    # RuntimeError on the second call.  Pick exactly one (force=True
    # overrides the default fixed by get_start_method() above):
    multiprocessing.set_start_method('spawn', force=True)  # default on Windows/macOS; safest
    # multiprocessing.set_start_method('fork')             # default on Linux (faster)
    # multiprocessing.set_start_method('forkserver')

    # Or use a context, which chooses a method for just these objects
    # without touching the global default
    ctx = multiprocessing.get_context('spawn')
    p = ctx.Process(target=worker)
| Method | Platform | Speed | Safety |
|---|---|---|---|
| fork | Unix | Fast | May have issues with threads |
| spawn | All | Slow | Safest |
| forkserver | Unix | Medium | Good balance |
Handling Exceptions¶
from concurrent.futures import ProcessPoolExecutor

def may_fail(x):
    """Square x; raises ValueError for the input 5."""
    if x == 5:
        raise ValueError("Can't process 5")
    return x ** 2

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(may_fail, i) for i in range(10)]
        for future in futures:
            # result() re-raises any exception raised in the worker
            try:
                result = future.result()
                print(f"Result: {result}")
            except Exception as e:
                print(f"Failed: {e}")
Memory Considerations¶
Large Data Transfer¶
# Bad: Sending large data to each process
def process_large_data(data): # data is pickled for each call
return len(data)
# Good: Load data inside process
def process_with_path(file_path):
data = load_data(file_path) # Load in process
return len(data)
Shared Memory (Python 3.8+)¶
from multiprocessing import shared_memory
import numpy as np

# Fix: defined at module level (not inside the __main__ guard as in the
# original) so it can be pickled as a Process target under 'spawn'.
def worker(shm_name, shape, dtype):
    """Attach to an existing shared-memory block and double it in place."""
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    arr *= 2  # Modify in place — visible to the creating process
    # Fix: release the numpy view before close() — SharedMemory.close()
    # raises BufferError while an exported memoryview is still alive.
    del arr
    existing_shm.close()

if __name__ == "__main__":
    # Create a shared-memory block sized to hold the array's bytes
    arr = np.array([1, 2, 3, 4, 5])
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    # Create a numpy array backed by the shared buffer and copy data in
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr[:]
    # Another process attaches by name: worker(shm.name, arr.shape, arr.dtype)
    # Cleanup: every process calls close(); exactly one calls unlink()
    del shared_arr  # Fix: drop the view before closing (see worker)
    shm.close()
    shm.unlink()
Best Practices¶
- Use ProcessPoolExecutor — Cleaner than raw Process
- Avoid shared state — Use queues and return values
- Chunk large iterables — Better load balancing
- Handle exceptions — Process failures are silent by default
- Mind serialization — Objects must be picklable
- Use name guard — Required on Windows
- Choose right pool size — Usually `cpu_count()` for CPU-bound work
Common Pitfalls¶
# Bad: Unpicklable object
# Bad: instances holding unpicklable objects can't cross the
# process boundary
class MyClass:
    def __init__(self):
        self.lock = threading.Lock()  # Locks can't be pickled

# Good: keep only picklable state on the instance
class MyClass:
    def __init__(self):
        self.data = []

# Bad: huge return values are pickled and shipped back to the parent
def bad_worker():
    return list(range(10_000_000))

# Good: persist results in the worker and return a small handle
def good_worker(output_path):
    data = process()
    with open(output_path, 'wb') as f:
        pickle.dump(data, f)
    return output_path  # Return path, not data