Event-Driven: Event Sourcing, CQRS, and Sagas¶
Build loosely coupled, scalable systems.
Core Concepts¶
Events vs Commands¶
Command: "CreateOrder" → Imperative, expects action
Event: "OrderCreated" → Past tense, fact that happened
Commands are requests that may be rejected; events are notifications of facts that have already happened.
Event Flow¶
┌─────────┐    ┌─────────────┐    ┌─────────────┐
│ Service │───►│   Message   │───►│  Consumer   │
│    A    │    │   Broker    │    │  Services   │
└─────────┘    └─────────────┘    └─────────────┘
                                         │
                                         ├───► Email Service
                                         ├───► Analytics
                                         └───► Inventory
Event Types¶
Domain Events¶
Business-meaningful events:
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID, uuid4
@dataclass(frozen=True)
class DomainEvent:
    """Base class for business-meaningful events.

    The dataclass is frozen so that an event cannot be reassigned after it
    has been created: an event records something that already happened.
    Every subclass must be frozen as well (dataclasses do not allow mixing
    frozen and non-frozen classes in one hierarchy).
    """

    event_id: UUID
    occurred_at: datetime
    aggregate_id: str


@dataclass(frozen=True)
class OrderCreated(DomainEvent):
    """An order was created for a customer."""

    customer_id: str
    items: list[dict]
    total: float


@dataclass(frozen=True)
class PaymentProcessed(DomainEvent):
    """A payment was processed for an order."""

    order_id: str
    amount: float
    payment_method: str


@dataclass(frozen=True)
class OrderShipped(DomainEvent):
    """An order was handed over to a carrier."""

    order_id: str
    tracking_number: str
    carrier: str
Integration Events¶
Cross-service communication:
@dataclass(frozen=True)
class IntegrationEvent:
    """Base class for events exchanged between services.

    Frozen so that a published event cannot be altered afterwards; every
    subclass must be declared frozen too.
    """

    event_id: UUID
    timestamp: datetime
    source: str  # Service that produced the event


@dataclass(frozen=True)
class InventoryReserved(IntegrationEvent):
    """Inventory was reserved for the items of an order."""

    order_id: str
    items: list[dict]


@dataclass(frozen=True)
class ShipmentCreated(IntegrationEvent):
    """A shipment was created for an order."""

    order_id: str
    shipment_id: str
Message Brokers¶
Redis Pub/Sub (Simple)¶
import redis
import json
from typing import Callable
class EventBus:
    """Minimal publish/subscribe event bus on top of Redis Pub/Sub.

    The asyncio Redis client is used so that ``publish`` and ``listen`` can
    be awaited without blocking the event loop.

    Register every handler with ``subscribe`` *before* starting ``listen``:
    the Redis channel subscriptions are opened when ``listen`` starts.
    """

    def __init__(self, redis_url: str):
        # Local import: only the asyncio flavour of the client is needed here.
        from redis import asyncio as redis_asyncio

        # decode_responses=True delivers channel names and payloads as ``str``,
        # so channel names compare equal to the ``str`` keys of ``handlers``.
        self.redis = redis_asyncio.Redis.from_url(redis_url, decode_responses=True)
        self.pubsub = self.redis.pubsub()
        self.handlers: dict[str, list[Callable]] = {}

    async def publish(self, event_type: str, data: dict):
        """Publish ``data`` on the channel named ``event_type``.

        The payload is wrapped in an envelope with ``type``, ``data`` and
        ``timestamp`` keys and sent as JSON, so ``data`` must be
        JSON-serializable.
        """
        message = {
            "type": event_type,
            "data": data,
            "timestamp": datetime.utcnow().isoformat(),
        }
        await self.redis.publish(event_type, json.dumps(message))

    def subscribe(self, event_type: str, handler: Callable = None):
        """Register an async ``handler`` for ``event_type``.

        Works both as a direct call, ``bus.subscribe("x", handler)``, and as
        a decorator, ``@bus.subscribe("x")``. In both forms the handler is
        returned unchanged, so a decorated function keeps its name binding.
        """

        def register(callback: Callable) -> Callable:
            self.handlers.setdefault(event_type, []).append(callback)
            return callback

        if handler is None:
            return register
        return register(handler)

    async def listen(self):
        """Subscribe to all registered channels and dispatch incoming events.

        Each handler is awaited with the decoded envelope (the dict built by
        ``publish``). Returns immediately when no handler is registered.
        """
        if not self.handlers:
            return
        await self.pubsub.subscribe(*self.handlers)
        async for message in self.pubsub.listen():
            # Skip subscribe/unsubscribe confirmations; only "message" entries
            # carry a published payload.
            if message["type"] != "message":
                continue
            envelope = json.loads(message["data"])
            for handler in self.handlers.get(message["channel"], []):
                await handler(envelope)
# Usage
bus = EventBus("redis://localhost")
@bus.subscribe("order.created")
async def handle_order_created(event: dict):
await send_confirmation_email(event["data"]["customer_email"])
await reserve_inventory(event["data"]["items"])
# Publish
await bus.publish("order.created", {
"order_id": "123",
"customer_email": "user@example.com",
"items": [{"sku": "ABC", "quantity": 2}],
})
RabbitMQ (Robust)¶
import aio_pika
import json
class RabbitMQBus:
    """Publish/consume helper for RabbitMQ topic exchanges using aio_pika.

    ``connect`` must be awaited before ``publish`` or ``subscribe``; until
    then ``connection`` and ``channel`` are ``None``.
    """

    def __init__(self, url: str):
        self.url = url
        self.connection = self.channel = None

    async def connect(self):
        """Open the connection and a channel on it."""
        self.connection = await aio_pika.connect_robust(self.url)
        self.channel = await self.connection.channel()

    async def publish(self, exchange: str, routing_key: str, message: dict):
        """Send ``message`` as persistent JSON to the topic exchange ``exchange``."""
        topic_exchange = await self.channel.declare_exchange(
            exchange, aio_pika.ExchangeType.TOPIC
        )
        outgoing = aio_pika.Message(
            body=json.dumps(message).encode(),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )
        await topic_exchange.publish(outgoing, routing_key=routing_key)

    async def subscribe(
        self,
        exchange: str,
        queue: str,
        routing_key: str,
        handler: Callable,
    ):
        """Bind the durable queue ``queue`` to ``exchange`` and consume from it.

        ``handler`` is awaited with the JSON-decoded body of each message.
        """
        topic_exchange = await self.channel.declare_exchange(
            exchange, aio_pika.ExchangeType.TOPIC
        )
        bound_queue = await self.channel.declare_queue(queue, durable=True)
        await bound_queue.bind(topic_exchange, routing_key)

        async def on_message(incoming: aio_pika.IncomingMessage):
            # The handler runs inside the message.process() context.
            async with incoming.process():
                await handler(json.loads(incoming.body))

        await bound_queue.consume(on_message)
# Usage
# NOTE: the bare ``await`` statements below must run inside a coroutine;
# they are written at top level here for brevity.
bus = RabbitMQBus("amqp://localhost")
await bus.connect()

# Publisher (Order Service)
await bus.publish(
    exchange="orders",
    routing_key="order.created",
    message={"order_id": "123", "total": 99.99},
)


# Consumer (Notification Service)
async def handle_order(data):
    # ``data`` is the JSON-decoded message body passed in by RabbitMQBus.subscribe.
    # ``send_email`` is not defined in this snippet — presumably provided elsewhere.
    await send_email(data["order_id"])


# NOTE(review): in this snippet the queue is bound after the publish above;
# presumably publisher and consumer run as separate services — confirm that the
# consumer is bound before the first publish, or that message may not be routed.
await bus.subscribe(
    exchange="orders",
    queue="notifications",
    routing_key="order.*",
    handler=handle_order,
)
Event Sourcing¶
Store state as a sequence of events:
from dataclasses import dataclass, field
from typing import List
from abc import ABC, abstractmethod
@dataclass(frozen=True)
class Event:
    """Base class for events stored in an aggregate's history.

    Frozen so that a recorded event cannot be changed afterwards; every
    subclass must be declared frozen too.
    """

    aggregate_id: str
    version: int
    timestamp: datetime


class Aggregate(ABC):
    """Base class for aggregates whose state is derived from events.

    ``version`` is the version of the last event applied, whether it was
    replayed from history or newly raised. ``_pending_events`` collects the
    events raised since the aggregate was created or loaded.
    """

    def __init__(self, id: str):
        self.id = id
        self.version = 0
        self._pending_events: List[Event] = []

    @abstractmethod
    def apply(self, event: Event):
        """Update the aggregate's state from ``event``."""

    def load_from_history(self, events: List[Event]):
        """Rebuild state by applying ``events`` in the given order."""
        for event in events:
            self.apply(event)
            self.version = event.version

    def _raise_event(self, event: Event):
        """Apply a new event and queue it in ``_pending_events``."""
        self.apply(event)
        # Advance the version just like load_from_history does; otherwise every
        # event built with ``version=self.version + 1`` would get the same number.
        self.version = event.version
        self._pending_events.append(event)


@dataclass(frozen=True)
class OrderCreated(Event):
    """An order was created."""

    customer_id: str
    items: list


@dataclass(frozen=True)
class OrderItemAdded(Event):
    """An item was added to an order."""

    item_id: str
    quantity: int
    price: float


@dataclass(frozen=True)
class OrderSubmitted(Event):
    """An order was submitted; carries no fields beyond the base event."""
class Order(Aggregate):
    """Order aggregate: a draft that collects items and can then be submitted."""

    def __init__(self, id: str):
        super().__init__(id)
        self.customer_id: str = ""
        self.items: list = []
        self.status: str = "draft"
        self.total: float = 0

    def _emit(self, event_type, **payload):
        """Build the next event for this order and raise it."""
        self._raise_event(event_type(
            aggregate_id=self.id,
            version=self.version + 1,
            timestamp=datetime.utcnow(),
            **payload,
        ))

    def create(self, customer_id: str):
        """Record that the order was created for ``customer_id``."""
        self._emit(OrderCreated, customer_id=customer_id, items=[])

    def add_item(self, item_id: str, quantity: int, price: float):
        """Record that ``quantity`` units of ``item_id`` were added at ``price`` each."""
        self._emit(OrderItemAdded, item_id=item_id, quantity=quantity, price=price)

    def submit(self):
        """Record that the order was submitted.

        Raises:
            ValueError: if the order has no items.
        """
        if len(self.items) == 0:
            raise ValueError("Cannot submit empty order")
        self._emit(OrderSubmitted)

    def apply(self, event: Event):
        """Fold ``event`` into the order's state; other event types are ignored."""
        if isinstance(event, OrderCreated):
            self.customer_id = event.customer_id
            self.status = "draft"
            return
        if isinstance(event, OrderItemAdded):
            line = {
                "item_id": event.item_id,
                "quantity": event.quantity,
                "price": event.price,
            }
            self.items.append(line)
            self.total += event.quantity * event.price
            return
        if isinstance(event, OrderSubmitted):
            self.status = "submitted"
Event Store¶
class EventStore:
def __init__(self, db):
self.db = db
async def save_events(self, aggregate_id: str, events: List[Event], expected_version: int):
async with self.db.transaction():
# Optimistic concurrency check
current_version = await self.db.fetchval(
"SELECT MAX(version) FROM events WHERE aggregate_id = $1",
aggregate_id,
)
if current_version != expected_version:
raise ConcurrencyError("Aggregate was modified")
for event in events:
await self.db.execute(
"""
INSERT INTO events (aggregate_id, version, type, data, timestamp)
VALUES ($1, $2, $3, $4, $5)
""",
aggregate_id,
event.version,
type(event).__name__,
json.dumps(asdict(event)),
event.timestamp,
)
async def get_events(self, aggregate_id: str) -> List[Event]:
rows = await self.db.fetch(
"SELECT * FROM events WHERE aggregate_id = $1 ORDER BY version",
aggregate_id,
)
return [self._deserialize(row) for row in rows]
CQRS Pattern¶
Separate read and write models:
# Command Side (Write)
class OrderCommandHandler:
    """Write side: turns commands into stored events and publishes them."""

    def __init__(self, event_store: EventStore, event_bus: EventBus):
        self.event_store = event_store
        self.event_bus = event_bus

    async def handle_create_order(self, command: CreateOrderCommand):
        """Create an order with its items, store its events, then publish them.

        Reads ``order_id``, ``customer_id`` and ``items`` from ``command``;
        each item is a mapping with ``id``, ``quantity`` and ``price`` keys.
        """
        from dataclasses import asdict

        order = Order(command.order_id)
        order.create(command.customer_id)
        for item in command.items:
            order.add_item(item["id"], item["quantity"], item["price"])

        # Snapshot the new events, then clear the aggregate's queue once they
        # are stored so they cannot be saved or published a second time.
        new_events = list(order._pending_events)
        await self.event_store.save_events(
            order.id,
            new_events,
            expected_version=0,
        )
        order._pending_events.clear()

        # NOTE(review): storing and publishing are two separate writes; if a
        # publish fails the events are stored but not announced.
        for event in new_events:
            payload = asdict(event)
            # The bus JSON-encodes the payload, which cannot hold a datetime.
            payload["timestamp"] = event.timestamp.isoformat()
            await self.event_bus.publish(type(event).__name__, payload)
# Query Side (Read)
class OrderQueryHandler:
    """Read side: answers order queries from the ``order_view`` read model."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def get_order(self, order_id: str) -> OrderDTO:
        """Fetch the ``order_view`` row whose id is ``order_id``."""
        query = "SELECT * FROM order_view WHERE id = $1"
        row = await self.read_db.fetchone(query, order_id)
        return row

    async def get_customer_orders(self, customer_id: str) -> List[OrderDTO]:
        """Fetch all ``order_view`` rows belonging to ``customer_id``."""
        query = "SELECT * FROM order_view WHERE customer_id = $1"
        matching_rows = await self.read_db.fetch(query, customer_id)
        return matching_rows
# Projection (Event Handler that updates read model)
class OrderProjection:
    """Keeps the ``order_view`` read model in step with order events."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle_order_created(self, event: dict):
        """Insert a draft row with a zero total for a newly created order."""
        insert_sql = """
            INSERT INTO order_view (id, customer_id, status, total, created_at)
            VALUES ($1, $2, 'draft', 0, $3)
            """
        row_values = (
            event["aggregate_id"],
            event["customer_id"],
            event["timestamp"],
        )
        await self.read_db.execute(insert_sql, *row_values)

    async def handle_order_item_added(self, event: dict):
        """Add the item's line total (quantity * price) to the order's total."""
        update_sql = """
            UPDATE order_view
            SET total = total + $2
            WHERE id = $1
            """
        order_id = event["aggregate_id"]
        line_total = event["quantity"] * event["price"]
        await self.read_db.execute(update_sql, order_id, line_total)
Saga Pattern¶
Coordinate long-running transactions across services:
class OrderSaga:
    """Coordinates inventory reservation and payment for an order.

    Progress is tracked per order id in the in-memory ``state`` dict. Each
    handler reacts to one event and publishes the next request, or the
    compensating requests when payment fails.
    """

    def __init__(self, event_bus: "EventBus"):
        self.event_bus = event_bus
        self.state: dict = {}

    async def handle_order_created(self, event: dict):
        """Start the saga for a new order and request an inventory reservation.

        ``event`` must carry ``order_id``, ``items`` and ``total``; the total
        is remembered because the later payment request needs the amount.

        Raises:
            KeyError: if one of those keys is missing.
        """
        order_id = event["order_id"]
        items = event["items"]
        self.state[order_id] = {"status": "pending", "total": event["total"]}
        # Request inventory reservation
        await self.event_bus.publish("inventory.reserve", {
            "order_id": order_id,
            "items": items,
        })

    async def handle_inventory_reserved(self, event: dict):
        """Record the reservation and request payment for the stored total."""
        order_id = event["order_id"]
        self.state[order_id]["inventory"] = "reserved"
        # Request payment
        await self.event_bus.publish("payment.process", {
            "order_id": order_id,
            "amount": self.state[order_id]["total"],
        })

    async def handle_payment_processed(self, event: dict):
        """Record the payment and request completion of the order."""
        order_id = event["order_id"]
        self.state[order_id]["payment"] = "processed"
        self.state[order_id]["status"] = "completed"
        # Complete the order
        await self.event_bus.publish("order.complete", {
            "order_id": order_id,
        })

    async def handle_payment_failed(self, event: dict):
        """Compensate a failed payment: release inventory and cancel the order."""
        order_id = event["order_id"]
        # setdefault keeps this handler usable for an order the saga has no
        # state for, as it was before status tracking was added.
        order_state = self.state.setdefault(order_id, {})
        order_state["payment"] = "failed"
        order_state["status"] = "cancelled"
        # Compensating action: release inventory
        await self.event_bus.publish("inventory.release", {
            "order_id": order_id,
        })
        # Cancel order
        await self.event_bus.publish("order.cancel", {
            "order_id": order_id,
            "reason": "Payment failed",
        })
Best Practices¶
| Practice | Benefit |
|---|---|
| Use past tense for events | Clear semantics |
| Make events immutable | Reliable history |
| Include all needed data | Self-contained events |
| Idempotent handlers | Safe retries |
| Version events | Schema evolution |
| Monitor dead letters | Catch failures |
When to Use¶
| Pattern | When |
|---|---|
| Simple events | Notifications, loose coupling |
| Event sourcing | Audit trail, temporal queries |
| CQRS | Different read/write patterns |
| Saga | Distributed transactions |