Skip to content

Producer-Consumer Pattern

Decouple data production from consumption using queues.

The Problem

Without queue:
Producer ──────▶ Consumer
        blocked if consumer is slow

With queue:
Producer ──▶ [Queue] ──▶ Consumer
        never blocked (unless queue full)

Python Implementation

Basic asyncio Queue

import asyncio

async def producer(queue: asyncio.Queue, items: list):
    """Produce items into the queue."""
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    # Signal end of production
    await queue.put(None)

async def consumer(queue: asyncio.Queue, name: str):
    """Consume items from the queue."""
    while True:
        item = await queue.get()
        if item is None:
            # Put sentinel back for other consumers
            await queue.put(None)
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(0.1)  # Simulate work
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)

    # Start producer and consumers
    await asyncio.gather(
        producer(queue, list(range(20))),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())

Multiple Producers

import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class WorkItem:
    producer_id: str
    data: Any

async def producer(queue: asyncio.Queue, producer_id: str, items: list):
    for item in items:
        work = WorkItem(producer_id=producer_id, data=item)
        await queue.put(work)

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"From {item.producer_id}: {item.data}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    producers = [
        producer(queue, "P1", range(0, 10)),
        producer(queue, "P2", range(100, 110)),
    ]
    consumers = [consumer(queue) for _ in range(3)]

    # Run producers
    await asyncio.gather(*producers)

    # Signal consumers to stop
    for _ in consumers:
        await queue.put(None)

    # Wait for consumers
    await asyncio.gather(*consumers)

Thread-Safe Queue

from queue import Queue
from threading import Thread

def producer(queue: Queue, items: list):
    for item in items:
        queue.put(item)
    queue.put(None)  # Sentinel

def consumer(queue: Queue, name: str):
    while True:
        item = queue.get()
        if item is None:
            queue.put(None)  # Pass sentinel
            break
        print(f"{name}: {item}")
        queue.task_done()

queue = Queue(maxsize=10)

prod = Thread(target=producer, args=(queue, range(20)))
cons1 = Thread(target=consumer, args=(queue, "C1"))
cons2 = Thread(target=consumer, args=(queue, "C2"))

prod.start()
cons1.start()
cons2.start()

prod.join()
cons1.join()
cons2.join()

JavaScript Implementation

Basic Queue

class AsyncQueue<T> {
  private queue: T[] = [];
  private resolvers: ((value: T | null) => void)[] = [];
  private closed = false;

  async put(item: T): Promise<void> {
    if (this.closed) throw new Error('Queue closed');

    if (this.resolvers.length > 0) {
      const resolver = this.resolvers.shift()!;
      resolver(item);
    } else {
      this.queue.push(item);
    }
  }

  async get(): Promise<T | null> {
    if (this.queue.length > 0) {
      return this.queue.shift()!;
    }

    if (this.closed) return null;

    return new Promise(resolve => {
      this.resolvers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    this.resolvers.forEach(r => r(null));
    this.resolvers = [];
  }
}

// Usage
async function producer(queue: AsyncQueue<number>) {
  for (let i = 0; i < 10; i++) {
    await queue.put(i);
    console.log(`Produced: ${i}`);
  }
  queue.close();
}

async function consumer(queue: AsyncQueue<number>, name: string) {
  while (true) {
    const item = await queue.get();
    if (item === null) break;
    console.log(`${name} consumed: ${item}`);
    await new Promise(r => setTimeout(r, 100));
  }
}

const queue = new AsyncQueue<number>();
Promise.all([
  producer(queue),
  consumer(queue, 'C1'),
  consumer(queue, 'C2'),
]);

Bounded Queue

Limit queue size to apply backpressure.

import asyncio

class BoundedQueue:
    def __init__(self, maxsize: int):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.producers_done = asyncio.Event()
        self.num_producers = 0

    async def put(self, item):
        """Put item, blocks if queue is full."""
        await self.queue.put(item)

    async def get(self):
        """Get item, blocks if queue is empty."""
        while True:
            try:
                return await asyncio.wait_for(
                    self.queue.get(),
                    timeout=0.1
                )
            except asyncio.TimeoutError:
                if self.producers_done.is_set() and self.queue.empty():
                    return None

    def producer_done(self):
        """Mark a producer as done."""
        self.num_producers -= 1
        if self.num_producers == 0:
            self.producers_done.set()

    def add_producer(self):
        """Register a producer."""
        self.num_producers += 1

async def slow_producer(queue: BoundedQueue):
    queue.add_producer()
    for i in range(100):
        await queue.put(i)  # Blocks if queue full
        print(f"Produced {i}, queue size: {queue.queue.qsize()}")
    queue.producer_done()

async def fast_consumer(queue: BoundedQueue):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.01)  # Fast processing

async def main():
    queue = BoundedQueue(maxsize=5)

    await asyncio.gather(
        slow_producer(queue),
        fast_consumer(queue),
    )

Priority Queue

Process high-priority items first.

import asyncio
import heapq
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PriorityItem:
    priority: int
    item: Any = field(compare=False)

class PriorityQueue:
    def __init__(self):
        self.heap = []
        self.condition = asyncio.Condition()

    async def put(self, item: Any, priority: int = 0):
        async with self.condition:
            heapq.heappush(self.heap, PriorityItem(priority, item))
            self.condition.notify()

    async def get(self):
        async with self.condition:
            while not self.heap:
                await self.condition.wait()
            return heapq.heappop(self.heap).item

# Usage
queue = PriorityQueue()

await queue.put("low priority task", priority=10)
await queue.put("high priority task", priority=1)
await queue.put("medium priority task", priority=5)

# Gets: high, medium, low
while True:
    task = await queue.get()

Work Stealing

Load balance by allowing idle consumers to steal work.

import asyncio
from collections import deque
import random

class WorkStealingQueue:
    def __init__(self, num_workers: int):
        self.queues = [deque() for _ in range(num_workers)]
        self.locks = [asyncio.Lock() for _ in range(num_workers)]

    async def push(self, worker_id: int, item):
        async with self.locks[worker_id]:
            self.queues[worker_id].append(item)

    async def pop(self, worker_id: int):
        # Try own queue first
        async with self.locks[worker_id]:
            if self.queues[worker_id]:
                return self.queues[worker_id].popleft()

        # Try stealing from others
        for other_id in range(len(self.queues)):
            if other_id != worker_id:
                async with self.locks[other_id]:
                    if self.queues[other_id]:
                        return self.queues[other_id].pop()  # Steal from back

        return None

Patterns

Fan-Out

One producer, multiple consumers.

async def fan_out(items, num_consumers=4):
    queue = asyncio.Queue()

    async def consumer(consumer_id):
        while True:
            item = await queue.get()
            if item is None:
                await queue.put(None)  # Pass sentinel
                break
            await process(item)
            queue.task_done()

    # Start consumers
    consumers = [
        asyncio.create_task(consumer(i))
        for i in range(num_consumers)
    ]

    # Produce
    for item in items:
        await queue.put(item)
    await queue.put(None)

    await asyncio.gather(*consumers)

Fan-In

Multiple producers, one consumer.

async def fan_in(producer_fns):
    queue = asyncio.Queue()
    active_producers = len(producer_fns)

    async def wrapped_producer(fn):
        nonlocal active_producers
        async for item in fn():
            await queue.put(item)
        active_producers -= 1
        if active_producers == 0:
            await queue.put(None)

    # Start producers
    for fn in producer_fns:
        asyncio.create_task(wrapped_producer(fn))

    # Consume
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item

Best Practices

  1. Use bounded queues — Prevent memory exhaustion
  2. Handle shutdown gracefully — Drain queue, stop consumers
  3. Use sentinels — Signal end of production
  4. Consider priority — For heterogeneous workloads
  5. Monitor queue size — Alert on backlog
  6. Handle errors — Don't let one failure stop processing