Skip to content

Python Multiprocessing

Process-based parallelism for CPU-bound tasks.

When to Use Multiprocessing

  • CPU-bound tasks — Computation, data processing, image processing
  • True parallelism — Bypass the GIL
  • Isolation needed — Process crash doesn't affect others

Trade-offs:

  • Higher memory usage — each process has its own memory space
  • Slower communication — arguments and results must be serialized (pickled)
  • Startup overhead — launching a process costs more than launching a thread

Basic Multiprocessing

Creating Processes

import multiprocessing

def worker(name, count):
    """Print `count` numbered messages tagged with this worker's name."""
    for step in range(count):
        print(f"{name}: {step}")

if __name__ == "__main__":  # guard is mandatory where processes are spawned (Windows)
    procs = [
        multiprocessing.Process(target=worker, args=(label, 5))
        for label in ("Process-1", "Process-2")
    ]

    # Launch both, then wait for both to finish.
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

Why if __name__ == "__main__"?

On Windows, processes are spawned by re-importing the module. Without the guard, you get infinite process spawning.

# Always use this pattern
if __name__ == "__main__":
    main()  # NOTE(review): `main` must be defined earlier in a real script

ProcessPoolExecutor

The modern, preferred way to use processes.

Basic Usage

from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def cpu_intensive(n):
    """Return the sum of squares 0**2 + 1**2 + ... + (n-1)**2 (pure CPU work)."""
    total = 0
    for value in range(n):
        total += value * value
    return total

if __name__ == "__main__":
    jobs = [1_000_000] * 8

    # One worker per CPU core is a sensible default for CPU-bound work.
    worker_count = multiprocessing.cpu_count()

    with ProcessPoolExecutor(max_workers=worker_count) as executor:
        results = list(executor.map(cpu_intensive, jobs))

    print(sum(results))

Submit and Futures

from concurrent.futures import ProcessPoolExecutor, as_completed

def process_item(item):
    """Return the square of *item* (stand-in for real CPU-bound work)."""
    return item ** 2

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # Map each future back to the item it was submitted with.
        futures = {executor.submit(process_item, i): i for i in range(10)}

        # as_completed yields futures in completion order, not submission order.
        for future in as_completed(futures):
            item = futures[future]
            result = future.result()
            # Fix: the original f-string ("{item}{result}") had no separator,
            # printing unreadable output like "39" for item=3, result=9.
            print(f"{item} -> {result}")

Pool (Classic API)

Lower-level but more control.

map and map_async

from multiprocessing import Pool

def square(x):
    """Return the square of x."""
    return x ** 2

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # Blocking variant: returns once every result is ready.
        results = pool.map(square, range(10))

        # Non-blocking variant: get() waits, with a 10-second cap.
        pending = pool.map_async(square, range(10))
        results = pending.get(timeout=10)

apply and apply_async

from multiprocessing import Pool

def compute(x, y):
    """Return the sum of x and y."""
    return x + y

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # apply() blocks until the single call completes.
        result = pool.apply(compute, args=(1, 2))

        # apply_async() returns immediately; get() collects the result later.
        pending = pool.apply_async(compute, args=(1, 2))
        result = pending.get()

starmap

For functions with multiple arguments.

from multiprocessing import Pool

def multiply(x, y):
    """Return the product of x and y."""
    return x * y

if __name__ == "__main__":
    with Pool() as pool:
        argument_pairs = [(1, 2), (3, 4), (5, 6)]
        # starmap unpacks each tuple into positional arguments.
        results = pool.starmap(multiply, argument_pairs)
        # [2, 12, 30]

imap for Memory Efficiency

from multiprocessing import Pool

def process(x):
    # Placeholder CPU-bound task: square the input.
    return x ** 2

if __name__ == "__main__":
    with Pool() as pool:
        # Returns iterator, doesn't load all results into memory
        # NOTE(review): `handle` is an undefined placeholder for real
        # per-result work; this snippet will not run as-is.
        for result in pool.imap(process, range(1000000)):
            handle(result)

        # Unordered (faster if order doesn't matter)
        for result in pool.imap_unordered(process, range(1000000)):
            handle(result)

Inter-Process Communication

Queue

from multiprocessing import Process, Queue

def producer(q):
    """Push the integers 0-9 onto the queue, then a None sentinel."""
    for value in range(10):
        q.put(value)
    q.put(None)  # Sentinel: tells the consumer to stop

def consumer(q):
    """Pop and print items until the None sentinel arrives."""
    # iter(callable, sentinel) keeps calling q.get() until it returns None.
    for item in iter(q.get, None):
        print(f"Got: {item}")

if __name__ == "__main__":
    q = Queue()

    workers = [
        Process(target=producer, args=(q,)),
        Process(target=consumer, args=(q,)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

Pipe

from multiprocessing import Process, Pipe

def sender(conn):
    """Send one greeting down the pipe, then close our end."""
    conn.send("Hello from sender!")
    conn.close()

def receiver(conn):
    """Block until a message arrives, then print it."""
    msg = conn.recv()
    print(f"Received: {msg}")

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    procs = [
        Process(target=sender, args=(child_conn,)),
        Process(target=receiver, args=(parent_conn,)),
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

Shared State

Value and Array

from multiprocessing import Process, Value, Array

def increment(counter, array):
    """Add 1 to the shared counter and to every element of the shared array.

    ``counter.value += 1`` and ``array[i] += 1`` are read-modify-write
    sequences and are NOT atomic, so without synchronization four
    concurrent processes can lose updates and the printed results below
    would not be guaranteed. Value and Array each carry an internal lock;
    hold it via get_lock() around every update.
    """
    with counter.get_lock():
        counter.value += 1
    with array.get_lock():
        for i in range(len(array)):
            array[i] += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # 'i' = integer
    array = Array('d', [0.0, 1.0, 2.0])  # 'd' = double

    processes = []
    for _ in range(4):
        p = Process(target=increment, args=(counter, array))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(counter.value)  # 4 -- guaranteed now that updates are locked
    print(array[:])  # [4.0, 5.0, 6.0]

Manager (More Flexible)

from multiprocessing import Process, Manager

def worker(shared_dict, shared_list, key, value):
    """Record one value under *key* in the shared dict and append it to the shared list."""
    shared_list.append(value)
    shared_dict[key] = value

if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

        procs = []
        for i in range(4):
            proc = Process(target=worker, args=(shared_dict, shared_list, f"key{i}", i))
            proc.start()
            procs.append(proc)

        for proc in procs:
            proc.join()

        print(dict(shared_dict))  # {'key0': 0, 'key1': 1, ...}
        print(list(shared_list))  # contains 0-3; order depends on scheduling

Synchronization

Lock

from multiprocessing import Process, Lock, Value

def increment(counter, lock):
    """Add 10000 to the shared counter, taking the lock for each unit."""
    for _ in range(10000):
        # The lock makes the read-modify-write of counter.value atomic
        # across processes.
        with lock:
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)
    lock = Lock()

    procs = [Process(target=increment, args=(counter, lock)) for _ in range(4)]

    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

    print(counter.value)  # 40000

Semaphore

from multiprocessing import Process, Semaphore

def limited_resource(semaphore, name):
    """Hold one semaphore slot for a second, simulating work on a scarce resource."""
    # Fix: the original snippet called time.sleep without importing time,
    # which raised NameError. A function-local import keeps the snippet
    # self-contained.
    import time

    with semaphore:  # Only N processes at a time
        print(f"{name} acquired resource")
        time.sleep(1)  # stand-in for real work on the resource
        print(f"{name} released resource")

if __name__ == "__main__":
    sem = Semaphore(2)  # Max 2 concurrent

    processes = [Process(target=limited_resource, args=(sem, f"P{i}")) for i in range(5)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

Chunking for Large Data

from concurrent.futures import ProcessPoolExecutor
import itertools

def process_chunk(chunk):
    """Square every element of one chunk and return the list of squares."""
    return [value ** 2 for value in chunk]

def chunked(iterable, size):
    """Yield successive lists of at most *size* items from *iterable*."""
    iterator = iter(iterable)
    while True:
        batch = list(itertools.islice(iterator, size))
        if not batch:
            return
        yield batch

if __name__ == "__main__":
    data = range(1_000_000)
    chunk_size = 10000

    with ProcessPoolExecutor() as executor:
        results = []
        # One IPC round-trip per chunk instead of per item -> far less
        # serialization overhead and better load balancing.
        for chunk_result in executor.map(process_chunk, chunked(data, chunk_size)):
            results.extend(chunk_result)

Process Startup Methods

import multiprocessing

# Check the current method. NOTE: get_start_method() also *fixes* the
# default if it was not set yet, after which set_start_method() raises
# RuntimeError.
print(multiprocessing.get_start_method())

# Set the method once, early, before creating any pools or processes.
# Fix: the original called set_start_method() three times in a row, which
# raises RuntimeError -- it may be set at most once per program. Pick ONE:
#   multiprocessing.set_start_method('spawn')       # default on Windows and macOS
#   multiprocessing.set_start_method('fork')        # default on Unix (faster)
#   multiprocessing.set_start_method('forkserver')

# Or use a context object, which selects a method without touching the
# process-wide default:
ctx = multiprocessing.get_context('spawn')
p = ctx.Process(target=worker)  # NOTE(review): `worker` must be defined elsewhere
| Method     | Platform | Speed  | Safety                       |
|------------|----------|--------|------------------------------|
| fork       | Unix     | Fast   | May have issues with threads |
| spawn      | All      | Slow   | Safest                       |
| forkserver | Unix     | Medium | Good balance                 |

Handling Exceptions

from concurrent.futures import ProcessPoolExecutor

def may_fail(x):
    """Square x; raises ValueError for the unlucky input 5."""
    if x == 5:
        raise ValueError("Can't process 5")
    return x ** 2

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(may_fail, i) for i in range(10)]

        for future in futures:
            # result() re-raises, here in the parent, any exception the
            # worker process raised.
            try:
                print(f"Result: {future.result()}")
            except Exception as e:
                print(f"Failed: {e}")

Memory Considerations

Large Data Transfer

# Bad: Sending large data to each process
def process_large_data(data):  # data is pickled for each call
    # Every submit/map call serializes the whole argument to the worker
    # process -- expensive when `data` is large.
    return len(data)

# Good: Load data inside process
def process_with_path(file_path):
    # Only the short path string crosses the process boundary.
    # NOTE(review): `load_data` is an undefined placeholder in this snippet.
    data = load_data(file_path)  # Load in process
    return len(data)

Shared Memory (Python 3.8+)

from multiprocessing import shared_memory
import numpy as np

if __name__ == "__main__":
    # Create shared memory
    arr = np.array([1, 2, 3, 4, 5])
    # Allocate a raw shared-memory segment sized to the array's byte count.
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)

    # Create numpy array backed by shared memory
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr[:]  # one-time copy into the shared buffer

    # In another process, attach to shared memory
    # NOTE(review): this worker is illustrative only -- it is never launched
    # here. Real code would pass shm.name, arr.shape and str(arr.dtype) to a
    # Process/executor target.
    def worker(shm_name, shape, dtype):
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
        arr *= 2  # Modify in place
        existing_shm.close()  # detach this process's view; does not free the block

    # Cleanup
    shm.close()  # detach our view
    shm.unlink()  # free the segment -- call exactly once, by the owning process

Best Practices

  1. Use ProcessPoolExecutor — Cleaner than raw Process
  2. Avoid shared state — Use queues and return values
  3. Chunk large iterables — Better load balancing
  4. Handle exceptions — Process failures are silent by default
  5. Mind serialization — Objects must be picklable
  6. Use name guard — Required on Windows
  7. Choose right pool size — Usually cpu_count() for CPU-bound

Common Pitfalls

# Bad: Unpicklable object
class MyClass:
    def __init__(self):
        # NOTE(review): `threading` is not imported in this snippet.
        self.lock = threading.Lock()  # Can't pickle locks

# Good: Don't include unpicklable objects
class MyClass:
    def __init__(self):
        self.data = []

# Bad: Huge return values
def bad_worker():
    # The entire 10M-element list is pickled in the worker and shipped
    # back to the parent process.
    return list(range(10_000_000))  # Serialized on return

# Good: Return summary or write to file
def good_worker(output_path):
    # NOTE(review): `process` and the `pickle` import are placeholders
    # not defined in this snippet.
    data = process()
    with open(output_path, 'wb') as f:
        pickle.dump(data, f)
    return output_path  # Return path, not data