Worker Pools¶
Manage a pool of workers for parallel task processing.
The Pattern¶
               ┌─────────┐
               │ Task 1  │──┐
               └─────────┘  │    ┌──────────┐
               ┌─────────┐  ├───▶│ Worker 1 │
Task Queue ──▶ │ Task 2  │──┤    └──────────┘
               └─────────┘  │    ┌──────────┐
               ┌─────────┐  ├───▶│ Worker 2 │
               │ Task 3  │──┤    └──────────┘
               └─────────┘  │    ┌──────────┐
                            └───▶│ Worker 3 │
                                 └──────────┘
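The essentials of the pattern fit in a few lines of standard-library Python. A minimal hand-rolled sketch, where squaring stands in for real work:

import queue
import threading

def worker(tasks: queue.Queue, results: list):
    while True:
        item = tasks.get()
        if item is None:            # Sentinel: shut this worker down
            tasks.task_done()
            break
        results.append(item ** 2)   # list.append is thread-safe in CPython
        tasks.task_done()

tasks: queue.Queue = queue.Queue()
results: list = []
workers = [threading.Thread(target=worker, args=(tasks, results)) for _ in range(3)]
for w in workers:
    w.start()
for item in range(10):
    tasks.put(item)
for _ in workers:
    tasks.put(None)                 # One sentinel per worker
tasks.join()                        # Block until every put() has been task_done()'d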
Python: concurrent.futures¶
ProcessPoolExecutor¶
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def process_item(item):
    """CPU-intensive processing."""
    return item ** 2

def main():
    items = list(range(100))
    workers = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(process_item, items))
    print(results)

if __name__ == "__main__":
    main()  # The guard matters: spawned worker processes re-import this module
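When tasks are tiny, inter-process overhead can swamp the work itself. map()'s chunksize parameter batches items per worker round-trip (the value below is illustrative; tune it for your workload):

with ProcessPoolExecutor(max_workers=workers) as executor:
    # Each worker receives items in batches of 100 instead of one at a time
    results = list(executor.map(process_item, range(100_000), chunksize=100))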
ThreadPoolExecutor¶
import requests
from concurrent.futures import ThreadPoolExecutor

def fetch_url(url):
    """I/O-bound operation."""
    response = requests.get(url)
    return response.json()

def main():
    urls = ["https://api.example.com/1", "https://api.example.com/2"]
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(fetch_url, urls))
    print(results)
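map() also accepts a timeout: the results iterator raises TimeoutError if the next result is not available within that many seconds of the original call. A sketch, reusing fetch_url and urls from above:

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

with ThreadPoolExecutor(max_workers=10) as executor:
    try:
        results = list(executor.map(fetch_url, urls, timeout=30))
    except FuturesTimeout:
        print("A fetch exceeded the 30-second budget")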
Handling Exceptions¶
from concurrent.futures import ProcessPoolExecutor, as_completed

def risky_process(item):
    if item == 5:
        raise ValueError("Can't process 5")
    return item ** 2

def main():
    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(risky_process, i): i for i in range(10)}
        for future in as_completed(futures):
            item = futures[future]
            try:
                result = future.result()
                print(f"{item} -> {result}")
            except Exception as e:
                print(f"{item} failed: {e}")
Async Worker Pool¶
import asyncio
from typing import Awaitable, Callable, TypeVar

R = TypeVar('R')

class AsyncWorkerPool:
    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        self.queue: asyncio.Queue | None = None
        self.workers: list[asyncio.Task] = []
        self.results: dict = {}
        self._next_id = 0  # Monotonic counter: unique task ids, no hash collisions

    async def start(self):
        self.queue = asyncio.Queue()
        self.workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.num_workers)
        ]

    async def _worker(self, worker_id: int):
        while True:
            try:
                task_id, fn, args, kwargs = await self.queue.get()
                if task_id is None:  # Shutdown sentinel
                    self.queue.task_done()
                    break
                try:
                    result = await fn(*args, **kwargs)
                    self.results[task_id] = ('success', result)
                except Exception as e:
                    self.results[task_id] = ('error', e)
                finally:
                    self.queue.task_done()
            except asyncio.CancelledError:
                break

    async def submit(self, fn: Callable[..., Awaitable[R]], *args, **kwargs) -> int:
        task_id = self._next_id
        self._next_id += 1
        await self.queue.put((task_id, fn, args, kwargs))
        return task_id

    async def shutdown(self):
        for _ in self.workers:
            await self.queue.put((None, None, None, None))
        await asyncio.gather(*self.workers)

# Usage
async def process(item):
    await asyncio.sleep(0.1)
    return item ** 2

async def main():
    pool = AsyncWorkerPool(num_workers=4)
    await pool.start()
    task_ids = []
    for i in range(20):
        task_id = await pool.submit(process, i)
        task_ids.append(task_id)
    await pool.queue.join()  # Wait for all submitted work to finish
    await pool.shutdown()
    for task_id in task_ids:
        status, result = pool.results[task_id]
        print(f"Task {task_id}: {status} = {result}")

asyncio.run(main())
Celery (Distributed Tasks)¶
Setup¶
# celery_app.py
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',  # Result backend needed for result.get() below
)

@app.task
def process_item(item):
    return item ** 2

@app.task
def send_email(to, subject, body):
    # Send email logic
    return True
Usage¶
from celery_app import process_item, send_email
# Async execution
result = process_item.delay(42)
# Get result (blocks)
value = result.get(timeout=10)
# Check status
print(result.status) # PENDING, SUCCESS, FAILURE
# With options
process_item.apply_async(
    args=[42],
    countdown=60,   # Start after 60 seconds
    expires=3600,   # Expire after 1 hour
)
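Tasks can also retry themselves on failure; bind=True exposes the task instance as self. A sketch with a hypothetical fetch_page task (the retry policy values are illustrative):

import requests

@app.task(bind=True, max_retries=3, default_retry_delay=10)
def fetch_page(self, url):
    try:
        return requests.get(url, timeout=5).text
    except requests.RequestException as exc:
        raise self.retry(exc=exc)  # Re-enqueue with the configured delay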
Chaining Tasks¶
from celery import chain, group, chord
# Chain: sequential execution
chain(
    fetch_data.s(url),
    process_data.s(),
    save_results.s()
).delay()

# Group: parallel execution
group(
    process_item.s(1),
    process_item.s(2),
    process_item.s(3),
).delay()

# Chord: parallel then callback
chord(
    [process_item.s(i) for i in range(10)],
    aggregate_results.s()
).delay()
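The chord callback receives the list of results from the group, so aggregate_results above might be a task like this (illustrative):

@app.task
def aggregate_results(results):
    return sum(results)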
Celery Configuration¶
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
task_time_limit = 300 # 5 minutes
task_soft_time_limit = 240 # 4 minutes (raises exception)
worker_prefetch_multiplier = 1 # Disable prefetching for fair distribution
worker_concurrency = 4 # Number of worker processes
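Load the module into the app with config_from_object:

# celery_app.py
app.config_from_object('celery_config')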
RQ (Redis Queue)¶
Simpler alternative to Celery.
# tasks.py
def process_item(item):
    return item ** 2
# client.py — enqueueing side (the worker runs separately via `rq worker`)
from redis import Redis
from rq import Queue
redis_conn = Redis()
q = Queue(connection=redis_conn)
# Enqueue task
from tasks import process_item
job = q.enqueue(process_item, 42)
# Check result
print(job.result)  # None until a worker has processed the job
job.refresh()      # Re-fetch job state from Redis
print(job.result)  # 1764 once complete
# Run worker
# rq worker
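RQ also supports per-job timeouts and retries at enqueue time (the values below are illustrative):

from rq import Retry

job = q.enqueue(
    process_item, 42,
    job_timeout=300,                  # Kill the job after 5 minutes
    retry=Retry(max=3, interval=10),  # Retry up to 3 times, 10s apart
)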
JavaScript Worker Pool¶
class WorkerPool {
  private workers: Worker[] = [];
  private queue: Array<{
    data: any;
    resolve: (value: any) => void;
    reject: (error: Error) => void;
  }> = [];
  private busyWorkers = new Set<Worker>();

  constructor(workerScript: string, poolSize: number = navigator.hardwareConcurrency) {
    for (let i = 0; i < poolSize; i++) {
      const worker = new Worker(workerScript);
      this.workers.push(worker);
    }
  }

  async execute<T>(data: any): Promise<T> {
    return new Promise((resolve, reject) => {
      this.queue.push({ data, resolve, reject });
      this.processQueue();
    });
  }

  private processQueue() {
    const availableWorker = this.workers.find(w => !this.busyWorkers.has(w));
    // Don't shift a task off the queue unless a worker can actually take it
    if (!availableWorker || this.queue.length === 0) return;
    const task = this.queue.shift()!;
    this.busyWorkers.add(availableWorker);
    const cleanup = () => {
      this.busyWorkers.delete(availableWorker);
      this.processQueue();
    };
    availableWorker.onmessage = (e) => {
      task.resolve(e.data);
      cleanup();
    };
    availableWorker.onerror = (e) => {
      task.reject(new Error(e.message));
      cleanup();
    };
    availableWorker.postMessage(task.data);
  }

  terminate() {
    this.workers.forEach(w => w.terminate());
    this.workers = [];
  }
}

// Usage
const pool = new WorkerPool('worker.js', 4);
const results = await Promise.all(
  items.map(item => pool.execute(item))
);
pool.terminate();
Pool Sizing¶
CPU-Bound Tasks¶
import multiprocessing
# Match CPU count
workers = multiprocessing.cpu_count()
# Leave some for system
workers = max(1, multiprocessing.cpu_count() - 1)
I/O-Bound Tasks¶
# Can exceed CPU count since they're mostly waiting
workers = min(32, len(tasks)) # Or based on external service limits
# Rule of thumb for I/O
workers = 2 * multiprocessing.cpu_count() + 1
Memory Considerations¶
# Limit based on memory
import psutil
available_memory = psutil.virtual_memory().available
memory_per_worker = 500 * 1024 * 1024 # 500MB estimated
max_workers = available_memory // memory_per_worker
workers = min(multiprocessing.cpu_count(), max_workers)
Monitoring¶
from dataclasses import dataclass, field
import statistics

@dataclass
class PoolStats:
    tasks_completed: int = 0
    tasks_failed: int = 0
    total_time: float = 0.0
    task_times: list = field(default_factory=list)

    def record_task(self, duration: float, success: bool):
        if success:
            self.tasks_completed += 1
        else:
            self.tasks_failed += 1
        self.total_time += duration
        self.task_times.append(duration)

    def summary(self):
        return {
            'completed': self.tasks_completed,
            'failed': self.tasks_failed,
            'total_time': self.total_time,
            'avg_time': statistics.mean(self.task_times) if self.task_times else 0,
            'p95_time': statistics.quantiles(self.task_times, n=20)[18] if len(self.task_times) > 20 else 0,
        }
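Wiring this into a pool is straightforward if you record from the submitting thread via as_completed. A sketch, reusing fetch_url and urls from the ThreadPoolExecutor example (note the measured duration includes time spent queued, not just execution):

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

stats = PoolStats()
with ThreadPoolExecutor(max_workers=4) as executor:
    submitted = {executor.submit(fetch_url, url): time.perf_counter() for url in urls}
    for future in as_completed(submitted):
        duration = time.perf_counter() - submitted[future]
        stats.record_task(duration, success=future.exception() is None)
print(stats.summary())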
Best Practices¶
- Size pools appropriately — CPU count for CPU-bound, more for I/O
- Handle failures — Don't let one failure crash the pool
- Use bounded queues — Prevent memory exhaustion
- Monitor performance — Track task times, failure rates
- Graceful shutdown — Drain queue before stopping
- Consider Celery/RQ — For production job queues
- Timeout tasks — Prevent hanging workers (see the sketch below)
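For the last point, one caller-side approach: future.result(timeout=...) raises TimeoutError, and cancel() drops work that has not started yet. A sketch, reusing fetch_url from earlier (the URL is illustrative):

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(fetch_url, "https://api.example.com/slow")
    try:
        result = future.result(timeout=5)
    except FuturesTimeout:
        future.cancel()  # Only succeeds if the task has not started running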