Skip to content

Logging

Structured logging for observability and debugging.

Structured Logging with structlog

Setup

import structlog
import logging

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=True,
)

Basic Usage

log = structlog.get_logger()

# Simple message
log.info("application_started")

# With context
log.info("user_login", user_id=123, ip="192.168.1.1")

# Different levels
log.debug("cache_hit", key="user:123")
log.warning("rate_limit_approaching", current=95, limit=100)
log.error("payment_failed", order_id=456, error="Card declined")

Output:

{"event": "user_login", "user_id": 123, "ip": "192.168.1.1", "level": "info", "timestamp": "2024-01-15T10:30:00Z"}

Request Context

FastAPI Middleware

import structlog
from contextvars import ContextVar
from fastapi import Request
import uuid

request_id: ContextVar[str] = ContextVar("request_id", default="")

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    # Generate or extract request ID
    req_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    request_id.set(req_id)

    # Bind context for all logs in this request
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=req_id,
        path=request.url.path,
        method=request.method,
    )

    log = structlog.get_logger()
    log.info("request_started")

    response = await call_next(request)

    log.info("request_completed", status_code=response.status_code)

    response.headers["X-Request-ID"] = req_id
    return response

Adding User Context

from fastapi import Depends

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = await verify_token(token)

    # Add user to log context
    structlog.contextvars.bind_contextvars(
        user_id=user.id,
        user_email=user.email,
    )

    return user

# All subsequent logs will include user_id and user_email

Log Levels

Level When to Use
DEBUG Detailed diagnostic info (not in production)
INFO Normal operation, business events
WARNING Unexpected but handled situations
ERROR Errors that affect functionality
CRITICAL System-wide failures

Examples

# DEBUG: Verbose diagnostic info
log.debug("cache_lookup", key="user:123", hit=True)

# INFO: Business events, state changes
log.info("order_created", order_id=456, total=99.99)
log.info("user_registered", user_id=789)

# WARNING: Degraded but functional
log.warning("rate_limit_approaching", current=95, limit=100)
log.warning("retry_attempt", attempt=2, max_attempts=3)

# ERROR: Failed operations
log.error("payment_failed", order_id=456, error="Card declined")
log.error("external_api_error", service="stripe", status=500)

# CRITICAL: System failures
log.critical("database_connection_lost")
log.critical("out_of_disk_space", available_mb=10)

Error Logging

Exception Logging

try:
    result = risky_operation()
except Exception as e:
    log.exception("operation_failed", error=str(e))
    # Includes full stack trace

# Or with exc_info
log.error("operation_failed", exc_info=True)

Error Context

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    log = structlog.get_logger()
    log.error(
        "unhandled_exception",
        error_type=type(exc).__name__,
        error_message=str(exc),
        path=request.url.path,
        method=request.method,
        exc_info=True,
    )
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"}
    )

Log Sanitization

Redacting Sensitive Data

SENSITIVE_KEYS = {"password", "token", "secret", "api_key", "authorization"}

def sanitize_log_data(event_dict):
    for key in event_dict:
        if any(sensitive in key.lower() for sensitive in SENSITIVE_KEYS):
            event_dict[key] = "[REDACTED]"
    return event_dict

structlog.configure(
    processors=[
        # ... other processors
        sanitize_log_data,
        structlog.processors.JSONRenderer()
    ]
)

PII Handling

def mask_email(email: str) -> str:
    local, domain = email.split("@")
    return f"{local[:2]}***@{domain}"

def mask_ip(ip: str) -> str:
    parts = ip.split(".")
    return f"{parts[0]}.{parts[1]}.*.*"

# Usage
log.info("user_action",
    email=mask_email(user.email),  # "jo***@example.com"
    ip=mask_ip(request.client.host)  # "192.168.*.*"
)

Performance Logging

Timing Operations

import time
from contextlib import contextmanager

@contextmanager
def log_timing(operation: str, **context):
    log = structlog.get_logger()
    start = time.perf_counter()
    try:
        yield
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        log.info(f"{operation}_completed", duration_ms=round(duration_ms, 2), **context)

# Usage
with log_timing("database_query", table="users"):
    users = await db.execute(select(User))

Slow Operation Warnings

@contextmanager
def log_slow_operation(operation: str, threshold_ms: float = 100):
    log = structlog.get_logger()
    start = time.perf_counter()
    try:
        yield
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        if duration_ms > threshold_ms:
            log.warning(f"{operation}_slow",
                duration_ms=round(duration_ms, 2),
                threshold_ms=threshold_ms
            )

Configuration by Environment

import os

def configure_logging():
    log_level = os.getenv("LOG_LEVEL", "INFO")
    log_format = os.getenv("LOG_FORMAT", "json")

    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
    ]

    if log_format == "json":
        processors.append(structlog.processors.JSONRenderer())
    else:
        # Human-readable for development
        processors.append(structlog.dev.ConsoleRenderer())

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
    )

Log Aggregation

Sending to External Services

import httpx

class LogShipper:
    def __init__(self, endpoint: str, api_key: str):
        self.endpoint = endpoint
        self.api_key = api_key
        self.buffer = []
        self.buffer_size = 100

    async def ship(self, log_entry: dict):
        self.buffer.append(log_entry)
        if len(self.buffer) >= self.buffer_size:
            await self.flush()

    async def flush(self):
        if not self.buffer:
            return
        async with httpx.AsyncClient() as client:
            await client.post(
                self.endpoint,
                json={"logs": self.buffer},
                headers={"Authorization": f"Bearer {self.api_key}"}
            )
        self.buffer = []

Best Practices

  1. Use structured logging — JSON over plain text
  2. Include context — Request ID, user ID, relevant IDs
  3. Use appropriate levels — Don't overuse ERROR
  4. Sanitize sensitive data — Passwords, tokens, PII
  5. Log business events — Not just errors
  6. Include timing — Duration for slow operations
  7. Keep logs actionable — What happened, why it matters

See Also

  • Error Handling -- Exception hierarchy and structlog integration for error logging
  • Tracing -- Distributed tracing for correlating logs across services