Skip to content

Event-Driven: Event Sourcing, CQRS, and Sagas

Build loosely coupled, scalable systems.

Core Concepts

Events vs Commands

Command: "CreateOrder"       → Imperative, expects action
Event:   "OrderCreated"      → Past tense, fact that happened

Commands are requests, events are notifications.

Event Flow

┌─────────┐    ┌─────────────┐    ┌─────────────┐
│ Service │───►│ Message     │───►│ Consumer    │
│    A    │    │ Broker      │    │ Services    │
└─────────┘    └─────────────┘    └─────────────┘
                    ├───► Email Service
                    ├───► Analytics
                    └───► Inventory

Event Types

Domain Events

Business-meaningful events:

from dataclasses import asdict, dataclass
from datetime import datetime
from uuid import UUID, uuid4

@dataclass
class DomainEvent:
    """Base record for domain events; concrete events subclass it and add fields."""

    event_id: UUID  # identifier of this event
    occurred_at: datetime  # time associated with the event
    aggregate_id: str  # identifier of the aggregate the event refers to

@dataclass
class OrderCreated(DomainEvent):
    """Domain event for a created order: customer, line items, and total."""

    customer_id: str
    items: list[dict]  # per-item dict schema is not defined in this snippet
    total: float

@dataclass
class PaymentProcessed(DomainEvent):
    """Domain event for a processed payment on an order."""

    order_id: str
    amount: float
    payment_method: str

@dataclass
class OrderShipped(DomainEvent):
    """Domain event for a shipped order, with carrier and tracking number."""

    order_id: str
    tracking_number: str
    carrier: str

Integration Events

Cross-service communication:

@dataclass
class IntegrationEvent:
    """Base record for events exchanged between services."""

    event_id: UUID  # identifier of this event
    timestamp: datetime
    source: str  # Service that produced the event

@dataclass
class InventoryReserved(IntegrationEvent):
    """Integration event naming the order and the items reserved for it."""

    order_id: str
    items: list[dict]  # per-item dict schema is not defined in this snippet

@dataclass
class ShipmentCreated(IntegrationEvent):
    """Integration event linking an order to the shipment created for it."""

    order_id: str
    shipment_id: str

Message Brokers

Redis Pub/Sub (Simple)

import redis
import json
from typing import Callable

class EventBus:
    """Minimal publish/subscribe event bus backed by Redis Pub/Sub.

    The event type doubles as the Redis channel name. Handlers are kept in an
    in-process registry; the Redis subscriptions themselves are opened when
    ``listen()`` starts, so register handlers before awaiting ``listen()``.
    """

    def __init__(self, redis_url: str):
        """Create the asyncio Redis client and its pub/sub object.

        Args:
            redis_url: Connection URL, e.g. ``redis://localhost``.
        """
        # The synchronous client cannot be awaited, and its blocking
        # ``listen()`` generator would stall the event loop, so the asyncio
        # flavour of the client is used instead.
        from redis import asyncio as redis_asyncio

        self.redis = redis_asyncio.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
        self.handlers: dict[str, list[Callable]] = {}

    @staticmethod
    def _json_default(value):
        """Encode values ``json`` cannot serialize natively (datetimes only)."""
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    async def publish(self, event_type: str, data: dict):
        """Publish ``data`` on the ``event_type`` channel.

        The payload is wrapped in an envelope with ``type``, ``data`` and
        ``timestamp`` keys and sent as JSON. Datetime values inside ``data``
        are emitted as ISO-8601 strings.
        """
        message = {
            "type": event_type,
            "data": data,
            "timestamp": datetime.utcnow().isoformat(),
        }
        payload = json.dumps(message, default=self._json_default)
        await self.redis.publish(event_type, payload)

    def subscribe(self, event_type: str, handler: "Callable | None" = None):
        """Register an async ``handler`` for ``event_type``.

        Works both as a direct call, ``bus.subscribe("x", handler)``, and as a
        decorator, ``@bus.subscribe("x")``.

        Returns:
            The handler itself, or a decorator when ``handler`` is omitted.
        """
        if handler is None:
            def decorator(func: Callable) -> Callable:
                self.subscribe(event_type, func)
                return func

            return decorator

        self.handlers.setdefault(event_type, []).append(handler)
        return handler

    async def listen(self):
        """Subscribe to every registered channel and dispatch incoming events.

        Each handler registered for the message's channel is awaited in
        registration order with the JSON-decoded envelope.
        """
        if self.handlers:
            await self.pubsub.subscribe(*self.handlers)

        async for message in self.pubsub.listen():
            # Subscribe/unsubscribe confirmations also arrive on this stream.
            if message["type"] != "message":
                continue

            channel = message["channel"]
            # Redis returns bytes unless the client decodes responses; the
            # registry is keyed by str.
            if isinstance(channel, bytes):
                channel = channel.decode()

            data = json.loads(message["data"])
            for handler in self.handlers.get(channel, []):
                await handler(data)

# Usage
# NOTE: the top-level ``await`` below needs an enclosing coroutine (or an
# asyncio-aware REPL) to run.
bus = EventBus("redis://localhost")

# Register the coroutine below as a handler for the "order.created" channel.
@bus.subscribe("order.created")
async def handle_order_created(event: dict):
    """Send a confirmation email and reserve inventory for a new order."""
    # ``event`` is the envelope built by EventBus.publish; the payload passed
    # to publish() sits under its "data" key.
    await send_confirmation_email(event["data"]["customer_email"])
    await reserve_inventory(event["data"]["items"])

# Publish
await bus.publish("order.created", {
    "order_id": "123",
    "customer_email": "user@example.com",
    "items": [{"sku": "ABC", "quantity": 2}],
})

RabbitMQ (Robust)

import aio_pika
import json

class RabbitMQBus:
    """Event bus on top of RabbitMQ topic exchanges (via ``aio_pika``).

    ``connect()`` must be awaited before ``publish()`` or ``subscribe()``.
    """

    def __init__(self, url: str):
        """Store the broker URL; no connection is opened yet."""
        self.url = url
        self.connection = None
        self.channel = None

    async def connect(self):
        """Open a robust (auto-reconnecting) connection and a channel on it."""
        self.connection = await aio_pika.connect_robust(self.url)
        self.channel = await self.connection.channel()

    def _require_channel(self):
        """Return the open channel, or fail clearly when not connected."""
        if self.channel is None:
            raise RuntimeError(
                "RabbitMQBus is not connected; await connect() first"
            )
        return self.channel

    async def _declare_topic_exchange(self, exchange: str):
        """Declare (or look up) the topic exchange named ``exchange``."""
        channel = self._require_channel()
        # NOTE(review): the exchange is declared non-durable while queues are
        # durable and messages persistent; confirm whether it should also be
        # declared with durable=True.
        return await channel.declare_exchange(
            exchange,
            aio_pika.ExchangeType.TOPIC,
        )

    async def publish(self, exchange: str, routing_key: str, message: dict):
        """Publish ``message`` as persistent JSON to ``exchange``.

        Raises:
            RuntimeError: If ``connect()`` has not been awaited.
        """
        exchange_obj = await self._declare_topic_exchange(exchange)

        await exchange_obj.publish(
            aio_pika.Message(
                body=json.dumps(message).encode(),
                content_type="application/json",
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            ),
            routing_key=routing_key,
        )

    async def subscribe(
        self,
        exchange: str,
        queue: str,
        routing_key: str,
        handler: Callable,
    ):
        """Bind durable ``queue`` to ``exchange`` and consume it with ``handler``.

        ``handler`` is awaited with the JSON-decoded body of each message.

        Raises:
            RuntimeError: If ``connect()`` has not been awaited.
        """
        exchange_obj = await self._declare_topic_exchange(exchange)

        queue_obj = await self.channel.declare_queue(queue, durable=True)
        await queue_obj.bind(exchange_obj, routing_key)

        async def process_message(message: aio_pika.IncomingMessage):
            # The handler runs inside message.process(), so acknowledgement
            # is tied to how that context manager exits.
            async with message.process():
                data = json.loads(message.body)
                await handler(data)

        await queue_obj.consume(process_message)

# Usage
# NOTE: the top-level ``await`` statements below need an enclosing coroutine
# (or an asyncio-aware REPL) to run.
bus = RabbitMQBus("amqp://localhost")
await bus.connect()  # publish()/subscribe() use the channel opened here

# Publisher (Order Service)
await bus.publish(
    exchange="orders",
    routing_key="order.created",
    message={"order_id": "123", "total": 99.99},
)

# Consumer (Notification Service)
async def handle_order(data):
    """Email handler; ``data`` is the JSON-decoded message body."""
    await send_email(data["order_id"])

# Consume the "notifications" queue, bound with the "order.*" routing pattern.
await bus.subscribe(
    exchange="orders",
    queue="notifications",
    routing_key="order.*",
    handler=handle_order,
)

Event Sourcing

Store state as a sequence of events:

from dataclasses import dataclass, field
from typing import List
from abc import ABC, abstractmethod

@dataclass
class Event:
    """Base class for events recorded against an aggregate."""

    aggregate_id: str  # id of the aggregate the event belongs to
    version: int  # aggregate version this event produces
    timestamp: datetime


class Aggregate(ABC):
    """Base class for event-sourced aggregates.

    State is changed only through ``apply``. New events go through
    ``_raise_event``, which applies them and queues them in
    ``_pending_events`` until they are persisted.
    """

    def __init__(self, id: str):
        """Start an empty aggregate at version 0 with no pending events."""
        self.id = id
        self.version = 0
        self._pending_events: List[Event] = []

    @abstractmethod
    def apply(self, event: Event):
        """Mutate the aggregate's state to reflect ``event``."""

    def load_from_history(self, events: List[Event]):
        """Rebuild state by replaying stored ``events`` in order.

        The aggregate's version ends at the version of the last event.
        Replayed events are not added to the pending list.
        """
        for event in events:
            self.apply(event)
            self.version = event.version

    def _raise_event(self, event: Event):
        """Apply a new ``event`` and queue it for persistence.

        The version is advanced here as well. Callers stamp new events with
        ``self.version + 1``, so without this step every event raised in one
        session would carry the same version number.
        """
        self.apply(event)
        self.version = event.version
        self._pending_events.append(event)

@dataclass
class OrderCreated(Event):
    """Event recording that an order was created for a customer."""

    customer_id: str
    items: list  # element type is not specified in this snippet

@dataclass
class OrderItemAdded(Event):
    """Event recording that an item was added to an order."""

    item_id: str
    quantity: int
    price: float  # Order.apply multiplies this by quantity, i.e. a per-unit price

@dataclass
class OrderSubmitted(Event):
    """Event recording that an order was submitted; carries no extra fields."""

    pass

class Order(Aggregate):
    """Event-sourced order aggregate.

    The command methods (``create``, ``add_item``, ``submit``) only emit
    events through ``_raise_event``; every state change happens in ``apply``.
    """

    def __init__(self, id: str):
        """Start as an empty draft order with no customer and a zero total."""
        super().__init__(id)
        self.customer_id: str = ""
        self.items: list = []
        self.status: str = "draft"
        self.total: float = 0

    def create(self, customer_id: str):
        """Emit ``OrderCreated`` for ``customer_id`` with an empty item list."""
        created = OrderCreated(
            aggregate_id=self.id,
            version=self.version + 1,
            timestamp=datetime.utcnow(),
            customer_id=customer_id,
            items=[],
        )
        self._raise_event(created)

    def add_item(self, item_id: str, quantity: int, price: float):
        """Emit ``OrderItemAdded`` for the given item, quantity and price."""
        added = OrderItemAdded(
            aggregate_id=self.id,
            version=self.version + 1,
            timestamp=datetime.utcnow(),
            item_id=item_id,
            quantity=quantity,
            price=price,
        )
        self._raise_event(added)

    def submit(self):
        """Emit ``OrderSubmitted``.

        Raises:
            ValueError: If the order has no items.
        """
        if self.items:
            submitted = OrderSubmitted(
                aggregate_id=self.id,
                version=self.version + 1,
                timestamp=datetime.utcnow(),
            )
            self._raise_event(submitted)
            return

        raise ValueError("Cannot submit empty order")

    def apply(self, event: Event):
        """Update order state from ``event``; unrecognised events are ignored."""
        if isinstance(event, OrderCreated):
            self.customer_id = event.customer_id
            self.status = "draft"
            return

        if isinstance(event, OrderItemAdded):
            line = {
                "item_id": event.item_id,
                "quantity": event.quantity,
                "price": event.price,
            }
            self.items.append(line)
            # Running total grows by the line amount.
            self.total += event.quantity * event.price
            return

        if isinstance(event, OrderSubmitted):
            self.status = "submitted"

Event Store

class EventStore:
    """Append-only store of aggregate events with an optimistic version check.

    NOTE(review): ``ConcurrencyError`` is not defined in this snippet and must
    be provided by the surrounding module.
    """

    def __init__(self, db):
        """Keep the database handle used for all queries."""
        self.db = db

    @staticmethod
    def _json_default(value):
        """Encode values ``json`` cannot serialize natively (datetimes only)."""
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    @staticmethod
    def _event_class(type_name: str):
        """Find the ``Event`` subclass whose class name is ``type_name``.

        Raises:
            LookupError: If no loaded ``Event`` subclass has that name.
        """
        pending = list(Event.__subclasses__())
        while pending:
            candidate = pending.pop()
            if candidate.__name__ == type_name:
                return candidate
            pending.extend(candidate.__subclasses__())
        raise LookupError(f"Unknown event type: {type_name!r}")

    async def save_events(self, aggregate_id: str, events: List[Event], expected_version: int):
        """Append ``events`` for ``aggregate_id`` inside one transaction.

        Args:
            aggregate_id: Aggregate the events belong to.
            events: Events to append, each carrying its own version.
            expected_version: Version the caller believes is currently stored;
                use 0 for an aggregate that has no events yet.

        Raises:
            ConcurrencyError: If the stored version differs from
                ``expected_version``.
        """
        async with self.db.transaction():
            # Optimistic concurrency check
            # NOTE(review): two concurrent writers can both pass this check;
            # a unique constraint on (aggregate_id, version) is presumably
            # what rejects the loser — confirm against the schema.
            stored_version = await self.db.fetchval(
                "SELECT MAX(version) FROM events WHERE aggregate_id = $1",
                aggregate_id,
            )
            # MAX() over zero rows is NULL/None; a new aggregate is version 0.
            current_version = 0 if stored_version is None else stored_version

            if current_version != expected_version:
                raise ConcurrencyError("Aggregate was modified")

            for event in events:
                await self.db.execute(
                    """
                    INSERT INTO events (aggregate_id, version, type, data, timestamp)
                    VALUES ($1, $2, $3, $4, $5)
                    """,
                    aggregate_id,
                    event.version,
                    type(event).__name__,
                    # Events carry a datetime, which plain json.dumps rejects.
                    json.dumps(asdict(event), default=self._json_default),
                    event.timestamp,
                )

    async def get_events(self, aggregate_id: str) -> List[Event]:
        """Return all events of ``aggregate_id`` ordered by version."""
        rows = await self.db.fetch(
            "SELECT * FROM events WHERE aggregate_id = $1 ORDER BY version",
            aggregate_id,
        )
        return [self._deserialize(row) for row in rows]

    def _deserialize(self, row) -> Event:
        """Rebuild an event object from a stored row.

        Reads the ``type`` and ``data`` columns written by ``save_events``.
        Assumes rows support access by column name — TODO confirm for the
        database driver in use.
        """
        payload = row["data"]
        if isinstance(payload, (str, bytes, bytearray)):
            payload = json.loads(payload)
        payload = dict(payload)

        # save_events stores datetimes as ISO-8601 text; restore the object.
        stamp = payload.get("timestamp")
        if isinstance(stamp, str):
            payload["timestamp"] = datetime.fromisoformat(stamp)

        return self._event_class(row["type"])(**payload)

CQRS Pattern

Separate read and write models:

# Command Side (Write)
class OrderCommandHandler:
    """Write side: turns commands into events, stores them, then publishes them."""

    def __init__(self, event_store: EventStore, event_bus: EventBus):
        self.event_store = event_store
        self.event_bus = event_bus

    async def handle_create_order(self, command: CreateOrderCommand):
        """Create an order from ``command``, persist its events, and publish them.

        Events are saved with ``expected_version=0`` and afterwards published
        one by one under their class name.
        """
        order = Order(command.order_id)
        order.create(command.customer_id)
        for line in command.items:
            order.add_item(line["id"], line["quantity"], line["price"])

        new_events = order._pending_events
        await self.event_store.save_events(
            order.id,
            new_events,
            expected_version=0,
        )

        # Publish only after the events have been stored.
        for new_event in new_events:
            event_name = type(new_event).__name__
            await self.event_bus.publish(event_name, asdict(new_event))

# Query Side (Read)
class OrderQueryHandler:
    """Read side: serves queries straight from the ``order_view`` read model."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def get_order(self, order_id: str) -> OrderDTO:
        """Fetch the ``order_view`` row whose id is ``order_id``."""
        query = "SELECT * FROM order_view WHERE id = $1"
        order_row = await self.read_db.fetchone(query, order_id)
        return order_row

    async def get_customer_orders(self, customer_id: str) -> List[OrderDTO]:
        """Fetch every ``order_view`` row belonging to ``customer_id``."""
        query = "SELECT * FROM order_view WHERE customer_id = $1"
        order_rows = await self.read_db.fetch(query, customer_id)
        return order_rows

# Projection (Event Handler that updates read model)
class OrderProjection:
    """Keeps the ``order_view`` read model in step with order events."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle_order_created(self, event: dict):
        """Insert a draft row with a zero total for the newly created order."""
        order_id = event["aggregate_id"]
        customer_id = event["customer_id"]
        created_at = event["timestamp"]
        statement = """
            INSERT INTO order_view (id, customer_id, status, total, created_at)
            VALUES ($1, $2, 'draft', 0, $3)
            """
        await self.read_db.execute(statement, order_id, customer_id, created_at)

    async def handle_order_item_added(self, event: dict):
        """Add the line amount (quantity * price) to the order's total."""
        order_id = event["aggregate_id"]
        line_amount = event["quantity"] * event["price"]
        statement = """
            UPDATE order_view
            SET total = total + $2
            WHERE id = $1
            """
        await self.read_db.execute(statement, order_id, line_amount)

Saga Pattern

Coordinate long-running transactions across services:

class OrderSaga:
    """Coordinates the order flow: reserve inventory, take payment, complete.

    Progress is tracked per order in the in-memory ``state`` dict. Each step
    reacts to an event by publishing the next request on the event bus; a
    failed payment triggers the compensating inventory release.
    """

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.state: dict = {}

    async def handle_order_created(self, event: dict):
        """Start the saga for a new order and request inventory reservation.

        Raises:
            KeyError: If ``event`` lacks ``order_id``, ``total`` or ``items``.
        """
        self.state[event["order_id"]] = {
            "status": "pending",
            # The payment step needs the amount later. Reading it here also
            # rejects an incomplete event before any inventory is reserved.
            "total": event["total"],
        }

        # Request inventory reservation
        await self.event_bus.publish("inventory.reserve", {
            "order_id": event["order_id"],
            "items": event["items"],
        })

    async def handle_inventory_reserved(self, event: dict):
        """Record the reservation and request payment for the stored total."""
        order_id = event["order_id"]
        self.state[order_id]["inventory"] = "reserved"

        # Request payment
        await self.event_bus.publish("payment.process", {
            "order_id": order_id,
            "amount": self.state[order_id]["total"],
        })

    async def handle_payment_processed(self, event: dict):
        """Record the payment and request completion of the order."""
        order_id = event["order_id"]
        self.state[order_id]["payment"] = "processed"

        # Complete the order
        await self.event_bus.publish("order.complete", {
            "order_id": order_id,
        })

    async def handle_payment_failed(self, event: dict):
        """Compensate a failed payment: release inventory, then cancel the order."""
        order_id = event["order_id"]

        # Compensating action: release inventory
        await self.event_bus.publish("inventory.release", {
            "order_id": order_id,
        })

        # Cancel order
        await self.event_bus.publish("order.cancel", {
            "order_id": order_id,
            "reason": "Payment failed",
        })

Best Practices

| Practice | Benefit |
| --- | --- |
| Use past tense for events | Clear semantics |
| Make events immutable | Reliable history |
| Include all needed data | Self-contained events |
| Idempotent handlers | Safe retries |
| Version events | Schema evolution |
| Monitor dead letters | Catch failures |

When to Use

| Pattern | When |
| --- | --- |
| Simple events | Notifications, loose coupling |
| Event sourcing | Audit trail, temporal queries |
| CQRS | Different read/write patterns |
| Saga | Distributed transactions |