Producer-Consumer Pattern¶
Decouple data production from consumption using queues.
The Problem¶
Without queue:
Producer ──────▶ Consumer
blocked if consumer is slow
With queue:
Producer ──▶ [Queue] ──▶ Consumer
never blocked (unless queue full)
Python Implementation¶
Basic asyncio Queue¶
import asyncio
async def producer(queue: asyncio.Queue, items: list):
    """Enqueue every element of ``items`` in order, then a ``None`` sentinel.

    Each ``put`` is awaited, so this coroutine suspends while the queue is
    full. A line is printed after each element has been enqueued.
    """
    for produced in items:
        await queue.put(produced)
        print(f"Produced: {produced}")

    # End-of-stream marker: consumers stop when they read None.
    await queue.put(None)
async def consumer(queue: asyncio.Queue, name: str):
    """Process queue entries until the ``None`` sentinel is read.

    Every non-sentinel entry is printed with ``name`` as a prefix, followed
    by a 0.1 s sleep standing in for real work and a ``task_done()`` call.
    When the sentinel arrives it is put back on the queue so that any other
    consumer sharing the queue also sees it, and this coroutine returns.
    """
    entry = await queue.get()
    while entry is not None:
        print(f"{name} consumed: {entry}")
        await asyncio.sleep(0.1)  # Simulate work
        queue.task_done()
        entry = await queue.get()

    # Re-queue the sentinel for the remaining consumers.
    await queue.put(None)
async def main():
    """Run one producer and two consumers against a shared bounded queue."""
    queue = asyncio.Queue(maxsize=10)

    # One producer feeding 0..19 and two named consumers, all awaited together.
    workers = [
        producer(queue, list(range(20))),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    ]
    await asyncio.gather(*workers)


asyncio.run(main())
Multiple Producers¶
import asyncio
from dataclasses import dataclass
from typing import Any
@dataclass
class WorkItem:
    """A payload tagged with the identifier of the producer that queued it."""

    # Identifier of the producer that created this item.
    producer_id: str
    # The payload itself; any object is accepted.
    data: Any
async def producer(queue: asyncio.Queue, producer_id: str, items: list):
    """Wrap each element of ``items`` in a ``WorkItem`` and enqueue it.

    Every item carries ``producer_id`` so a consumer can tell which producer
    it came from. No sentinel is enqueued here.
    """
    for payload in items:
        await queue.put(WorkItem(producer_id=producer_id, data=payload))
async def consumer(queue: asyncio.Queue):
    """Print queued work items until a ``None`` sentinel is read.

    Each entry is expected to expose ``producer_id`` and ``data`` attributes.
    ``task_done()`` is called once per printed entry. The sentinel is
    consumed, not re-queued, so each consumer needs its own sentinel.
    """
    work = await queue.get()
    while work is not None:
        print(f"From {work.producer_id}: {work.data}")
        queue.task_done()
        work = await queue.get()
async def main():
    """Run two producers and three consumers over one shared queue.

    The consumers are scheduled as tasks *before* production starts. A bare
    coroutine object does not run until it is awaited, so creating them
    without ``create_task`` would postpone all consumption until after the
    producers had finished (and would deadlock with a bounded queue).
    """
    queue = asyncio.Queue()

    # Schedule consumers first so they are already waiting on the queue.
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(3)]

    producers = [
        producer(queue, "P1", range(0, 10)),
        producer(queue, "P2", range(100, 110)),
    ]

    # Run producers to completion.
    await asyncio.gather(*producers)

    # Signal consumers to stop: each consumer swallows exactly one sentinel.
    for _ in consumers:
        await queue.put(None)

    # Wait for consumers to drain the queue and exit.
    await asyncio.gather(*consumers)
Thread-Safe Queue¶
from queue import Queue
from threading import Thread
def producer(queue: Queue, items: list):
    """Put every element of ``items`` on ``queue``, followed by ``None``.

    ``Queue.put`` blocks while a bounded queue is full. The trailing ``None``
    is the end-of-stream sentinel read by the consumers.
    """
    for element in items:
        queue.put(element)

    queue.put(None)  # Sentinel
def consumer(queue: Queue, name: str):
    """Print entries from ``queue`` until the ``None`` sentinel is read.

    Each entry is printed prefixed by ``name`` and acknowledged with
    ``task_done()``. The sentinel is put back before returning so that any
    other consumer on the same queue also sees it.
    """
    entry = queue.get()
    while entry is not None:
        print(f"{name}: {entry}")
        queue.task_done()
        entry = queue.get()

    queue.put(None)  # Pass sentinel
# One producer thread and two consumer threads share a queue capped at 10.
queue = Queue(maxsize=10)
prod = Thread(target=producer, args=(queue, range(20)))
cons1 = Thread(target=consumer, args=(queue, "C1"))
cons2 = Thread(target=consumer, args=(queue, "C2"))

workers = (prod, cons1, cons2)

# Start everything, then wait for every thread in the same order.
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
JavaScript (TypeScript) Implementation¶
Basic Queue¶
/**
 * Unbounded async FIFO queue.
 *
 * `get()` resolves with the next item, waiting if none is buffered.
 * After `close()`, `put()` throws and `get()` resolves with `null` once the
 * buffer is empty.
 */
class AsyncQueue<T> {
  // Items that were put while no consumer was waiting.
  private queue: T[] = [];
  // Resolve callbacks of consumers waiting in get().
  private resolvers: ((value: T | null) => void)[] = [];
  private closed = false;

  /** Hand `item` to the oldest waiting consumer, or buffer it. */
  async put(item: T): Promise<void> {
    if (this.closed) {
      throw new Error('Queue closed');
    }
    const waiter = this.resolvers.shift();
    if (waiter !== undefined) {
      waiter(item);
      return;
    }
    this.queue.push(item);
  }

  /** Resolve with the next item, or `null` when closed and drained. */
  async get(): Promise<T | null> {
    if (this.queue.length > 0) {
      return this.queue.shift()!;
    }
    if (this.closed) {
      return null;
    }
    return new Promise<T | null>((resolve) => {
      this.resolvers.push(resolve);
    });
  }

  /** Mark the queue closed and release every waiting consumer with `null`. */
  close(): void {
    this.closed = true;
    const waiting = this.resolvers;
    this.resolvers = [];
    for (const wake of waiting) {
      wake(null);
    }
  }
}
// Usage
/** Put the numbers 0..9 on the queue, logging each one, then close it. */
async function producer(queue: AsyncQueue<number>) {
  let next = 0;
  while (next < 10) {
    await queue.put(next);
    console.log(`Produced: ${next}`);
    next += 1;
  }
  // Closing makes get() resolve with null once the buffer is empty.
  queue.close();
}
/** Log items from the queue until get() resolves with null. */
async function consumer(queue: AsyncQueue<number>, name: string) {
  let item = await queue.get();
  while (item !== null) {
    console.log(`${name} consumed: ${item}`);
    // 100 ms pause standing in for real work.
    await new Promise(r => setTimeout(r, 100));
    item = await queue.get();
  }
}
// One producer and two consumers sharing a single queue.
const queue = new AsyncQueue<number>();
const running = [
  producer(queue),
  consumer(queue, 'C1'),
  consumer(queue, 'C2'),
];
Promise.all(running);
Bounded Queue¶
Limit queue size to apply backpressure.
import asyncio
class BoundedQueue:
    """Bounded asyncio queue that can tell consumers when production is over.

    Producers call ``add_producer()`` before producing and ``producer_done()``
    when finished. Once every registered producer is done and the queue has
    been drained, ``get()`` returns ``None`` instead of blocking. ``None`` is
    therefore reserved as the end-of-stream value and should not be queued.
    """

    def __init__(self, maxsize: int):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.producers_done = asyncio.Event()
        self.num_producers = 0

    async def put(self, item):
        """Put item, blocks if queue is full."""
        await self.queue.put(item)

    async def get(self):
        """Get item, blocks if queue is empty.

        Returns ``None`` once all producers are done and the queue is empty.
        """
        while True:
            # Check first so end-of-stream is reported without waiting out a
            # full timeout on an already finished, empty queue.
            if self.producers_done.is_set() and self.queue.empty():
                return None
            try:
                # Poll with a short timeout so the completion flag is
                # re-checked periodically while the queue stays empty.
                return await asyncio.wait_for(
                    self.queue.get(),
                    timeout=0.1,
                )
            except asyncio.TimeoutError:
                continue

    def producer_done(self):
        """Mark a producer as done.

        The counter never drops below zero, so an unbalanced call cannot
        leave the completion event permanently unset.
        """
        if self.num_producers > 0:
            self.num_producers -= 1
        if self.num_producers == 0:
            self.producers_done.set()

    def add_producer(self):
        """Register a producer.

        Clears the completion event so that a producer registering after an
        earlier one already finished is not treated as done.
        """
        self.num_producers += 1
        self.producers_done.clear()
async def slow_producer(queue: BoundedQueue):
    """Register as a producer, enqueue 0..99, and always deregister.

    ``producer_done()`` runs in a ``finally`` block: if it were skipped after
    an exception or cancellation, the queue would never report completion
    and consumers would keep polling.
    """
    queue.add_producer()
    try:
        for i in range(100):
            await queue.put(i)  # Blocks if queue full
            print(f"Produced {i}, queue size: {queue.queue.qsize()}")
    finally:
        queue.producer_done()
async def fast_consumer(queue: BoundedQueue):
    """Drain ``queue`` until its ``get()`` returns ``None``.

    Each item costs a 0.01 s sleep standing in for processing; the item
    itself is otherwise unused.
    """
    item = await queue.get()
    while item is not None:
        await asyncio.sleep(0.01)  # Fast processing
        item = await queue.get()
async def main():
    """Run the producer and the consumer together over a 5-slot queue."""
    queue = BoundedQueue(maxsize=5)
    producing = slow_producer(queue)
    consuming = fast_consumer(queue)
    await asyncio.gather(producing, consuming)
Priority Queue¶
Process high-priority items first.
import asyncio
import heapq
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class PriorityItem:
    """Heap entry ordered by ``priority`` and then by insertion ``sequence``.

    The payload ``item`` is excluded from comparisons, so payloads need not
    be orderable. ``sequence`` breaks ties between equal priorities; it has
    a default so ``PriorityItem(priority, item)`` keeps working.
    """

    priority: int
    item: Any = field(compare=False)
    sequence: int = 0


class PriorityQueue:
    """Async priority queue: lowest ``priority`` value first, FIFO on ties."""

    def __init__(self):
        self.heap = []
        self.condition = asyncio.Condition()
        # Monotonic insertion counter. heapq alone does not keep equal
        # priorities in insertion order, so each entry records its position.
        self._sequence = 0

    async def put(self, item: Any, priority: int = 0):
        """Add ``item`` with the given ``priority`` and wake one waiter."""
        async with self.condition:
            entry = PriorityItem(priority, item, self._sequence)
            self._sequence += 1
            heapq.heappush(self.heap, entry)
            self.condition.notify()

    async def get(self):
        """Return the payload with the smallest priority, waiting if empty."""
        async with self.condition:
            # Re-check after every wake-up: another getter may have taken
            # the item first.
            while not self.heap:
                await self.condition.wait()
            return heapq.heappop(self.heap).item
# Usage
async def main():
    """Queue three tasks out of order and read them back by priority."""
    queue = PriorityQueue()
    await queue.put("low priority task", priority=10)
    await queue.put("high priority task", priority=1)
    await queue.put("medium priority task", priority=5)

    # Gets: high, medium, low
    # Read exactly as many items as were queued; another get() on the empty
    # queue would wait forever.
    for _ in range(3):
        task = await queue.get()
        print(task)
Work Stealing¶
Load balance by allowing idle consumers to steal work.
import asyncio
from collections import deque
import random
class WorkStealingQueue:
    """One deque per worker, each guarded by its own lock.

    A worker takes from the front of its own deque; when that is empty it
    takes from the back of the first non-empty deque belonging to another
    worker, scanning in index order.
    """

    def __init__(self, num_workers: int):
        self.queues = [deque() for _ in range(num_workers)]
        self.locks = [asyncio.Lock() for _ in range(num_workers)]

    async def push(self, worker_id: int, item):
        """Append ``item`` to the back of ``worker_id``'s deque."""
        async with self.locks[worker_id]:
            self.queues[worker_id].append(item)

    async def pop(self, worker_id: int):
        """Return the next item for ``worker_id``, or ``None`` if none exists."""
        # Own work first, oldest item first.
        async with self.locks[worker_id]:
            own = self.queues[worker_id]
            if own:
                return own.popleft()

        # Otherwise steal from the back of another worker's deque.
        victims = [i for i in range(len(self.queues)) if i != worker_id]
        for victim in victims:
            async with self.locks[victim]:
                backlog = self.queues[victim]
                if backlog:
                    return backlog.pop()  # Steal from back

        return None
Patterns¶
Fan-Out¶
One producer, multiple consumers.
async def fan_out(items, num_consumers=4):
    """Distribute ``items`` across ``num_consumers`` concurrent consumers.

    Each consumer awaits ``process(item)`` for every item it takes. A single
    ``None`` sentinel is queued after the last item; the consumer that reads
    it puts it back so the next consumer also stops.
    """
    queue = asyncio.Queue()

    async def consumer(consumer_id):
        item = await queue.get()
        while item is not None:
            await process(item)
            queue.task_done()
            item = await queue.get()
        await queue.put(None)  # Pass sentinel

    # Start consumers
    consumers = []
    for consumer_id in range(num_consumers):
        consumers.append(asyncio.create_task(consumer(consumer_id)))

    # Produce
    for item in items:
        await queue.put(item)
    await queue.put(None)

    await asyncio.gather(*consumers)
Fan-In¶
Multiple producers, one consumer.
async def fan_in(producer_fns):
    """Merge several async producers into one async stream.

    Args:
        producer_fns: Zero-argument callables, each returning an async
            iterable. Items must not be ``None``, which is reserved as the
            internal end-of-stream sentinel.

    Yields:
        Items from all producers, in the order they reach the shared queue.

    Raises:
        Any exception raised by a producer, re-raised after the items queued
        so far have been yielded.
    """
    active_producers = len(producer_fns)
    if active_producers == 0:
        # No producer would ever enqueue the sentinel, so waiting on the
        # queue would hang forever.
        return

    queue = asyncio.Queue()

    async def wrapped_producer(fn):
        nonlocal active_producers
        try:
            async for item in fn():
                await queue.put(item)
        finally:
            # Count this producer as finished even if it failed; otherwise
            # the sentinel is never sent and the consumer never returns.
            active_producers -= 1
            if active_producers == 0:
                # The queue is unbounded, so put_nowait cannot raise.
                queue.put_nowait(None)

    # Start producers. References are kept so the tasks cannot be garbage
    # collected while they are still running.
    tasks = [asyncio.create_task(wrapped_producer(fn)) for fn in producer_fns]

    try:
        # Consume
        while True:
            item = await queue.get()
            if item is None:
                break
            yield item
        # Surface any producer failure instead of dropping it silently.
        await asyncio.gather(*tasks)
    finally:
        # If the caller stops iterating early, stop the producers as well.
        for task in tasks:
            task.cancel()
Best Practices¶
- Use bounded queues — Prevent memory exhaustion
- Handle shutdown gracefully — Drain queue, stop consumers
- Use sentinels — Signal end of production
- Consider priority — For heterogeneous workloads
- Monitor queue size — Alert on backlog
- Handle errors — Don't let one failure stop processing