Skip to content

Worker Pools

Manage a pool of workers for parallel task processing.

The Pattern

              ┌─────────┐
              │ Task 1  │──┐
              └─────────┘  │    ┌──────────┐
              ┌─────────┐  ├───▶│ Worker 1 │
Task Queue ──▶│ Task 2  │──┤    └──────────┘
              └─────────┘  │    ┌──────────┐
              ┌─────────┐  ├───▶│ Worker 2 │
              │ Task 3  │──┤    └──────────┘
              └─────────┘  │    ┌──────────┐
                           └───▶│ Worker 3 │
                                └──────────┘

Python: concurrent.futures

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def process_item(item):
    """Return ``item`` squared; a stand-in for CPU-intensive work."""
    squared = pow(item, 2)
    return squared

def main():
    """Square the integers 0..99 in a process pool and print the results.

    The pool gets one worker process per CPU reported by
    ``multiprocessing.cpu_count()``; results are printed in input order
    after the pool has shut down.
    """
    numbers = [*range(100)]

    with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        squares = [value for value in pool.map(process_item, numbers)]

    print(squares)

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor

import requests

def fetch_url(url, timeout=10):
    """I/O-bound operation: GET ``url`` and return its body decoded as JSON.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the calling pool thread
            indefinitely.

    Returns:
        The decoded JSON payload of the response.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            timeout expires.
        ValueError: If the response body is not valid JSON.
    """
    response = requests.get(url, timeout=timeout)
    return response.json()

def main():
    """Fetch two example URLs concurrently on a 10-thread pool and print the results."""
    endpoints = ["https://api.example.com/1", "https://api.example.com/2"]

    with ThreadPoolExecutor(max_workers=10) as pool:
        payloads = [payload for payload in pool.map(fetch_url, endpoints)]

    print(payloads)

Handling Exceptions

from concurrent.futures import ProcessPoolExecutor, as_completed

def risky_process(item):
    """Return ``item`` squared, except that the value 5 is rejected.

    Raises:
        ValueError: If ``item`` equals 5 (simulates a task that fails).
    """
    if item != 5:
        return item ** 2
    raise ValueError("Can't process 5")

def main():
    """Run ``risky_process`` on 0..9 in a process pool, reporting each outcome.

    Results are printed in completion order. A task that raises is reported
    as failed without affecting the other tasks.
    """
    with ProcessPoolExecutor() as pool:
        # Remember which input each future belongs to.
        pending = {}
        for value in range(10):
            pending[pool.submit(risky_process, value)] = value

        for done in as_completed(pending):
            source = pending[done]
            try:
                outcome = done.result()  # Re-raises the task's exception, if any.
                print(f"{source} -> {outcome}")
            except Exception as err:
                print(f"{source} failed: {err}")

Async Worker Pool

import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')
R = TypeVar('R')  # Result type produced by a submitted coroutine function.


class AsyncWorkerPool:
    """Fixed-size pool of asyncio tasks that run submitted coroutine functions.

    Work items are pulled from a shared ``asyncio.Queue``. The outcome of each
    item is stored in ``results`` under the id returned by ``submit``, as
    either ``('success', value)`` or ``('error', exception)``.

    Typical lifecycle: ``await start()``, any number of ``await submit(...)``,
    ``await queue.join()`` to wait for completion, then ``await shutdown()``.
    """

    def __init__(self, num_workers: int, max_queue_size: int = 0):
        """Configure the pool; no tasks are created until ``start``.

        Args:
            num_workers: Number of worker tasks to spawn (at least 1).
            max_queue_size: Upper bound on queued items; ``submit`` waits
                while the queue is full. ``0`` (the default) means unbounded.

        Raises:
            ValueError: If ``num_workers`` is less than 1, since submitted
                work would then never be processed.
        """
        if num_workers < 1:
            raise ValueError("num_workers must be at least 1")
        self.num_workers = num_workers
        self.max_queue_size = max_queue_size
        self.queue: "asyncio.Queue | None" = None  # Created by start().
        self.workers: list[asyncio.Task] = []
        self.results: dict = {}
        self._next_task_id = 0  # Source of unique, sequential task ids.

    async def start(self):
        """Create the queue and spawn ``num_workers`` worker tasks.

        Raises:
            RuntimeError: If workers are already running; starting again
                would replace the queue and orphan them.
        """
        if self.workers:
            raise RuntimeError("AsyncWorkerPool is already started")
        self.queue = asyncio.Queue(maxsize=self.max_queue_size)
        self.workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.num_workers)
        ]

    async def _worker(self, worker_id: int):
        """Process queue items until a shutdown sentinel or cancellation.

        Args:
            worker_id: Index of this worker within the pool.
        """
        while True:
            try:
                task_id, fn, args, kwargs = await self.queue.get()
                if task_id is None:
                    # Shutdown sentinel: account for it so queue.join() still
                    # completes after shutdown() has been called.
                    self.queue.task_done()
                    break
                try:
                    result = await fn(*args, **kwargs)
                    self.results[task_id] = ('success', result)
                except Exception as e:
                    # Record the failure instead of letting it kill the worker.
                    self.results[task_id] = ('error', e)
                finally:
                    self.queue.task_done()
            except asyncio.CancelledError:
                break

    async def submit(self, fn: Callable[..., Awaitable[R]], *args, **kwargs) -> int:
        """Queue ``fn(*args, **kwargs)`` for execution by a worker.

        Args:
            fn: Coroutine function to call.
            *args: Positional arguments for ``fn``.
            **kwargs: Keyword arguments for ``fn``.

        Returns:
            A unique id under which the outcome is stored in ``results``.

        Raises:
            RuntimeError: If the pool has not been started.
        """
        if self.queue is None:
            raise RuntimeError("AsyncWorkerPool.start() must be awaited before submit()")
        # A counter guarantees a distinct id per submission and works with
        # unhashable arguments; no await occurs between read and increment.
        task_id = self._next_task_id
        self._next_task_id += 1
        await self.queue.put((task_id, fn, args, kwargs))
        return task_id

    async def shutdown(self):
        """Stop all workers once the items queued ahead of the sentinels are done."""
        for _ in self.workers:
            # One sentinel per worker; each worker exits after consuming one.
            await self.queue.put((None, None, None, None))
        await asyncio.gather(*self.workers)
        # Forget finished workers so a repeated shutdown() is a no-op.
        self.workers = []

# Usage
async def process(item):
    """Sleep for 0.1 s to simulate asynchronous work, then return ``item`` squared."""
    await asyncio.sleep(0.1)
    squared = pow(item, 2)
    return squared

async def main():
    """Run ``process`` on 0..19 through a four-worker pool and print each outcome."""
    pool = AsyncWorkerPool(num_workers=4)
    await pool.start()

    # Submissions are awaited one after another, in order.
    task_ids = [await pool.submit(process, number) for number in range(20)]

    # Wait for every queued item to be processed before stopping the workers.
    await pool.queue.join()
    await pool.shutdown()

    for submitted_id in task_ids:
        status, result = pool.results[submitted_id]
        print(f"Task {submitted_id}: {status} = {result}")

Celery (Distributed Tasks)

Setup

# celery_app.py
from celery import Celery

# A result backend is required for AsyncResult.get() and .status to work;
# without one, task results are not stored and .get() fails.
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

@app.task
def process_item(item):
    """Celery task: return ``item`` squared."""
    squared = pow(item, 2)
    return squared

@app.task
def send_email(to, subject, body):
    """Celery task placeholder for sending an email.

    The delivery logic is not implemented here; the task currently ignores
    its arguments and always reports success.
    """
    # Send email logic
    delivered = True
    return delivered

Usage

from celery_app import process_item, send_email

# Async execution: queue the task for a worker; the call does not wait for it to run
result = process_item.delay(42)

# Get result (blocks for up to 10 seconds waiting for the worker to finish)
value = result.get(timeout=10)

# Check status
print(result.status)  # e.g. PENDING, SUCCESS, FAILURE

# With options: apply_async takes the task arguments plus scheduling options
process_item.apply_async(
    args=[42],
    countdown=60,  # Start after 60 seconds
    expires=3600,  # Expire after 1 hour
)

Chaining Tasks

from celery import chain, group, chord

# NOTE: fetch_data, process_data, save_results, aggregate_results and url are
# not defined in this snippet; they are assumed to be Celery tasks / values
# defined elsewhere.

# Chain: sequential execution (each task's result is passed on to the next task)
chain(
    fetch_data.s(url),
    process_data.s(),
    save_results.s()
).delay()

# Group: parallel execution of independent tasks
group(
    process_item.s(1),
    process_item.s(2),
    process_item.s(3),
).delay()

# Chord: parallel then callback (the callback receives the list of results)
chord(
    [process_item.s(i) for i in range(10)],
    aggregate_results.s()
).delay()

Celery Configuration

# celery_config.py
broker_url = 'redis://localhost:6379/0'  # Where task messages are queued
result_backend = 'redis://localhost:6379/0'  # Where task states and return values are stored

# Serialize tasks and results as JSON, and accept only JSON content
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

task_time_limit = 300  # 5 minutes (hard limit)
task_soft_time_limit = 240  # 4 minutes (soft limit: raises an exception inside the task so it can clean up)

worker_prefetch_multiplier = 1  # Disable prefetching for fair distribution
worker_concurrency = 4  # Number of worker processes

RQ (Redis Queue)

RQ is a simpler alternative to Celery that uses Redis as its queue.

# tasks.py
def process_item(item):
    """Return ``item`` squared; the job function enqueued on the RQ queue."""
    squared = pow(item, 2)
    return squared

# worker.py
from redis import Redis
from rq import Queue, Worker

# Connect to Redis with default settings and open a queue on that connection
redis_conn = Redis()
q = Queue(connection=redis_conn)

# Enqueue task: schedules process_item(42) to be run by a worker process
from tasks import process_item
job = q.enqueue(process_item, 42)

# Check result
print(job.result)  # None until a worker has completed the job
job.refresh()  # Reload the job's data; the result appears only after a worker has finished it
print(job.result)  # 1764 once the job has completed (still None otherwise)

# Run worker (shell command, in a separate terminal)
# rq worker

JavaScript Worker Pool

/**
 * Fixed-size pool of Web Workers that runs one message per worker at a time.
 *
 * Messages passed to `execute` wait in a FIFO queue until a worker is idle.
 * Each worker is expected to answer every message it receives with exactly
 * one reply (or an error event).
 */
class WorkerPool {
  private workers: Worker[] = [];
  private queue: Array<{
    data: any;
    resolve: (value: any) => void;
    reject: (error: Error) => void;
  }> = [];
  // Maps each busy worker to the reject callback of the task it is running,
  // so in-flight tasks can be failed when the pool is terminated.
  private busyWorkers = new Map<Worker, (error: Error) => void>();

  /**
   * @param workerScript URL of the script each worker runs.
   * @param poolSize Number of workers to create; defaults to the number of
   *   logical processors and is clamped to at least 1.
   */
  constructor(workerScript: string, poolSize: number = navigator.hardwareConcurrency) {
    // hardwareConcurrency may be unavailable; never create an empty pool,
    // because its tasks would wait forever.
    const size = Math.max(1, Math.floor(poolSize) || 1);
    for (let i = 0; i < size; i++) {
      const worker = new Worker(workerScript);
      this.workers.push(worker);
    }
  }

  /**
   * Send `data` to the next idle worker and resolve with its reply.
   *
   * Rejects if the worker reports an error, if the message cannot be posted,
   * or if the pool is (or becomes) terminated before a reply arrives.
   */
  async execute<T>(data: any): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      if (this.workers.length === 0) {
        reject(new Error('WorkerPool has been terminated'));
        return;
      }
      this.queue.push({ data, resolve, reject });
      this.processQueue();
    });
  }

  /** Hand the oldest queued task to an idle worker, if there is one. */
  private processQueue() {
    if (this.queue.length === 0) {
      return;
    }

    const availableWorker = this.workers.find(w => !this.busyWorkers.has(w));
    if (!availableWorker) {
      // Every worker is busy: leave the task queued. It is picked up by the
      // cleanup() call of whichever worker finishes first.
      return;
    }

    // Dequeue only once a worker is known to be free, so no task is dropped.
    const task = this.queue.shift()!;
    this.busyWorkers.set(availableWorker, task.reject);

    const cleanup = () => {
      this.busyWorkers.delete(availableWorker);
      this.processQueue();
    };

    availableWorker.onmessage = (e) => {
      task.resolve(e.data);
      cleanup();
    };

    availableWorker.onerror = (e) => {
      task.reject(new Error(e.message));
      cleanup();
    };

    try {
      availableWorker.postMessage(task.data);
    } catch (err) {
      // For example, data that cannot be cloned: fail this task and free the worker.
      task.reject(err instanceof Error ? err : new Error(String(err)));
      cleanup();
    }
  }

  /** Stop all workers and reject every task that has not completed. */
  terminate() {
    this.workers.forEach(w => w.terminate());
    this.workers = [];

    const error = new Error('WorkerPool terminated');
    // Terminated workers never reply, so settle their tasks here.
    this.busyWorkers.forEach(reject => reject(error));
    this.busyWorkers.clear();

    const pending = this.queue;
    this.queue = [];
    pending.forEach(task => task.reject(error));
  }
}

// Usage
// `items` is assumed to be an array of messages defined by the caller.
// Create a pool of 4 workers that all run worker.js.
const pool = new WorkerPool('worker.js', 4);

// Dispatch every item to the pool and wait for all replies.
const results = await Promise.all(
  items.map(item => pool.execute(item))
);

// NOTE(review): this line is not reached if Promise.all rejects; consider try/finally.
pool.terminate();

Pool Sizing

CPU-Bound Tasks

import multiprocessing

# Option 1 - match CPU count: one worker per CPU reported by the system
workers = multiprocessing.cpu_count()

# Option 2 - leave one CPU for the rest of the system, but never go below one worker
workers = max(1, multiprocessing.cpu_count() - 1)

I/O-Bound Tasks

# Can exceed CPU count since they're mostly waiting.
# Clamped to at least 1: an empty task list would otherwise give 0 workers,
# which the executors reject with ValueError.
workers = max(1, min(32, len(tasks)))  # Or based on external service limits

# Rule of thumb for I/O
workers = 2 * multiprocessing.cpu_count() + 1

Memory Considerations

# Limit based on memory
import psutil

available_memory = psutil.virtual_memory().available  # Bytes currently available
memory_per_worker = 500 * 1024 * 1024  # 500MB estimated
# Floor division gives 0 when less than one worker's worth of memory is free.
max_workers = available_memory // memory_per_worker
# Never exceed the CPU count, and always keep at least one worker so the
# pool can still be created when memory is tight.
workers = max(1, min(multiprocessing.cpu_count(), max_workers))

Monitoring

from dataclasses import dataclass
from datetime import datetime
import statistics

@dataclass
class PoolStats:
    """Running counters and timings for tasks processed by a worker pool."""

    tasks_completed: int = 0  # Number of tasks recorded as successful.
    tasks_failed: int = 0  # Number of tasks recorded as failed.
    total_time: float = 0.0  # Sum of all recorded durations.
    # Individual recorded durations; None means "start with an empty list".
    task_times: "list[float] | None" = None

    def __post_init__(self):
        # Give each instance its own list, but keep durations supplied by the
        # caller instead of silently discarding them.
        if self.task_times is None:
            self.task_times = []

    def record_task(self, duration: float, success: bool):
        """Record one finished task.

        Args:
            duration: Time the task took, in the caller's unit of choice.
            success: True if the task succeeded, False if it failed.
        """
        if success:
            self.tasks_completed += 1
        else:
            self.tasks_failed += 1
        self.total_time += duration
        self.task_times.append(duration)

    def summary(self):
        """Return the counters plus average and 95th-percentile durations.

        Returns:
            A dict with keys ``completed``, ``failed``, ``total_time``,
            ``avg_time`` and ``p95_time``. ``avg_time`` is 0 when nothing has
            been recorded; ``p95_time`` is 0 until more than 20 durations
            have been recorded.
        """
        return {
            'completed': self.tasks_completed,
            'failed': self.tasks_failed,
            'total_time': self.total_time,
            'avg_time': statistics.mean(self.task_times) if self.task_times else 0,
            # n=20 yields 19 cut points; the last one (index 18) is the 95th percentile.
            'p95_time': statistics.quantiles(self.task_times, n=20)[18] if len(self.task_times) > 20 else 0,
        }

Best Practices

  1. Size pools appropriately — CPU count for CPU-bound, more for I/O
  2. Handle failures — Don't let one failure crash the pool
  3. Use bounded queues — Prevent memory exhaustion
  4. Monitor performance — Track task times, failure rates
  5. Graceful shutdown — Drain queue before stopping
  6. Consider Celery/RQ — For production job queues
  7. Timeout tasks — Prevent hanging workers